Mykola Palamarchuk
03/30/2022, 7:39 PM
import datetime
from dagster import AssetKey, Out, Output, op

@op(
    out={
        'time': Out(),
        'data': Out(asset_key=AssetKey("my_snapshot"))
    }
)
def get_snapshot():
    # UTC timestamp label for this snapshot (%M = minutes, %S = seconds)
    now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H-%M-%S")
    yield Output(now, output_name="time")
    yield Output(get_remote_data(), output_name="data")
Is there a way to add now as a partition value for the my_snapshot asset?
And the second question. My job looks like this:
snapshot_timestamp, data = get_snapshot()
processed_data = process_snapshot(data)
store_data(snapshot_timestamp, processed_data) # does not return anything actually
prepare_diff(snapshot_timestamp)
I want the last step to start only after the third one is complete, but it does not depend on the third step's output explicitly, as it uses a different data access method. What would be the correct, unambiguous way to do that? Should I fake an output in the third op and depend on it?
owen
03/30/2022, 8:11 PM
'data': Out(asset_key=AssetKey("my_snapshot"), asset_partitions={datetime.datetime.utcnow().strftime("%Y-%m-%d %H-%M-%S")}). It's somewhat of a sketchy way of doing it, as it might have some weird behavior if you start this job right as the day changes (or maybe it would work fine). The "nicer" way of doing it would probably be to make the entire job partitioned and manually yield an AssetMaterialization event (instead of including the asset key on the out), with the job's partition as the asset's partition (and you could skip making the job partitioned if you don't care about backfills and the like).
owen
03/30/2022, 8:11 PM
Mykola Palamarchuk
03/31/2022, 5:48 AM
owen
03/31/2022, 11:11 PM
owen
03/31/2022, 11:12 PM
Mykola Palamarchuk
04/01/2022, 4:18 AM
owen
04/01/2022, 6:14 PM
context.instance.get_run_stats(some_run_id). This returns a PipelineRunStatsSnapshot, which will include the launch time of the run.