# ask-community

Guillaume Latour

04/17/2023, 2:59 PM
Hey, I started reading about Dagster last week and I am in the process of learning the tool, exploring the available features and writing a POC to demo to my team. My small use case:
1. Call an API (submitting some time-consuming jobs) that returns the ids of those jobs
   a. at some point those jobs are marked as "DONE" on the server
2. Download the data (multiple 2 GB JSON files)
3. Convert the data to Parquet

After reading the core concepts, I think I want:
• a partitioned software-defined asset, because the final dataset will grow indefinitely and I want to be able to schedule/backfill new/old runs,
• a graph asset, because some of the steps do not directly generate an asset (e.g. the API call for job submission),
• a sensor that watches for the jobs to be marked as DONE by the server.

So far I have a `TimeWindowPartitionsDefinition` and a `PartitionedParquetIOManager`. Now I am writing the logic to call the API, and I must admit that I am a bit lost: I find myself re-reading the documentation again and again without finding a way to implement this.
```python
from typing import List

import pandas as pd
from dagster import Out, graph_asset, op

@op(out={'report': Out(io_manager_key="parquet_io_manager")})
def do_stuff(ids: List[int]) -> pd.DataFrame:
    return pd.DataFrame({'dt_from': [], 'count': []})

@op
def report_submit(context) -> List[int]:
    start_partition, end_partition = context.asset_partitions_time_window_for_output()
    # call the api, process the response and extract the job ids
    return [1, 2, 3]

@graph_asset(partitions_def=six_hours_partitions)
def report() -> pd.DataFrame:
    ids = report_submit()
    return do_stuff(ids)
```
• I get an error saying that the `report_submit` op is not in a partition context. How could I solve that? I saw in the documentation that some non-asset-related ops should use a partition config object. Am I really not in a partition context already?
• I think I should use a `sensor` for step 1.a. But if I do so, I feel like I am stepping outside the asset...

owen

04/17/2023, 8:56 PM
hi @Guillaume Latour! This is a bit of a rough edge, but you're hitting this issue because you're calling `asset_partitions_time_window_for_output` in a context where `report_submit` does not have an output that produces an asset (the error message is pretty unclear here); it's just a helper op. Instead, you could call `context.partition_time_window`, which works the same way but does not care about the relationship between outputs and assets.

Guillaume Latour

04/18/2023, 5:14 AM
thank you very much for your answer, I guess I had missed this method, but I will definitely remember it! Now, concerning the design of the "waiting" step, what would be the Dagster way? I need the jobs to be finished before the pipeline continues, so the easy way to implement it would be to poll the status of the jobs in a loop before downloading the files. After reading about sensors, I am thinking this might be an application for them. But that sensor would have to be created on the fly, watching only the ids of the jobs generated in the `report_submit` op. Also, I want to be able to relaunch this step if something goes wrong during the download, and I am unsure a sensor would allow that. Do you have any leads on how I might do that?
🌈 1

owen

04/18/2023, 4:57 PM
The Dagster way in this case would just be the polling-style loop (inside the op/asset) that you're proposing. A sensor is functionally similar (it's just a function that gets called in a loop), but it lacks the orchestration capabilities of an op (e.g. retries).

Gustavo Carvalho

05/13/2023, 4:58 AM
Hi @owen! I am trying to do the same thing @Guillaume Latour did, i.e. use the information about the partition being materialized in one `op` of my `graph_asset`. I can make it work using `context.partition_time_window`, but that is not compatible with the Single-Run Backfills feature, which expects the asset to be defined in terms of `asset_partitions_time_window` or similar functions. Do you have a workaround for this case?

owen

05/15/2023, 4:04 PM
hi @Gustavo Carvalho! hm, I see what you mean -- would you mind creating a GitHub issue for this? In the meantime, one workaround might be to represent that op's output as an asset itself (so you'd have a graph multi-asset), but I get that that isn't an ideal solution.

Gustavo Carvalho

05/16/2023, 1:31 PM
Hm, the graph multi-asset can work. In my actual use case, the op output is the result of the Extract step of an ETL process, so having it as an asset could actually be a good side effect.