Jerry Bao
03/05/2024, 2:02 AM
Scout
03/05/2024, 2:02 AM
[...] `AutoMaterializePolicy` with specific rules that fit your requirements. In your case, you want to ensure that the asset is materialized only when its dependencies with the same partition key have been materialized, without waiting for other dependencies that don't share the same partition key. You can achieve this by applying the `skip_on_not_all_parents_updated` rule to your asset. This rule allows you to specify that an asset should only be materialized if all or at least one of the required parent partitions have been updated since the last materialization of the asset. The rule can be configured to require updates for all parent partitions or just one, based on the `require_update_for_all_parent_partitions` parameter. Here's an example of how you might configure this rule for your asset, as described in the documentation:
```python
from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset


@asset(
    auto_materialize_policy=AutoMaterializePolicy.eager().with_rules(
        AutoMaterializeRule.skip_on_not_all_parents_updated(
            require_update_for_all_parent_partitions=True
        )
    )
)
def my_asset():
    # Your asset code here
    ...
```
In this example, the `require_update_for_all_parent_partitions=True` parameter ensures that the asset will only be auto-materialized if all upstream partitions that share the same partition key have been materialized since the last materialization of this asset.
Please note that the `AutoMaterializePolicy` and its rules are part of the experimental API, which means that some of the APIs may change in the future. It's important to consult the latest Dagster documentation for any updates or changes to these APIs.
If you have assets with dynamic partition definitions, you should ensure that the rules you apply are compatible with dynamic partitioning. The documentation does not provide a specific example for dynamic partitions in this context, so if you encounter any issues or if the behavior does not match your expectations, you may need to explore further or reach out to the Dagster community for more tailored advice.
Jerry Bao
03/05/2024, 2:08 AM
[...] `require_update_for_all_parent_partitions` parameter ensures that all parent partitions are materialized before the child partition is materialized. It doesn't take into account partitions that share the same partition key.
Scout
03/05/2024, 2:08 AM
[...] `require_update_for_all_parent_partitions` parameter is used to control whether all upstream partitions need to be updated before the downstream asset is materialized. It does not filter based on whether the partitions share the same partition key.
To auto-materialize an asset once all the dependent asset partitions with the same partition key have been materialized, you can use the `materialize_on_parent_updated` rule with a custom filter. This rule allows you to specify that an asset should be materialized if one of its parents has been updated more recently than it has. You can provide a custom filter to determine which updated parents should trigger the materialization. Here's an example of how you might use this rule:
```python
from dagster import (
    AutoMaterializeAssetPartitionsFilter,
    AutoMaterializePolicy,
    AutoMaterializeRule,
    asset,
)


# Define a custom filter that checks if the parent partition key matches the current asset's partition key
def my_partition_filter(parent_partition_key, current_partition_key):
    return parent_partition_key == current_partition_key


@asset(
    auto_materialize_policy=AutoMaterializePolicy.eager().with_rules(
        AutoMaterializeRule.materialize_on_parent_updated(
            updated_parent_filter=AutoMaterializeAssetPartitionsFilter(my_partition_filter)
        )
    )
)
def my_asset():
    # Your asset code here
    ...
```
In this example, `my_partition_filter` is a custom function that you would define to check if the parent partition key matches the current asset's partition key. The `AutoMaterializeAssetPartitionsFilter` is used to apply this custom filter logic to the `materialize_on_parent_updated` rule.
Please note that the actual implementation of the custom filter function and its integration with the `AutoMaterializeRule` may vary based on the specifics of your use case and the current capabilities of the Dagster API. The provided example is a conceptual illustration, and you may need to adapt it to fit the actual API and your requirements.
If the built-in rules and filters do not provide the functionality you need, you may have to implement custom logic within your assets or use sensors to trigger materialization based on the specific conditions you have described. Always refer to the latest Dagster documentation for the most up-to-date information on how to implement these features.
Jerry Bao
03/05/2024, 2:09 AM
[...] `AutoMaterializeAssetPartitionsFilter` does not have a custom filter function as an input, only `latest_run_required_tags`.
Scout
03/05/2024, 2:09 AM
[...] `AutoMaterializeAssetPartitionsFilter` does not accept a custom filter function as an input. It only accepts `latest_run_required_tags`, which allows you to specify tags that must be present on the latest run of the parent asset for it to be considered updated.
Given this limitation, if you need to materialize an asset only when all dependent asset partitions with the same partition key have been materialized, and you want to ignore dependencies that do not share the same partition key, you might need to implement custom logic outside of the auto-materialization rules provided by Dagster.
One approach could be to use sensors to monitor the materialization of dependent assets and trigger the materialization of your asset when the conditions are met. Sensors can be programmed with custom logic to check for the existence of materializations with specific partition keys.
Here's a conceptual example of how you might set up a sensor to trigger materialization based on partition keys:
```python
from dagster import RunRequest, repository, sensor

# Assumption: my_asset_job is an asset job over the partitioned my_asset,
# defined elsewhere (for example with define_asset_job).


@sensor(job=my_asset_job)
def my_asset_sensor(context):
    # Logic to determine if all dependent assets with the same partition key are materialized
    # This could involve querying the asset catalog or materialization events
    partition_key = "2023-01-01"  # Example partition key
    all_deps_materialized = check_if_all_deps_materialized(partition_key)
    if all_deps_materialized:
        # run_key is a plain string; a request whose run_key was already used is skipped
        yield RunRequest(run_key=partition_key, partition_key=partition_key)


def check_if_all_deps_materialized(partition_key):
    # Custom logic to check if all dependencies are materialized for the given partition key
    # This might involve querying your metadata database or other persistent storage
    # where materialization information is kept
    return True  # Placeholder for actual implementation


@repository
def my_repo():
    return [my_asset_job, my_asset_sensor]
```
In this example, `my_asset_sensor` is a sensor that checks whether all dependencies with a specific partition key are materialized. If they are, it yields a `RunRequest` to materialize the `my_asset` job with the corresponding partition key. The `check_if_all_deps_materialized` function would contain the custom logic to determine if the dependencies are materialized, which is not shown here.
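One possible shape for that check is sketched below in plain Python, without touching Dagster at all. It assumes the partition keys each dependency defines, and the keys each has already materialized, were loaded beforehand into two dicts. The extra `partitions_by_dep` and `materialized_by_dep` arguments are hypothetical and exist only for this illustration. Dependencies that do not define the requested partition key are ignored, so they never block the result.

```python
from typing import Mapping, Set


def check_if_all_deps_materialized(
    partition_key: str,
    partitions_by_dep: Mapping[str, Set[str]],
    materialized_by_dep: Mapping[str, Set[str]],
) -> bool:
    """Return True when every dependency that defines `partition_key` has materialized it."""
    # Only dependencies that define this partition key are relevant;
    # dependencies partitioned on other keys are ignored.
    relevant = [dep for dep, keys in partitions_by_dep.items() if partition_key in keys]
    if not relevant:
        # No dependency shares this key, so there is nothing to trigger on.
        return False
    return all(partition_key in materialized_by_dep.get(dep, set()) for dep in relevant)


# Hypothetical data: "customers" has no 2023-01-02 partition at all.
partitions_by_dep = {
    "orders": {"2023-01-01", "2023-01-02"},
    "customers": {"2023-01-01"},
}
materialized_by_dep = {
    "orders": {"2023-01-01"},
    "customers": {"2023-01-01"},
}

print(check_if_all_deps_materialized("2023-01-01", partitions_by_dep, materialized_by_dep))  # True
print(check_if_all_deps_materialized("2023-01-02", partitions_by_dep, materialized_by_dep))  # False
```

The second call returns False only because `orders` has not produced `2023-01-02` yet. `customers` does not define that key, so it is not waited on.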
Please note that this is a simplified example, and the actual implementation would depend on your specific use case and how you track materializations of asset partitions. You would need to implement the `check_if_all_deps_materiali
Scout
03/05/2024, 2:10 AM
Jerry Bao
03/05/2024, 2:11 AM
Scout
03/05/2024, 2:11 AM
```python
from dagster import (
    AssetKey,
    DagsterEventType,
    DailyPartitionsDefinition,
    EventRecordsFilter,
    RunRequest,
    asset,
    define_asset_job,
    repository,
    sensor,
)

# Assumption: both assets share one daily partitioning scheme.
daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")


# Define your assets with explicit dependencies
@asset(partitions_def=daily_partitions)
def upstream_asset(context):
    # Your logic to produce the asset for context.partition_key
    pass


@asset(partitions_def=daily_partitions)
def downstream_asset(context, upstream_asset):
    # Your logic that depends on upstream_asset
    pass


# A sensor targets a job, so wrap the downstream asset in an asset job.
downstream_asset_job = define_asset_job("downstream_asset_job", selection="downstream_asset")


# Define a sensor that checks for materialization of dependent assets
@sensor(job=downstream_asset_job)
def downstream_asset_sensor(context):
    partition_key = "2023-01-01"  # Example partition key
    upstream_asset_key = AssetKey("upstream_asset")
    # Check if the upstream_asset with the given partition key has been materialized
    records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=upstream_asset_key,
            asset_partitions=[partition_key],
        ),
        limit=1,
    )
    if records:
        yield RunRequest(
            run_key=f"downstream_asset_{partition_key}",
            partition_key=partition_key,
        )


@repository
def my_repo():
    return [upstream_asset, downstream_asset, downstream_asset_job, downstream_asset_sensor]
```
In this example, `upstream_asset` and `downstream_asset` are defined with explicit dependencies, which will be reflected in the asset lineage. The `downstream_asset_sensor` monitors the materialization of `upstream_asset` and triggers the materialization of `downstream_asset` when the condition is met.
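The sensor above checks a single hard-coded partition key. As a rough sketch of how the choice of keys might be generalized, the helper below takes the partition keys already materialized upstream and downstream and returns the keys still worth requesting. Both inputs are plain collections here, how they are loaded from the instance is left out, and the function name `partitions_to_request` is hypothetical.

```python
from typing import Iterable, List


def partitions_to_request(
    upstream_materialized: Iterable[str],
    downstream_materialized: Iterable[str],
) -> List[str]:
    """Partition keys that exist upstream but have not been materialized downstream."""
    pending = set(upstream_materialized) - set(downstream_materialized)
    # Sort so that requests are produced in a stable, predictable order.
    return sorted(pending)


# Hypothetical state: three upstream partitions, one already processed downstream.
upstream_done = {"2023-01-01", "2023-01-02", "2023-01-03"}
downstream_done = {"2023-01-01"}

for key in partitions_to_request(upstream_done, downstream_done):
    print(key)
```

Inside a sensor, each returned key could become one `RunRequest` with a per-partition `run_key`, so that a key requested on an earlier tick is not launched again.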
Please note that this is a simplified exa
Scout
03/05/2024, 2:11 AM