https://dagster.io/ logo
#ask-community
Title
# ask-community
g

Gil Rutter

08/07/2023, 4:25 PM
Hello, I am trying to convert a bunch of Python tasks in a homespun framework into Dagster. Initially I am using
@asset
to wrap the
None
-returning tasks and using
non_argument_deps=
to specify dependencies. To match what we had before, I have the following requirements regarding execution during runs of a job (feel free to try and talk me out of having these requirements) • It should be possible to tag an asset such that, if any of its dependencies fail, it executes anyway • It should be possible to tag an asset with a time of day (format of this tbd) after which the asset will start to run even if its dependencies have not completed I believe this would require changing
ActiveExecution._update
(here) and somehow getting my altered version where it's needed. Could I get some initial feedback on what I'm looking to do please?
🤖 1
s

Sean Lopp

08/07/2023, 7:36 PM
Hey Gil, I wonder if in your use case, it might make sense to just have a scheduled asset job run for C? eg if you have assets A, B, C where C is downstream of A and B you could setup a job to run C no matter what on some regular cron basis
g

Gil Rutter

08/07/2023, 7:55 PM
I don't think that works well, generally C should run after A and B are complete and I don't want to have to maintain estimates of runtime for all dependencies!
It would also be really nice to be able to view the morning's run as a single unit
s

Sean Lopp

08/07/2023, 8:09 PM
Right, I guess a slight variation of this would be, you can setup a run for A, B, C (that runs C after A and B - that would the default behavior) But then you could setup a second "sweeper" run of only C. You could even use a custom ScheduleDefinition to have the C-only run occur only if there was not a successful run of A,B,C to avoid un-necessary duplicate runs The way this latter part would work is
Copy code
@schedule(job=c_only_asset_job, cron_schedule="0 11 * * *")
def sweeper_job_schedule(context: ScheduleEvaluationContext):
    # code to check for A, B, C
    ...context.instance.get_runs(filters=...)

    if not ABC_run:
      return RunRequest(...)
I just would personally hesitate to modify the execution plan, that's pretty far into the internals
g

Gil Rutter

08/07/2023, 8:22 PM
Thanks for this, I'll have a play tomorrow.
I'm wondering about race conditions. These sweep jobs would have to request a run of all their downstream dependencies, too. If two tasks have their sweep scheduled for the same time, and one is a downstream dependency of the other, or they share common downstream dependencies, is it possible to ensure each task runs only once?
Perhaps the original job and all sweeper jobs can use
stale_assets_only=True
and I could define staleness appropriately. Would this protect against all potential races?
s

Sean Lopp

08/07/2023, 8:57 PM
I think your challenge is that you want the sweeper jobs to run assets that are not stale, right? eg if A, B aren't updated, you still want C to run, even though in that case C is not stale
g

Gil Rutter

08/07/2023, 9:04 PM
Good point, they may in fact not be stale at all
s

Sean Lopp

08/07/2023, 9:05 PM
Yea I think normally, if C actually depends on A, B, the freshness policies and regular scheduling will get you really far. But if C is only sort of dependent on them, the situation is more complex
g

Gil Rutter

08/07/2023, 9:13 PM
Well thanks a lot for the responses.
s

Sean Lopp

08/07/2023, 9:33 PM
Yea, as you build out the scenario let us know if theres any feedback or specifics we can get into further.