# ask-community
m
Hey team! I have an op like this:
@op(
    out={
        'time': Out(),
        'data': Out(asset_key=AssetKey("my_snapshot"))
    }
)
def get_snapshot():
    # %M is minutes and %S is seconds; "%i" is not a Python strftime directive
    now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H-%M-%S")
    yield Output(now, output_name="time")
    yield Output(get_remote_data(), output_name="data")
Is there a way to add `now` as a partition value for the `my_snapshot` asset? And the second question: my job looks like this:
snapshot_timestamp, data = get_snapshot()
processed_data = process_snapshot(data)
store_data(snapshot_timestamp, processed_data) # does not return anything actually
prepare_diff(snapshot_timestamp)
I want the last step to start only after the third one is complete, but it does not depend on its output explicitly, as it uses a different data access method. What would be the correct, unambiguous way to do that? Should I fake the output in the third op and depend on it?
o
hi @Mykola Palamarchuk! for question 1, there's an asset_partitions argument for Outs, so you could have `'data': Out(asset_key=AssetKey("my_snapshot"), asset_partitions={datetime.datetime.utcnow().strftime("%Y-%m-%d %H-%M-%S")})`. It's a somewhat sketchy way of doing it, as it might have some weird behavior if you start this job right as the day changes (or maybe it would work fine). The "nicer" way of doing it would probably be to make the entire job partitioned + manually yield an AssetMaterialization event (instead of including the asset key on the out) with the job's partition as the asset's partition (and you could skip making the job partitioned if you don't care about backfills and the like).
for question 2, sounds like you want order-based dependencies
m
Thanks for your answer @owen. I'm trying to find an elegant solution to a common pipeline case: collecting and processing data snapshots. Some observations from my side:
• the standard partitioned-jobs solution does not work well here, as a snapshot collection may be triggered by hand or by some sensor
• a timestamp as an op config parameter is a dangerous thing here, as it should correspond to the time when the snapshot was collected
• I'd like to be able to re-execute snapshot processing (with the k8s launcher), so I use an io_manager to store snapshots on S3. But that produces annoying boilerplate when you have to store two outputs (and one of them is just a timestamp)
I'm curious about the possibility of attaching the timestamp to the run context and using it from there as a reference.
o
@Mykola Palamarchuk ah I see -- thanks for the extra context. For your case, I'd recommend removing the asset_key parameter on your Out(), and instead just yielding an AssetMaterialization manually (where you can have full control over what the partition key will be).
can you say a bit more about the re-execution bit?
m
Hi @owen! I'm trying to adopt Dagster for our company's use cases, so I'm checking its features to see whether they are of any use. I was trying out the experimental Asset Lineage feature by attaching an AssetKey to the output. I can give up on that for the moment and get back to it later.
Now a few words about re-execution: the pipeline basically consists of 3 steps: grab the raw data snapshot (simple but time-consuming), process it, and load it into a data warehouse. I'd like to be able to restart the pipeline from the second or third step, as we may change the processing/loading algorithms. That means I have to store the results of the first and second steps in persistent storage (AWS S3 in my case). Everything is good except the "snapshot timestamp". I feel like it has to be integrated into the pipeline meta-information somehow (it is basically a "partition identifier" in my understanding).
Another interesting problem: using an io_manager to save data snapshots. Usually you stick to the run id for the name of the pickled object. But that doesn't make a lot of sense here, as the timestamp is a pretty good identifier by itself. I'm still considering the available options there. Not a big deal though. I can get back to it later, once I understand Dagster internals better.
o
gotcha -- to be transparent, we're moving away from that original asset lineage feature, in favor of Software-Defined Assets, but neither of these interfaces currently allows you to generate a partition key at runtime (they must be specified before the run launches for SDAs). This is something we hope to relax in the future for software-defined assets. I see what you mean now about timestamps/re-execution. For the other bit, one thing you can do to translate run_id to a timestamp of the run is to query your DagsterInstance for that information. That would look something like `context.instance.get_run_stats(some_run_id)`. This returns a PipelineRunStatsSnapshot, which will include the launch time of the run.