# ask-ai

Slackbot

02/18/2024, 3:14 PM
This message was deleted.

Scout

02/18/2024, 3:14 PM
Based on the documentation, there isn't a direct mention of creating a job that contains multiple sub-jobs with inter-job dependencies. However, jobs in Dagster are the main unit of execution and monitoring, typically constructed using the `@job` decorator. Jobs can be created from ops or from graphs, and within a job you can define dependencies between ops to create a workflow. If you want to create dependencies between separate jobs, the documentation suggests using sensors, specifically run status sensors (`@run_status_sensor`). For example, if you have two jobs, `job_1` and `job_2`, and you want to run `job_2` whenever `job_1` has run and succeeded, you can write a run status sensor to accomplish this. Here is an example from the documentation:
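```python
from dagster import DagsterRunStatus, RunRequest, run_status_sensor

# job_1, job_2, and run_config are assumed to be defined elsewhere.
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[job_1],
    request_job=job_2,
)
def run_job_2_sensor(context):
    return RunRequest(run_key=None, run_config=run_config)
```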
This sensor will trigger `job_2` to run every time `job_1` succeeds. Note that this kind of dependency is only an ordering dependency; it does not allow for passing data from `job_1` to `job_2`. For more complex scenarios where you might want to run related jobs or have a job control the execution of other jobs, the documentation does not provide a clear example or pattern. In such cases, you might need to use custom logic within sensors or explore other features of Dagster that could help orchestrate such workflows.
Yes, Dagster can trigger asset materialization from upstream assets that are not part of the main job. This is possible through the use of external assets and the `AssetMaterialization` event. As described in the documentation, you can define external assets and their dependencies, and then use these external assets within your job definitions. Here's an example from the documentation that illustrates how an external asset can be defined as a dependency for a fully-managed asset:
```python
from dagster import AssetSpec, Definitions, asset, external_assets_from_specs

raw_logs = AssetSpec("raw_logs")
processed_logs = AssetSpec("processed_logs", deps=[raw_logs])


@asset(deps=[processed_logs])
def aggregated_logs() -> None:
    # Loads "processed_logs" into memory and performs some aggregation
    ...


defs = Definitions(
    assets=[aggregated_logs, *external_assets_from_specs([raw_logs, processed_logs])]
)
```
In this example, `aggregated_logs` is a fully-managed asset that depends on `processed_logs`, which is an external asset. When `processed_logs` is materialized, Dagster can be configured to automatically trigger the materialization of `aggregated_logs`. Additionally, Dagster provides features like `AutoMaterializePolicy` that can be used to specify conditions under which assets should be automatically materialized when new data becomes available upstream.

To keep external assets updated, you can use various approaches such as the REST API, sensors, the Python API, or logging events using ops. For example, you can use a sensor to monitor for materializations of external assets and trigger materializations of downstream assets accordingly.

It's important to note that while Dagster can manage these dependencies and trigger materializations based on them, the actual scheduling and execution of materializations for external assets are not controlled by Dagster. You need to ensure that the external assets are kept up to date using one of the methods mentioned above.
Yes, Dagster can trigger asset materialization on upstream assets that are not part of the main job. This is possible through the use of the `AssetSelection` API, which allows you to specify which assets should be materialized when executing a job. You can pull upstream assets into the selection without listing each one explicitly, as long as they are defined alongside the job. Here's an example from the documentation that demonstrates how to use `AssetSelection` to include upstream assets:
```python
from dagster import AssetKey, AssetSelection, define_asset_job

# Define a job that materializes a specific asset and its upstream dependencies
materialize_with_upstream = define_asset_job(
    "materialize_with_upstream",
    selection=AssetSelection.keys(AssetKey(["my_asset"])).upstream(),
)
```
In this example, `materialize_with_upstream` is a job that, when executed, will materialize the asset with the key `["my_asset"]` and all of its upstream dependencies, even though those upstream assets are never named in the job definition. This feature is particularly useful when you have complex asset dependencies across different jobs and want to ensure that all necessary upstream data is up to date before materializing a downstream asset.