What’s the recommended approach to building a dag ...
# ask-community
s
What’s the recommended approach to building a dag that would continue if there was a failure, but would still allow me to know there was a failure? For example, suppose I have a job with a graph composed of ops A->B->C. If B fails, I still want C to run, but I would like the job to show as failed in dagit (and possibly have some notification sent out via email or slack)
o
hm can you elaborate a bit more on what sort of steps B and C are, and if there's any data passed between these two? One possibility would be to break this into two separate jobs, A->B and C, with a couple of run status sensors to launch a run of C whenever the A->B job succeeds or fails. Another option would be to have a dag like:
A -> B -> C
      \-> D
where B has two outputs. The output connected to D would be optional (i.e. have is_required=False). If the B step caught an error, it could emit an output to D, which could do your alerting and fail afterwards so that the run would show the failure.
s
There is no data passed between B and C. I think I like the run status sensor for this, I’ll look into it more, thanks!
Is there a way to pass an ‘OR’ condition to the run status sensor, or do I need to create two: one for SUCCESS, one for FAILURE?
o
yeah I was just looking into this -- unfortunately it doesn't look like there's a way to combine them both together at the moment (other than writing your own custom sensor that queries the events database, which I wouldn't recommend). Just created an issue for that here: https://github.com/dagster-io/dagster/issues/9181
d
We use a sensor factory for this exact reason ☝️
s
Getting some very unhelpful errors here. I’ve got what I think is a pretty simple sensor:
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[el_meltano.elt_all_job],
    request_job=dbt.dbt_run_job,
    default_status=DefaultSensorStatus.RUNNING,
)
def el_meltano_fin_success(context):
    return RunRequest(run_key=None)
When the dagster-daemon runs, I’m seeing this error, which is truncated in the log:
success : dagster.core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor el_meltano_fin_success

Stack Trace:
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/grpc/impl.py", line 289, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/core/errors.py", line 191, in user_code_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
dagster._check.CheckError: Member of list mismatches type. Expected (<class 'dagster.core.definitions.run_request.SkipReason'>, <class 'dagster.core.definitions.run_request.RunRequest'>, <class 'dagster.core.definitions.run_request.PipelineRunReaction'>). Got None of type <class 'NoneType'>.

Stack Trace:
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/core/errors.py", line 184, in user_code_error_boundary
    yield
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/grpc/impl.py", line 289, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/core/definitions/sensor_definition.py", line 372, in evaluate_tick
    check.is_list(result, (SkipReason, RunRequest, PipelineRunReaction))
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/_check/__init__.py", line 875, in is_list
    return _check_iterable_items(obj, of_type, "list")
  File "/Users/sterling.paramore/miniconda3/envs/mse-dagster/lib/python3.9/site-packages/dagster/_check/__init__.py", line 1612, in _check_iterable_items
    raise CheckError(
Hmmm… on a whim I changed return RunRequest(run_key=None) to yield RunRequest(run_key=None) and it worked. Maybe the docs are wrong?
o
hm that's pretty bewildering to me as well, that looks totally correct to me... I've used return RunRequest(run_key=None) myself before, without issue. I'm wondering, if you change it back to return, does the issue come back? This could potentially be happening because the daemon was running a non-updated version of the code 🤔
s
I stopped/started the daemon many times during debugging this.
o
got it -- I can see if I can replicate the issue, glad using yield solves it for now
s
In the run_status_sensor, is there any way to get the full config of the job that triggered the sensor? I can get the job name and id from context.dagster_run.run_id and context.dagster_run.job_name. And I can get the run_config from context.dagster_run.run_config. However, if the config wasn’t modified in dagit, the run_config is empty. How can I get the config that was executed?
So now I have two jobs: A and B. I’ve set up a schedule to run job A, and then a sensor that runs job B after job A runs. However, if a user manually launches job A from dagit, I don’t want the sensor to run job B. I was thinking I could have a config value (run_downstream) that defaults to False, but the scheduled job would set it to True. The sensor would then check the value of run_downstream to determine whether job B should run. The only problem I’m running into with this approach is that I don’t know how the sensor can figure out what config a job actually ran with if it wasn’t explicitly set as a run_config.
o
Ah interesting -- the run config option is a totally valid way of doing it, but another option would be to inspect dagster_run.tags. When a run is launched from a schedule, a tag with the key "dagster/schedule_name" will be set, so you can determine whether the run was launched from the schedule by checking for the existence of that tag. In general, I think tagging the runs will be a more straightforward approach, even if you do want to allow Dagit users to sometimes also launch the downstream job (you can also edit tags for the run in dagit, so you could have users set a special "run_downstream" tag for those cases).
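the tag check itself can be tiny -- something like this (sketch; launched_by_schedule is a made-up helper name, and context.dagster_run.tags is just a dict of strings):

```python
# tag Dagster sets on runs launched by a schedule
SCHEDULE_TAG = "dagster/schedule_name"


def launched_by_schedule(tags):
    """True when the run carries the schedule tag.

    Inside a run_status_sensor body you'd call this as
    launched_by_schedule(context.dagster_run.tags).
    """
    return SCHEDULE_TAG in tags
```

in the sensor you'd then yield a RunRequest when it returns True and a SkipReason otherwise.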
Totally agree with the underlying point though, which is that the contents of the run_config field are kinda surprising (it's really just the user-supplied run config, rather than the fully resolved run config after config mapping and defaults are applied). I believe the current plan for the future is to also expose the fully resolved config, but I don't have a timeline on that.
s
Got it working with tags. Thanks Owen!
o
no problem 🙂