#ask-ai

R Lucas

03/01/2024, 8:28 AM
How to report materialization events only for missing or failing partitions ?

Scout

03/01/2024, 8:28 AM
To report materialization events only for missing or failing partitions, you can combine Dagster's multi-asset sensors with the asset materialization APIs. Here's a step-by-step approach based on the documentation:

1. Identify Missing or Failing Partitions: Use the `MultiAssetSensorEvaluationContext` to fetch the latest materialization records for the assets you are monitoring. This context object provides methods like `latest_materialization_records_by_key()` and `latest_materialization_records_by_partition()` to get the most recent materialization events for assets or partitions. By examining these records, you can determine which partitions are missing. Materialization records only exist for successful materializations, so a partition whose runs have failed shows up here as one with no record (or no recent record).

2. Report Materialization Events: Once you have identified the missing or failing partitions, you can create `AssetMaterialization` events for them. These events can be reported through the `asset_events` list of the `SensorResult` that the sensor returns. This list can include materializations, observations, and asset check evaluations that the system will persist.

3. Use Asset Sensors: Define an asset sensor using the `@multi_asset_sensor` decorator to monitor the assets of interest. Within the sensor function, you can implement the logic to check for missing or failing partitions and report materialization events accordingly.

Here is an example code snippet from the documentation that demonstrates how to use an asset sensor to check for materializations and potentially skip execution based on the observed materializations:
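```python
from dagster import (
    AssetKey,
    RunRequest,
    SkipReason,
    define_asset_job,
    multi_asset_sensor,
)

# Placeholder job (not part of the original snippet); replace with your own job.
my_job = define_asset_job("my_job", selection=["asset_a", "asset_b"])


@multi_asset_sensor(
    monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
    job=my_job,
)
def asset_a_and_b_sensor_with_skip_reason(context):
    # Maps each monitored asset key to its latest unconsumed materialization
    # record, or None if there is none since the cursor.
    asset_events = context.latest_materialization_records_by_key()
    if all(asset_events.values()):
        # Every monitored asset has a new materialization: consume them and run.
        context.advance_all_cursors()
        return RunRequest()
    elif any(asset_events.values()):
        # Only some assets were materialized: explain which ones are missing.
        materialized_asset_key_strs = [
            key.to_user_string() for key, value in asset_events.items() if value
        ]
        not_materialized_asset_key_strs = [
            key.to_user_string() for key, value in asset_events.items() if not value
        ]
        return SkipReason(
            f"Observed materializations for {materialized_asset_key_strs}, "
            f"but not for {not_materialized_asset_key_strs}"
        )
```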
In this example, the sensor checks whether all monitored assets have been materialized. If they have, it advances the cursors and requests a run; if only some have, it returns a skip reason naming the assets that have not been materialized. The example works at the asset level rather than the partition level, so treat it as a starting point: the exact implementation details will vary with your use case and the structure of your assets and partitions, and you may need to adjust the logic to fit your requirements for reporting only on missing or failing partitions.
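The selection step described above (deciding which partitions count as missing or failing) can be sketched in plain Python, independent of Dagster. This is a minimal illustration under stated assumptions, not the documented approach: the function name `partitions_to_report` and the sample partition keys are hypothetical, and the three inputs stand in for values you would read from your partitions definition and the Dagster instance.

```python
from typing import Iterable, List, Set


def partitions_to_report(
    all_partitions: Iterable[str],
    materialized: Set[str],
    failed: Set[str],
) -> List[str]:
    """Return partitions that are missing or failed, keeping the input order."""
    selected = []
    for key in all_partitions:
        is_missing = key not in materialized  # never successfully materialized
        is_failed = key in failed  # latest attempt failed
        if is_missing or is_failed:
            selected.append(key)
    return selected


# Hypothetical sample data standing in for values read from Dagster.
all_partitions = ["2024-02-27", "2024-02-28", "2024-02-29", "2024-03-01"]
materialized = {"2024-02-27", "2024-02-28", "2024-02-29"}
failed = {"2024-02-28"}  # materialized earlier, but its latest run failed

to_report = partitions_to_report(all_partitions, materialized, failed)
print(to_report)  # ['2024-02-28', '2024-03-01']
```

Inside a sensor, each selected key could then presumably be turned into an `AssetMaterialization` with its `partition` set and returned in the `asset_events` list of a `SensorResult`, so healthy partitions are left untouched. Verify those calls against the Dagster version you are running.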