Terry Lines
02/22/2024, 7:19 PM
Scout
02/22/2024, 7:19 PM
`CircularDependencyError`. This suggests that Dagster does not currently support circular dependencies between assets, even if they are partitioned in a way that could theoretically allow for it.
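To illustrate why a cycle cannot be resolved, here is a minimal sketch in plain Python. It is not Dagster's own implementation: it uses the standard library's `graphlib` to order a small dependency graph, and the asset names `asset_a` and `asset_b` and the helper `resolve_order` are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def resolve_order(deps):
    """Return an execution order for a {node: set_of_upstream_nodes} mapping.

    Raises ValueError if the graph contains a cycle, because no valid order exists.
    """
    try:
        return list(TopologicalSorter(deps).static_order())
    except CycleError as err:
        # err.args[1] holds the nodes that form the detected cycle
        raise ValueError(f"circular dependency: {err.args[1]}") from err


# A linear graph resolves: asset_a runs before asset_b.
acyclic = {"asset_a": set(), "asset_b": {"asset_a"}}
order = resolve_order(acyclic)

# Two assets that depend on each other cannot be ordered.
cyclic = {"asset_a": {"asset_b"}, "asset_b": {"asset_a"}}
try:
    resolve_order(cyclic)
    cycle_detected = False
except ValueError:
    cycle_detected = True
```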
For reference, the issue discussing this feature request is titled "Allow cycles in the asset graph as long as asset partitions only depend on earlier partitions" and can be found in the Dagster GitHub repository.
Terry Lines
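02/22/2024, 7:22 PM
Scout
02/22/2024, 7:22 PM
```python
import time

from dagster import AssetKey, MetadataValue, asset


@asset
def my_asset(context):
    # Retrieve the last materialization timestamp from the metadata recorded
    # on this asset's most recent materialization, if there is one
    last_materialization_timestamp = None
    event = context.instance.get_latest_materialization_event(AssetKey("my_asset"))
    if event is not None and event.asset_materialization is not None:
        entry = event.asset_materialization.metadata.get("last_materialization_timestamp")
        if entry is not None:
            last_materialization_timestamp = entry.value

    # Logic to check for new data since last_materialization_timestamp
    new_data = check_for_new_data_since(last_materialization_timestamp)
    if new_data:
        # Logic to update the asset with new data
        update_asset_with_new_data(new_data)
        last_materialization_timestamp = time.time()
    else:
        # No new data found, so skip the update
        context.log.info("No new data found. Skipping update.")

    # Record the timestamp as metadata. When no new data was found, the
    # previous timestamp is carried forward so the next run can still read it.
    if last_materialization_timestamp is not None:
        context.add_output_metadata(
            {"last_materialization_timestamp": MetadataValue.float(last_materialization_timestamp)}
        )
```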
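The two helpers called in the snippet are not defined there. As a minimal sketch of what they might look like, assuming the source is an in-memory list of records that each carry an `updated_at` timestamp (`SOURCE_ROWS`, `ASSET_STATE`, and the record shape are hypothetical and only for illustration):

```python
# Hypothetical source data: each record carries the time it was last changed.
SOURCE_ROWS = [
    {"id": 1, "updated_at": 100.0},
    {"id": 2, "updated_at": 200.0},
    {"id": 3, "updated_at": 300.0},
]

# Hypothetical stand-in for wherever the asset's contents are stored.
ASSET_STATE = {}


def check_for_new_data_since(last_timestamp):
    """Return the records changed after last_timestamp (all records if it is None)."""
    if last_timestamp is None:
        return list(SOURCE_ROWS)
    return [row for row in SOURCE_ROWS if row["updated_at"] > last_timestamp]


def update_asset_with_new_data(new_data):
    """Upsert the new records into the stored asset, keyed by id."""
    for row in new_data:
        ASSET_STATE[row["id"]] = row


first_run = check_for_new_data_since(None)   # no previous materialization
later_run = check_for_new_data_since(150.0)  # only rows changed after t=150
update_asset_with_new_data(later_run)
```

An empty list is falsy, so a run that finds nothing new takes the "skip" branch of the asset.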
In this example, `check_for_new_data_since` is a placeholder for your logic to check the source data, and `update_asset_with_new_data` is a placeholder for your logic to update the asset with the new data.
Please note that the actual implementation will depend on the specifics of your data sources, how you're tracking changes, and how you're storing and retrieving metadata. The example above is a high-level illustration and not a complete solution.
Terry Lines
02/22/2024, 7:39 PM
Scout
02/22/2024, 7:39 PM
Terry Lines
02/22/2024, 7:44 PM
Scout
02/22/2024, 7:44 PM
```python
from dagster import AssetKey, Output, asset


@asset
def my_asset(context):
    # Look up the most recent materialization event for this asset.
    # This is the event record (with its metadata), not the stored value;
    # it is None if the asset has never been materialized.
    latest_event = context.instance.get_latest_materialization_event(AssetKey("my_asset"))
    if latest_event is None:
        # Handle the case where this is the first execution
        context.log.info("No previous materialization found, starting from scratch")
    else:
        context.log.info(f"Found previous materialization from {latest_event.timestamp}")

    # ... Perform computation to update the asset using latest_event if available ...
    new_value = compute_new_value(latest_event)  # placeholder for your own logic

    # Return the new value and record it as metadata on the materialization
    return Output(new_value, metadata={"New Value": str(new_value)})
```
In this example, `get_latest_materialization_event` is used to retrieve the most recent materialization event of the asset. This is the event record and its metadata, not the stored value itself. If it's the first time the asset is being materialized, the lookup returns `None` and you handle this case accordingly. `compute_new_value` is a placeholder for your own update logic.
Please note that the actual implementation details will depend on your specific use case and how you manage state and dependencies in your Dagster assets.
Terry Lines
02/22/2024, 7:56 PM
Scout
02/22/2024, 7:56 PM
`graph_multi_asset` is a way to define multiple assets that are computed using the same graph of ops and the same upstream assets. Here's an example from the documentation that demonstrates how to define a `graph_multi_asset`:
```python
from dagster import AssetOut, graph_multi_asset


@graph_multi_asset(
    outs={
        "first_asset": AssetOut(),
        "second_asset": AssetOut(),
    }
)
def two_assets(upstream_asset):
    one, two = two_outputs(upstream_asset)
    return {"first_asset": one, "second_asset": two}
```
In this example, `two_assets` is a `graph_multi_asset` that takes an `upstream_asset` as input and produces two assets, `first_asset` and `second_asset`. `two_outputs` is not defined in the snippet; it is assumed to be an op with two outputs that takes the `upstream_asset` and returns two values, which are then mapped to the respective assets.
Please note that this is a high-level example, and the actual implementation of `two_outputs` and how `upstream_asset` is defined and used would depend on your specific use case. The `graph_multi_asset` decorator allows you to create a combined definition of multiple assets that are computed using the same graph logic.
Terry Lines
02/22/2024, 7:58 PM
Scout
02/22/2024, 7:58 PM
`graph_multi_asset` that updates assets by using their prior values. However, I can describe a conceptual approach based on the capabilities of Dagster's asset system.
To create a `graph_multi_asset` that updates assets using their prior values, you would need to:
1. Define a way to retrieve the previous materialization of each asset.
2. Use the retrieved values as inputs to your computation.
3. Output the new values as the updated assets.
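The three steps above can be sketched independently of Dagster. This is a minimal illustration that assumes prior values live in a plain dictionary; `STORE`, `load_previous`, and `update_both` are hypothetical names, and in a real pipeline the storage would be whatever your IO manager or database provides.

```python
# Hypothetical storage for the last value of each asset.
STORE = {}


def load_previous(name, default=0):
    """Step 1: retrieve the prior value, falling back to a default on the first run."""
    return STORE.get(name, default)


def update_both(increment):
    """Steps 2 and 3: compute new values from the prior ones and store them."""
    prev_one = load_previous("asset_one")
    prev_two = load_previous("asset_two")

    new_values = {
        "asset_one": prev_one + increment,      # running total
        "asset_two": max(prev_two, increment),  # running maximum
    }
    STORE.update(new_values)
    return new_values


first = update_both(5)   # first run: no prior values exist yet
second = update_both(3)  # second run: builds on the stored values
```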
Here's a conceptual example based on the capabilities described in the documentation:
```python
from dagster import AssetKey, AssetOut, OpExecutionContext, asset, graph_multi_asset, op


# Assume these are your asset definitions
@asset
def asset_one(context: OpExecutionContext):
    # Logic to retrieve and update the asset's value
    ...


@asset
def asset_two(context: OpExecutionContext):
    # Logic to retrieve and update the asset's value
    ...


# A graph body only wires ops together and receives no context, so the
# instance lookups have to happen inside ops.
@op
def update_asset_one(context: OpExecutionContext):
    # Retrieve the previous materialization event (None on the first run)
    prev_asset_one = context.instance.get_latest_materialization_event(AssetKey("asset_one"))
    # Logic to update the asset using its previous materialization
    return update_logic_for_asset_one(prev_asset_one)


@op
def update_asset_two(context: OpExecutionContext):
    prev_asset_two = context.instance.get_latest_materialization_event(AssetKey("asset_two"))
    return update_logic_for_asset_two(prev_asset_two)


# Define a graph that updates both assets
@graph_multi_asset(
    outs={
        "updated_asset_one": AssetOut(),
        "updated_asset_two": AssetOut(),
    }
)
def update_assets():
    # Return the op outputs as a dictionary mapping to the asset keys
    return {
        "updated_asset_one": update_asset_one(),
        "updated_asset_two": update_asset_two(),
    }


# Utility functions to perform the update logic
def update_logic_for_asset_one(prev_value):
    # Implement your logic here
    ...


def update_logic_for_asset_two(prev_value):
    # Implement your logic here
    ...
```
In this conceptual example, `update_assets` is a `graph_multi_asset` that updates two assets, `updated_asset_one` and `updated_asset_two`, by using previous materializations. The `update_logic_for_asset_one` and `update_logic_for_asset_two` functions contain the logic for updating the assets based on their previous values.
Please note that this is a high-level conceptual example and not a complete solution. The actual implementation would depend on how you store and retrieve asset materializations and the specific logic for updating the assets. Additionally, the lookup of the latest materialization and how you handle the absence of a previous materialization (for example, on the first run) would need to be implemented according to your specific requirements and the ca
Scout
02/22/2024, 7:59 PM