Joram Sbrisny
03/04/2024, 3:01 PM

Scout
03/04/2024, 3:01 PM
You can use the `@asset` decorator to create software-defined assets and specify their dependencies. An upstream asset is simply an asset that other assets depend on. Here's a general approach to defining an upstream asset:

1. Use the `@asset` decorator to define your asset function.
2. If your asset depends on other assets, specify these dependencies using the `ins` parameter of the `@asset` decorator.

Here's an example of how you might define an upstream asset:

```python
from dagster import asset

@asset
def upstream_asset():
    # Your logic to produce the asset goes here
    return some_data
```
And then you can define a downstream asset that depends on this upstream asset like so:

```python
from dagster import asset, AssetIn

@asset(ins={"upstream_input": AssetIn(key="upstream_asset")})
def downstream_asset(upstream_input):
    # Your logic that uses the upstream asset's data
    processed_data = some_transformation(upstream_input)
    return processed_data
```
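At bottom, what those two definitions declare is ordinary data flow: run the upstream function first, then feed its return value to the downstream one. Here's a toy plain-Python restatement of that flow (illustrative only, not Dagster's API; the list data and the doubling transformation are invented for the sketch):

```python
def upstream_asset():
    # Stand-in for real logic that produces the asset's data
    return [1, 2, 3]

def some_transformation(data):
    # Hypothetical transformation: double every value
    return [x * 2 for x in data]

def downstream_asset(upstream_input):
    # Consumes the upstream data, mirroring the Dagster version
    return some_transformation(upstream_input)

# "Materializing" by hand, in dependency order: upstream first, then downstream
result = downstream_asset(upstream_asset())
print(result)  # [2, 4, 6]
```

Dagster automates exactly this ordering (plus storage and lineage tracking) from the declared `AssetIn` dependency in the Dagster example above.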
In this example, `downstream_asset` is defined with an input that specifies a dependency on `upstream_asset`. When you materialize `downstream_asset`, Dagster will ensure that `upstream_asset` is materialized first.

Remember that the actual implementation of your assets will depend on your specific use case and the data processing logic you need to apply. The above code is a simplified example that illustrates the concept of defining upstream and downstream dependencies between assets in Dagster.

Joram Sbrisny
03/04/2024, 3:03 PM

Scout
03/04/2024, 3:03 PM
You declare the relationship by using the `@asset` decorator and specifying the dependencies. In Dagster, when you define an asset, you declare its upstream dependencies by specifying the inputs to the asset. Each input corresponds to an upstream asset that must be materialized before the current asset. Here's an example of how you might define an asset with an upstream dependency:

```python
from dagster import asset, AssetIn

# Define the upstream asset
@asset
def upstream_asset():
    # Logic to produce the upstream asset
    return some_upstream_data

# Define the downstream asset, which has an upstream dependency
@asset(ins={"upstream_input": AssetIn(key="upstream_asset")})
def downstream_asset(upstream_input):
    # Logic that uses the upstream asset's data
    processed_data = some_transformation(upstream_input)
    return processed_data
```
In this code snippet, `downstream_asset` has a declared input named `upstream_input`, which is an `AssetIn` object with its key set to `"upstream_asset"`. This key corresponds to the `upstream_asset` defined above. When you materialize `downstream_asset`, Dagster will automatically recognize that `upstream_asset` needs to be materialized first due to this declared dependency. Defining assets this way makes the dependencies explicit and allows Dagster to manage the execution order of assets so that those dependencies are respected.

Joram Sbrisny
03/04/2024, 3:04 PM

Scout
03/04/2024, 3:04 PM
```python
from dagster import asset, AssetIn

# Define the upstream asset
@asset
def upstream_asset():
    # Logic to produce the upstream asset
    return some_upstream_data

# Define the downstream asset, which depends on the upstream asset
@asset(ins={"upstream_input": AssetIn(key="upstream_asset")})
def downstream_asset(upstream_input):
    # Logic that uses the upstream asset's data
    processed_data = some_transformation(upstream_input)
    return processed_data
```
In this example, `upstream_asset` doesn't know about `downstream_asset`. It's `downstream_asset` that declares it has an upstream dependency, by specifying `upstream_input` as an input with `AssetIn(key="upstream_asset")`. This tells Dagster that `downstream_asset` depends on `upstream_asset`.

If you want to define a new asset and declare that `upstream_asset` is its upstream, you would do it in a similar way:

```python
@asset(ins={"another_upstream_input": AssetIn(key="upstream_asset")})
def another_downstream_asset(another_upstream_input):
    # Logic that uses the upstream asset's data
    another_processed_data = another_transformation(another_upstream_input)
    return another_processed_data
```
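Because both downstream assets name the same upstream key, this is a fan-out: the upstream value is materialized once and consumed by each dependent. A toy plain-Python sketch of that shape (illustrative only, not Dagster's API; the squaring and summing bodies are invented for the example):

```python
def upstream_asset():
    # Stand-in for real logic that produces the shared upstream data
    return [1, 2, 3]

def another_transformation(values):
    # Hypothetical transformation for the second consumer
    return sum(values)

def downstream_asset(upstream_input):
    # First consumer: square each value
    return [x * x for x in upstream_input]

def another_downstream_asset(another_upstream_input):
    # Second consumer: aggregate the same upstream data
    return another_transformation(another_upstream_input)

shared = upstream_asset()                 # produced once
squares = downstream_asset(shared)        # [1, 4, 9]
total = another_downstream_asset(shared)  # 6
```

In Dagster, this sharing needs no extra wiring: each downstream asset simply references the `"upstream_asset"` key in its own `AssetIn`.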
In this new asset, `another_downstream_asset`, we again declare that `upstream_asset` is an upstream dependency by using `AssetIn(key="upstream_asset")` in the inputs. To summarize, in Dagster you define upstream relationships by specifying inputs to downstream assets; the upstream assets themselves do not declare their downstream dependents.

Joram Sbrisny
03/04/2024, 3:05 PM

Scout
03/04/2024, 3:05 PM
A downstream asset uses the `AssetIn` object to reference the keys of the upstream assets it depends on. The design philosophy in Dagster is that upstream assets should not need to know about their consumers. This keeps the definition of each asset clean and focused on its own logic, without needing to be aware of the broader dependency graph. The downstream assets are responsible for declaring what they depend on, which allows Dagster to construct the asset dependency graph and ensure that assets are materialized in the correct order. Here's a quick recap of how you define an asset with an upstream dependency:

```python
from dagster import asset, AssetIn

# Define the upstream asset
@asset
def upstream_asset():
    # Logic to produce the upstream asset
    return some_upstream_data

# Define the downstream asset, which depends on the upstream asset
@asset(ins={"upstream_input": AssetIn(key="upstream_asset")})
def downstream_asset(upstream_input):
    # Logic that uses the upstream asset's data
    processed_data = some_transformation(upstream_input)
    return processed_data
```
In this example, `downstream_asset` declares that it depends on `upstream_asset` by specifying it in the `ins` parameter. The `upstream_asset` does not, and should not, have any information about which assets depend on it.

Joram Sbrisny
03/04/2024, 3:07 PM

Scout
03/04/2024, 3:07 PM
```python
from dagster import asset, AssetIn, AssetGroup, repository

# Define the upstream asset
@asset
def upstream_asset():
    # Logic to produce the upstream asset
    return some_upstream_data

# Define the downstream asset, which depends on the upstream asset
@asset(ins={"upstream_input": AssetIn(key="upstream_asset")})
def downstream_asset(upstream_input):
    # Logic that uses the upstream asset's data
    processed_data = some_transformation(upstream_input)
    return processed_data

# Group the assets together
my_assets = AssetGroup(assets=[upstream_asset, downstream_asset])

# Build a job that materializes the assets
my_job = my_assets.build_job(name="my_job")

# Define the repository
@repository
def my_repo():
    return [my_job]
```
In this example, `my_job` is a job that materializes both `upstream_asset` and `downstream_asset`. When you execute `my_job`, Dagster will first materialize `upstream_asset` because `downstream_asset` has declared it as an upstream dependency. The job will ensure that the assets are materialized in the correct order based on their dependencies.

The dependencies between assets are defined at the asset level, not at the job level. The job simply includes the assets, and Dagster uses the dependencies that have been defined between the assets to determine the execution order when the job is run.

Joram Sbrisny
03/04/2024, 3:09 PM

Scout
03/04/2024, 3:09 PM
```python
from dagster import op, job, Out, In

# Define the op that acts as the upstream dependency
@op(out=Out())
def upstream_op():
    # Logic to produce the data
    return some_upstream_data

# Define the downstream op, which depends on the upstream op
@op(ins={"upstream_data": In()})
def downstream_op(upstream_data):
    # Logic that uses the upstream op's data
    processed_data = some_transformation(upstream_data)
    return processed_data

# Build a job that orchestrates the ops
@job
def my_job():
    downstream_op(upstream_op())
```
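Stripped of the decorators, the `@job` body is ordinary function composition, and that nesting is what encodes the execution order. Here's a toy plain-Python sketch (illustrative only; the string data and upper-casing transformation are invented), with the caveat that inside a real `@job` the calls are not executed directly but are recorded by Dagster to build the dependency graph:

```python
def upstream_op():
    # Stand-in for real logic that produces the data
    return "raw data"

def some_transformation(text):
    # Hypothetical transformation: normalize to upper case
    return text.upper()

def downstream_op(upstream_data):
    # Consumes the upstream op's output
    return some_transformation(upstream_data)

# Same shape as the @job body: upstream runs first, its output flows downstream
result = downstream_op(upstream_op())
print(result)  # RAW DATA
```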
In this example, `upstream_op` is an op that produces some data, and `downstream_op` is an op that takes that data as input and processes it. The `my_job` job defines the execution graph, in which `downstream_op` depends on the output of `upstream_op`. When you execute `my_job`, Dagster will first execute `upstream_op` and then pass its output to `downstream_op`. The job effectively defines the execution order of the ops based on their dependencies.

While ops are a valid way to define dependencies and orchestrate execution, we encourage the use of assets for data-oriented workflows, because assets provide a higher-level abstraction that is better suited to modeling data dependencies and lineage. Assets also integrate with Dagster's asset catalog, giving you visibility into the history and relationships of your data. However, the choice between assets and ops depends on the specific needs and design of your system.
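The ordering guarantee described throughout this thread, for assets and ops alike, rests on a topological sort of the dependency graph. As a stdlib-only illustration of the principle (not Dagster's actual implementation), Python's `graphlib` can order a hypothetical asset graph so that every node comes after its declared upstreams:

```python
from graphlib import TopologicalSorter  # stdlib since Python 3.9

# Hypothetical dependency declarations: each key maps to its upstream nodes,
# mirroring the assets discussed above
deps = {
    "upstream_asset": set(),
    "downstream_asset": {"upstream_asset"},
    "another_downstream_asset": {"upstream_asset"},
}

# static_order() yields nodes so every dependency precedes its dependents
order = list(TopologicalSorter(deps).static_order())
assert order.index("upstream_asset") < order.index("downstream_asset")
print(order)
```

Here `upstream_asset` is the only node with no predecessors, so any valid ordering places it first, which is exactly the materialization order Dagster enforces for the examples in this thread.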