Rohin
05/15/2023, 4:07 PM
jamie
05/15/2023, 4:44 PM
Rohin
05/15/2023, 5:00 PM
jamie
05/15/2023, 6:43 PM
from dagster import AssetKey, op

@op
def materialization_event_times(context):
    # Timestamp of the most recent materialization event recorded for asset "foo"
    return context.instance.get_latest_materialization_event(asset_key=AssetKey("foo")).timestamp
to get the time when an asset was materialized. Checking whether an asset is currently being materialized is a bit more complicated and requires importing some non-public APIs:
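from dagster import AssetKey, DagsterEventType, EventRecordsFilter, op
# Non-public module: this import path may differ between Dagster versions
from dagster._core.storage.pipeline_run import IN_PROGRESS_RUN_STATUSES, RunsFilter

@op
def current_materializations(context):
    # Most recent "materialization planned" event for asset "foo"
    events = context.instance.get_event_records(
        event_records_filter=EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
            asset_key=AssetKey("foo"),
        ),
        limit=1,
    )
    if not events:
        # No materialization has ever been planned for this asset
        return False
    runs = context.instance.get_runs(
        filters=RunsFilter(
            run_ids=[events[0].event_log_entry.run_id],
            statuses=IN_PROGRESS_RUN_STATUSES,
        )
    )
    # get_runs returns a list; in_progress is True when the planned run is still executing
    in_progress = bool(runs)
    return in_progress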
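A small follow-up sketch for working with the value returned by the first op. It assumes the event's `.timestamp` is a float of Unix epoch seconds; the helper names `to_utc_datetime` and `materialization_age_seconds` are hypothetical and not part of Dagster.

```python
from datetime import datetime, timezone
from typing import Optional


def to_utc_datetime(event_timestamp: float) -> datetime:
    """Convert a Unix-epoch timestamp (assumed to be seconds) to an aware UTC datetime."""
    return datetime.fromtimestamp(event_timestamp, tz=timezone.utc)


def materialization_age_seconds(event_timestamp: float, now: Optional[float] = None) -> float:
    """Seconds elapsed since the materialization; `now` can be passed in for testing."""
    if now is None:
        now = datetime.now(timezone.utc).timestamp()
    return now - event_timestamp
```

Something like `materialization_age_seconds(ts) > 3600` could then serve as a rough staleness check, where `ts` is the timestamp read from the latest materialization event.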
Rohin
05/16/2023, 4:42 PM