Sterling Paramore
08/02/2022, 9:58 PM

owen
08/02/2022, 10:14 PM
A -> B -> C
     \-> D
where B has two outputs. The output connected to D would be optional (i.e. have is_required=False). If the B step caught an error, it could emit an output to D, which could do your alerting, and fail afterwards so that the run would show the failure.
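A minimal sketch of the pattern described above, with hypothetical op names (step_a through step_d) and stand-in work/alerting logic; the output wired to D is declared with is_required=False so the run is valid when it is never emitted:

from dagster import Out, Output, graph, op


@op
def step_a():
    return 1


def risky_work(x):
    # stand-in for whatever step B actually does; may raise
    return x + 1


@op(out={"result": Out(), "alert_info": Out(is_required=False)})
def step_b(upstream):
    try:
        yield Output(risky_work(upstream), output_name="result")
    except Exception as exc:
        # emit the optional output so the alerting op can run,
        # then re-raise so the run still shows the failure
        yield Output(str(exc), output_name="alert_info")
        raise


@op
def step_c(result):
    return result


@op
def step_d(alert_info):
    # send the alert here
    pass


@graph
def a_b_c_d():
    result, alert_info = step_b(step_a())
    step_c(result)
    step_d(alert_info)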
Sterling Paramore
08/02/2022, 10:30 PM

owen
08/02/2022, 10:43 PM

dwall
08/02/2022, 10:44 PM

Sterling Paramore
08/02/2022, 11:42 PM
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[el_meltano.elt_all_job],
    request_job=dbt.dbt_run_job,
    default_status=DefaultSensorStatus.RUNNING,
)
def el_meltano_fin_success(context):
    return RunRequest(run_key=None)
When the dagster-daemon runs, I’m seeing this error, which is truncated in the log:
success : dagster.core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor el_meltano_fin_success
Stack Trace:
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/grpc/impl.py", line 289, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/core/errors.py", line 191, in user_code_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
dagster._check.CheckError: Member of list mismatches type. Expected (<class 'dagster.core.definitions.run_request.SkipReason'>, <class 'dagster.core.definitions.run_request.RunRequest'>, <class 'dagster.core.definitions.run_request.PipelineRunReaction'>). Got None of type <class 'NoneType'>.

Stack Trace:
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/core/errors.py", line 184, in user_code_error_boundary
    yield
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/grpc/impl.py", line 289, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/core/definitions/sensor_definition.py", line 372, in evaluate_tick
    check.is_list(result, (SkipReason, RunRequest, PipelineRunReaction))
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/_check/__init__.py", line 875, in is_list
    return _check_iterable_items(obj, of_type, "list")
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/_check/__init__.py", line 1612, in _check_iterable_items
    raise CheckError(
I changed
return RunRequest(run_key=None)
to
yield RunRequest(run_key=None)
and it worked. Maybe the docs are wrong?
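For reference, a sketch of the sensor with that one-line change applied (el_meltano and dbt are the same user modules referenced in the original snippet):

from dagster import DagsterRunStatus, DefaultSensorStatus, RunRequest, run_status_sensor


@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[el_meltano.elt_all_job],
    request_job=dbt.dbt_run_job,
    default_status=DefaultSensorStatus.RUNNING,
)
def el_meltano_fin_success(context):
    # yielding the RunRequest instead of returning it is the change
    # that made the sensor tick succeed in this case
    yield RunRequest(run_key=None)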
owen
08/02/2022, 11:56 PM
I've used return RunRequest(run_key=None) myself before, without issue. I'm wondering, if you change it back to return, whether the issue comes back? I'm also wondering if this could potentially be happening because the daemon was running a non-updated version of the code 🤔
Sterling Paramore
08/03/2022, 12:07 AM

owen
08/03/2022, 12:08 AM
yield solves it for now

Sterling Paramore
08/03/2022, 12:46 AM
In a run_status_sensor, is there any way to get the full config of the job that triggered the sensor?
I can get the job name and id from context.dagster_run.run_id and context.dagster_run.job_name. And I can get the “run_config” from context.dagster_run.run_config. However, if the config wasn’t modified in dagit, the run_config is empty. How can I get the config that was executed?
One idea I had was a config param (run_downstream) that defaults to False, but the scheduled job would set it to true. Then the sensor would check the value of run_downstream and determine if job B should run or not. The only problem I’m running into with this approach is that I don’t know how to have the sensor figure out what the config was in a job if it wasn’t explicitly set as a run_config.
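A sketch of the config-flag approach being described, with hypothetical names (final_op, job_a, job_a_schedule); because the schedule supplies run_downstream explicitly, it would appear in the run's user-supplied run_config:

from dagster import Field, ScheduleDefinition, job, op


@op(config_schema={"run_downstream": Field(bool, default_value=False)})
def final_op(context):
    # a downstream run_status_sensor would inspect this flag to decide
    # whether to kick off job B
    context.log.info(f"run_downstream={context.op_config['run_downstream']}")


@job
def job_a():
    final_op()


# the schedule passes the flag explicitly, so it shows up in
# dagster_run.run_config when the sensor looks at the run
job_a_schedule = ScheduleDefinition(
    job=job_a,
    cron_schedule="0 6 * * *",
    run_config={"ops": {"final_op": {"config": {"run_downstream": True}}}},
)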
owen
08/03/2022, 4:47 PM
You can check dagster_run.tags. When a run is launched from a schedule, a tag with the key "dagster/schedule_name" will be set. So you can determine if the run was launched from the schedule by checking for the existence of that tag.
In general, I think tagging the runs will be a more straightforward approach, even if you do want to allow Dagit users to sometimes also launch the downstream job (you can also edit tags for the run in dagit, so you could have users set a special "run_downstream" tag for those cases).
The contents of the run_config field are kinda surprising (it's really just the user-supplied run config, rather than the fully resolved run config after config mapping and defaults are applied). I believe the current plan for the future is to also expose the fully resolved config, but I don't have a timeline on that.
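A sketch of the tag-based check owen describes, using a hypothetical downstream_job; "dagster/schedule_name" is the system tag mentioned above, and "run_downstream" stands in for a user-applied tag set in dagit:

from dagster import (
    DagsterRunStatus,
    DefaultSensorStatus,
    RunRequest,
    SkipReason,
    job,
    op,
    run_status_sensor,
)


@op
def downstream_op():
    pass


@job
def downstream_job():
    downstream_op()


@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=downstream_job,
    default_status=DefaultSensorStatus.RUNNING,
)
def launch_downstream(context):
    tags = context.dagster_run.tags
    # runs launched by a schedule carry the "dagster/schedule_name" tag;
    # a "run_downstream" tag applied in dagit can opt in ad-hoc runs as well
    if "dagster/schedule_name" in tags or "run_downstream" in tags:
        yield RunRequest(run_key=None)
    else:
        yield SkipReason("run was not launched from the schedule")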
Sterling Paramore
08/03/2022, 6:05 PM

owen
08/03/2022, 6:09 PM