# ask-ai

Terry Lines

02/27/2024, 2:04 PM
how to get asset metadata from inside a schedule

Scout

02/27/2024, 2:04 PM
To access asset metadata from inside a schedule, you can use the context object that's provided to functions decorated with `@schedule`. The context object has an `instance` attribute that allows you to query the Dagster event log for asset materializations and their associated metadata. Here's an example based on the documentation:
```python
from dagster import schedule, AssetKey

@schedule(cron_schedule="0 0 * * *", job=my_job)
def my_schedule(context):
    instance = context.instance
    materialization_event = instance.get_latest_materialization_event(AssetKey(["my_asset"]))
    metadata = materialization_event.asset_materialization.metadata
    # You can now use the metadata as needed within the schedule
```
In this example, `my_job` would be the job that you have defined elsewhere in your code that materializes the asset. The `get_latest_materialization_event` method is used to retrieve the latest materialization event for the asset with the specified `AssetKey`. From this event, you can access the `metadata` attribute to get the metadata associated with the latest materialization of the asset.

Please note that this example assumes that you have a job defined that materializes the asset in question. The `AssetKey` should match the key of the asset for which you want to retrieve metadata. The `cron_schedule` specifies when the schedule should run, and `context.instance` provides access to the Dagster instance methods.

Keep in mind that the exact implementation details may vary depending on your specific use case and the version of Dagster you are using. Always refer to the latest documentation for the most up-to-date information and APIs.
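If the schedule should also handle the case where the asset has never been materialized, and then feed the metadata into the run it launches, a minimal sketch might look like the following. The `row_count` metadata key and the `my_op` op name are illustrative placeholders, and `my_job` is assumed to be defined elsewhere as above:

```python
from dagster import AssetKey, RunRequest, SkipReason, schedule

@schedule(cron_schedule="0 0 * * *", job=my_job)
def my_metadata_aware_schedule(context):
    event = context.instance.get_latest_materialization_event(AssetKey(["my_asset"]))
    if event is None:
        # Nothing has been materialized yet, so skip this tick instead of failing.
        return SkipReason("my_asset has not been materialized yet")

    metadata = event.asset_materialization.metadata
    # "row_count" is an illustrative metadata key; .value unwraps the MetadataValue.
    row_count = metadata["row_count"].value if "row_count" in metadata else None

    # Pass the value through to the launched run via run config ("my_op" is a placeholder op name).
    return RunRequest(
        run_key=None,
        run_config={"ops": {"my_op": {"config": {"row_count": row_count}}}},
    )
```

Returning a `SkipReason` when no materialization event exists avoids launching a run against missing data.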

Terry Lines

02/27/2024, 2:07 PM
@Scout what about a partitioned asset

Scout

02/27/2024, 2:07 PM
For partitioned assets, you can use the `get_latest_materialization_events` method to retrieve the latest materialization events for each partition of the asset. Here's an example of how you might do this within a schedule:
```python
from dagster import schedule, AssetKey

@schedule(cron_schedule="0 0 * * *", job=my_job)
def my_schedule(context):
    instance = context.instance
    asset_key = AssetKey(["my_partitioned_asset"])
    partitions = instance.get_partitions_for_asset(asset_key)

    for partition in partitions:
        materialization_event = instance.get_latest_materialization_event(asset_key, partition.name)
        if materialization_event:
            metadata = materialization_event.asset_materialization.metadata
            # You can now use the metadata as needed within the schedule
```
In this example, `my_job` is the job that materializes the partitioned asset. The `get_partitions_for_asset` method is used to retrieve all partitions for the asset with the specified `AssetKey`. Then, for each partition, `get_latest_materialization_event` is called to get the latest materialization event for that partition. From there, you can access the `metadata` attribute to get the metadata associated with the latest materialization of each partition.

Please ensure that you are using the correct methods and attributes as per the version of Dagster you are using, and consult the latest documentation for any updates or changes to the API.
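As an aside, if what you need is simply the list of partition keys, one option is to read them from the asset's `PartitionsDefinition` rather than querying the instance. This is only a sketch and assumes the asset uses a daily partitions definition that you can reference directly:

```python
from dagster import DailyPartitionsDefinition

# Assumed partitions definition for my_partitioned_asset (illustrative start date).
partitions_def = DailyPartitionsDefinition(start_date="2024-01-01")

# All partition keys defined so far, e.g. ["2024-01-01", "2024-01-02", ...]
partition_keys = partitions_def.get_partition_keys()
```

You could then iterate over these keys when looking up materializations for each partition.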

Terry Lines

02/27/2024, 2:11 PM
@Scout does get_latest_materialization_event support partitions

Scout

02/27/2024, 2:12 PM
As of the last update to my knowledge, the `get_latest_materialization_event` method does not directly support partitions. Instead, you should use the `latest_materialization_records_by_partition` method provided by the `MultiAssetSensorEvaluationContext` to get the latest materialization event for each partition of a partitioned asset. Here's an example of how you might use this within a sensor, which is similar to a schedule but responds to asset materializations rather than running on a fixed cadence:
```python
from dagster import multi_asset_sensor, AssetKey, RunRequest

@multi_asset_sensor(asset_keys=[AssetKey("my_partitioned_asset")], job=my_job)
def my_sensor(context):
    asset_key = AssetKey("my_partitioned_asset")
    # Mapping of partition key -> latest materialization record for that partition
    partition_records = context.latest_materialization_records_by_partition(asset_key)

    run_requests = []
    for partition, record in partition_records.items():
        if record:
            metadata = record.event_log_entry.asset_materialization.metadata
            # You can now use the metadata as needed within the sensor.
            # For example, you might decide to trigger a run based on the metadata
            # (some_condition_based_on_metadata is a placeholder predicate defined elsewhere).
            if some_condition_based_on_metadata(metadata):
                run_requests.append(RunRequest(run_key=partition, run_config={}))

    # Mark the fetched materializations as consumed so they are not re-processed.
    context.advance_all_cursors()
    return run_requests
```
In this example, `my_job` is the job that you want to trigger based on the materialization of the partitioned asset. The `latest_materialization_records_by_partition` method is used to get a mapping of partition keys to the latest materialization record for each partition.

Please note that this example is for a sensor, not a schedule. Schedules in Dagster are typically used for time-based job execution and do not have the same context capabilities as sensors to respond to asset materializations. If you need to access partitioned asset materializations within a schedule, you might need to use a different approach or consider whether a sensor would be more appropriate for your use case.

Always refer to the latest documentation for the most up-to-date information and APIs, as the capabilities and methods available in Dagster can evolve over time.
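If you do need to stay within a schedule, one possible approach is to query the event log directly with a partition filter. This is only a sketch: it assumes your Dagster version exposes `EventRecordsFilter` with an `asset_partitions` argument, and the asset name, partition key, op name, and `my_job` are illustrative, as in the earlier examples:

```python
from dagster import (
    AssetKey,
    DagsterEventType,
    EventRecordsFilter,
    RunRequest,
    SkipReason,
    schedule,
)

@schedule(cron_schedule="0 0 * * *", job=my_job)
def my_partition_metadata_schedule(context):
    # Fetch the most recent materialization record for one specific partition.
    records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey(["my_partitioned_asset"]),
            asset_partitions=["2024-02-26"],
        ),
        limit=1,
        ascending=False,  # newest first
    )
    if not records:
        return SkipReason("no materialization found for that partition")

    metadata = records[0].event_log_entry.asset_materialization.metadata
    # Use the metadata to decide whether and how to launch a run.
    return RunRequest(run_key=None, run_config={})
```

Because `ascending=False` returns the newest record first, the single record fetched here is the latest materialization for that partition.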