Slackbot
02/18/2024, 3:14 PM

Scout
02/18/2024, 3:14 PM
`@job` decorator. Jobs can be created from ops or from graphs, and within a job, you can define dependencies between ops to create a workflow. If you want to create dependencies between separate jobs, the documentation suggests using sensors, specifically run status sensors. For example, if you have two jobs, `job_1` and `job_2`, and you want to run `job_2` whenever `job_1` has run and succeeded, you can write a `run_status_sensor` to accomplish this. Here is an example adapted from the documentation:

```python
from dagster import DagsterRunStatus, RunRequest, run_status_sensor

# job_1 and job_2 are assumed to be jobs defined elsewhere in your code.
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[job_1],
    request_job=job_2,
)
def run_job_2_sensor(context):
    # Placeholder: supply the run config that job_2 needs, if any.
    run_config = {}
    return RunRequest(run_key=None, run_config=run_config)
```
This sensor will trigger `job_2` to run every time `job_1` succeeds. Note that this kind of dependency is only an ordering dependency; it does not allow for passing data from `job_1` to `job_2`. For more complex scenarios where you might want to run related jobs or have a job control the execution of other jobs, the documentation does not provide a clear example or pattern. In such cases, you might need to use custom logic within sensors or explore other features of Dagster that could help orchestrate such workflows.

Scout
02/18/2024, 3:19 PM
`AssetMaterialization` event. As described in the documentation, you can define external assets and their dependencies, and then use these external assets within your job definitions. Here's an example from the documentation that illustrates how an external asset can be defined as a dependency for a fully-managed asset:

```python
from dagster import AssetSpec, Definitions, asset, external_assets_from_specs

raw_logs = AssetSpec("raw_logs")
processed_logs = AssetSpec("processed_logs", deps=[raw_logs])


@asset(deps=[processed_logs])
def aggregated_logs() -> None:
    # Loads "processed_logs" into memory and performs some aggregation
    ...


defs = Definitions(
    assets=[aggregated_logs, *external_assets_from_specs([raw_logs, processed_logs])]
)
```
In this example, `aggregated_logs` is a fully-managed asset that depends on `processed_logs`, which is an external asset. When `processed_logs` is materialized, Dagster can be configured to automatically trigger the materialization of `aggregated_logs`. Additionally, Dagster provides features like `AutoMaterializePolicy` that can be used to specify conditions under which assets should be automatically materialized when new data becomes available upstream.

To keep external assets updated, you can use various approaches such as the REST API, sensors, the Python API, or logging events using ops. For example, you can use a sensor to monitor for materializations of external assets and trigger materializations of downstream assets accordingly.

It's important to note that while Dagster can manage these dependencies and trigger materializations based on them, the actual scheduling and execution of materializations for external assets are not controlled by Dagster. You need to ensure that the external assets are kept up to date using one of the methods mentioned above.

Scout
02/18/2024, 3:19 PM
`AssetSelection` API, which allows you to specify which assets should be materialized when executing a job. You can include upstream assets in the selection, even if they are not directly part of the job's definitions. Here's an example from the documentation that demonstrates how to use `AssetSelection` to include upstream assets:

```python
from dagster import AssetKey, AssetSelection, define_asset_job

# Define a job that materializes a specific asset and its upstream dependencies
materialize_with_upstream = define_asset_job(
    "materialize_with_upstream",
    selection=AssetSelection.keys(AssetKey(["my_asset"])).upstream(),
)
```
In this example, `materialize_with_upstream` is a job that, when executed, will materialize the asset with the key `["my_asset"]` and all of its upstream dependencies, even if those upstream assets are not explicitly defined within the same job. This feature is particularly useful when you have complex asset dependencies across different jobs and want to ensure that all necessary upstream data is up-to-date before materializing a downstream asset.
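
To make "all of its upstream dependencies" concrete, here is a minimal plain-Python sketch of how an upstream selection can be resolved by walking a dependency map. It is not Dagster code and makes no claim about Dagster's internals. The `DEPS` mapping, the `unrelated_asset` entry, and the `select_with_upstream` helper are hypothetical names used only for illustration; the other asset names are borrowed from the external-assets example earlier in this thread.

```python
from collections import deque

# Hypothetical dependency map: asset name -> names of its direct upstream assets.
DEPS = {
    "raw_logs": [],
    "processed_logs": ["raw_logs"],
    "aggregated_logs": ["processed_logs"],
    "unrelated_asset": [],
}


def select_with_upstream(target, deps):
    """Return the target plus every asset reachable by following upstream edges."""
    selected = {target}
    queue = deque([target])
    while queue:
        current = queue.popleft()
        for parent in deps.get(current, []):
            if parent not in selected:
                selected.add(parent)
                queue.append(parent)
    return selected


selection = select_with_upstream("aggregated_logs", DEPS)
print(sorted(selection))  # ['aggregated_logs', 'processed_logs', 'raw_logs']
```

Selecting `aggregated_logs` pulls in `processed_logs` and `raw_logs` transitively, while `unrelated_asset` is left out because nothing in the selection depends on it.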