# ask-community

Peter Davidson

12/19/2022, 8:25 PM
I've been scouring the docs but still not sure on the right pattern for my workflow. Feels like a mish-mash of partitioned assets and ops. Workflow:
• Step 1 -> download some data, transform, store. These could be monthly partitioned assets.
• Step 2 -> perform calculations using the asset data:
  ◦ This would be a configured job, for a specific partition
  ◦ Several other parameters are input to the calculations
  ◦ Usually several different scenarios run for a given month, using different parameters.
It feels like Step 1 would fit nicely as assets with monthly partitions, but in Step 2 the job with ops needs to be able to access the values from the Step 1 assets, for a specific partition. Does this design make sense?
from dagster import MonthlyPartitionsDefinition, asset, graph, op

@asset(partitions_def=MonthlyPartitionsDefinition(start_date='2018-02-01'))
def sales_data():
    """Refreshed automatically every month."""
    ...

@op(config_schema={'discount_rate': float, 'rep_date': str})
def sales_with_discount(context):
    # config arrives via context.op_config
    # need to get sales data for a specific month here:
    ...

@graph
def graph_sales_discounted():
    return sales_with_discount()

# This job might be run many times with different discount rates
job_discounts = graph_sales_discounted.to_job(
    config={'ops': {'sales_with_discount': {'config': {'discount_rate': 0.3, 'rep_date': '2022-09-30'}}}}
)

Adam Ward

12/20/2022, 1:30 PM
I'm looking at doing something similar and, though I haven't gotten here yet, I plan to pass asset information into the Metadata and retrieve that from the upstream context. Have you explored whether you can pass the information you need that way?
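Something like this is what I had in mind (untested sketch; the storage_path key and the op body are made up, and the exact metadata attribute names vary a bit across Dagster versions):

from dagster import AssetKey, MetadataValue, asset, op

@asset
def sales_data(context):
    df = ...  # download + transform
    # record where/how the materialization was stored as metadata
    context.add_output_metadata({"storage_path": MetadataValue.path("/data/sales.parquet")})
    return df

@op
def sales_with_discount(context):
    # read the metadata back off the latest materialization event
    event = context.instance.get_latest_materialization_event(AssetKey("sales_data"))
    metadata = event.asset_materialization.metadata  # metadata_entries on older versions
    ...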

Peter Davidson

12/20/2022, 1:38 PM
So far I haven't found a way to expose asset values to ops. The more I read / search through old posts, the more it seems discouraged. Surprising to me, as this feels like a typical use case for parameterized models:
• set up input data
• perform some calculations using the input data and a set of parameters (which may change for different scenarios)
There is probably a workaround to load files if you know where specific asset materializations are stored, but this would be hacky and I don't think you'd maintain lineage (graph) between the input data and the calculations.
Just trying to put an asset into an op yields this kind of error:
dagster._check.CheckError: Invariant failed. Description: All leaf nodes within graph 'graph_with_asset' must generate outputs which are mapped to outputs of the graph, and produce assets. The following leaf node(s) are non-asset producing ops: {'concat_only'}. This behavior is not currently supported because these ops are not required for the creation of the associated asset(s).
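The least-hacky workaround I can see is re-loading a stored value through the IO manager from outside a run, e.g. with load_asset_value; rough sketch, assuming it accepts a partition_key (untested):

from dagster import AssetKey, Definitions

defs = Definitions(assets=[sales_data], jobs=[job_discounts])

# outside a run (script, sensor, notebook): re-load a specific partition's
# stored value via the same IO manager that wrote it, keeping lineage-aware storage
sales_sep = defs.load_asset_value(AssetKey("sales_data"), partition_key="2022-09-01")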

Adam Ward

12/20/2022, 2:00 PM
Hmm - lemme see if I can get a test running. I'll check back in shortly.
What version of Dagster are you using?

Peter Davidson

12/20/2022, 2:01 PM
1.1.7
👍🏻 1
The only other idea I had was to have downstream assets (instead of ops) which are partitioned by a parameter set id as well as the MonthlyPartitionsDefinition of the upstream source data. But I'm not sure it works like that...
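Roughly this shape (sketch only; MultiPartitionsDefinition was still experimental around 1.1.7, and the scenario ids are made up):

from dagster import (
    MonthlyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)

scenario_month = MultiPartitionsDefinition({
    "month": MonthlyPartitionsDefinition(start_date="2018-02-01"),
    "scenario": StaticPartitionsDefinition(["base", "discount_10", "discount_30"]),
})

@asset(partitions_def=scenario_month)
def discounted_sales(context):
    # each materialization covers one (month, scenario) pair;
    # wiring this to the monthly sales_data upstream would still need a partition mapping
    dims = context.partition_key.keys_by_dimension
    month, scenario = dims["month"], dims["scenario"]
    ...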

Adam Ward

12/20/2022, 3:01 PM
That's kinda what I was exploring. Assets are such a difficult way of thinking of these things for me. I can't tell if Dagster intends assets to be thought of as objects or instances (though I've seen them used both ways). 😕
I guess the object approach is done with multi-assets, but still.

Peter Davidson

12/20/2022, 3:04 PM
I think, objects. If assets were instances then each instance of an asset would be treated separately. But, as an experiment, I ran two different dagit instances locally (different terminal sessions & ports) and materialized the same asset but with a different parameter. The different instances didn't know they weren't the same asset, i.e. instance A showed as re-materialized after instance B was materialized. This was using a common DB for run storage.

Adam Ward

12/20/2022, 3:06 PM
Oh that's interesting. I have seen folks define assets with a one-to-one relationship to DB tables too and that seems more instance-like, at least in practice.
...and extremely tedious. lol

Peter Davidson

12/23/2022, 8:14 AM
Hey, so I got something working with a custom IO manager. Minimal POC is in my playground repo, job "downstream_job".
• Upstream assets are multi-partitioned, monthly + static
• Downstream assets are not partitioned
For a downstream asset job I define an upstream partition key override, which gets picked up by the IO manager when loading input assets. Then the downstream knows to pick a specific partition of the upstream assets. I wanted downstream to have a different static partition but that seems not supported in the config mapping; not sure if I can work around this. https://github.com/pdavidsonFIA/dagster-poc/blob/9006f49d004c2313cba01647f670fd1c867018fc/dagster_poc/resources/__init__.py#L93
I did borrow from @Sean Lopp’s snow report for the asset factory, but didn't quite understand his logic in terms of selecting specific partitions: https://github.com/slopp/snowreport/blob/main/snowreport/assets/report_raw_json.py#L42
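Paraphrased shape of the POC io_manager (not the exact repo code; the upstream_partition_key config name is illustrative):

from dagster import InputContext, IOManager, OutputContext, io_manager

class PartitionOverrideIOManager(IOManager):
    """Load inputs from a caller-chosen upstream partition instead of the
    run's own partition."""

    def __init__(self, upstream_partition_key: str):
        self._upstream_partition_key = upstream_partition_key

    def handle_output(self, context: OutputContext, obj):
        ...  # write obj keyed by asset key + partition as usual

    def load_input(self, context: InputContext):
        # ignore the run's partition; load the overridden upstream partition
        key = self._upstream_partition_key
        ...  # read the upstream asset's value stored under `key`

@io_manager(config_schema={"upstream_partition_key": str})
def partition_override_io_manager(init_context):
    return PartitionOverrideIOManager(init_context.resource_config["upstream_partition_key"])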
this was the error when trying to have downstream assets partitioned:
UserWarning: Error loading repository location dagster_poc: dagster._check.CheckError: Invariant failed. Description: Can't supply a ConfigMapping for 'config' when 'partitions_def' is supplied.
fyi @Terry Lines

Terry Lines

02/14/2023, 6:02 PM
@Peter Davidson Thanks for this. I hadn't looked at it until now. In my instance I am using Dagster's predefined UPathIOManager which has handling of partitioned assets built-in. I mention this because it's possibly a good template for your io_manager, albeit it loads everything and then you choose from the dictionary
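A minimal subclass looks roughly like this (sketch; pickle serialization chosen arbitrarily):

import pickle

from upath import UPath

from dagster import InputContext, OutputContext, UPathIOManager

class PickleUPathIOManager(UPathIOManager):
    extension = ".pkl"

    def dump_to_path(self, context: OutputContext, obj, path: UPath):
        with path.open("wb") as f:
            pickle.dump(obj, f)

    def load_from_path(self, context: InputContext, path: UPath):
        with path.open("rb") as f:
            return pickle.load(f)

# When a downstream asset depends on several partitions of an upstream asset,
# UPathIOManager's load_input returns {partition_key: value}, i.e. it loads
# everything and you then choose from the dictionary.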