ron burgundy
02/28/2024, 10:12 PM
@asset
def refresh_item(item):
    ...  # some code

@asset
def refresh_catalogue():
    catalogue: list[MyItem] = load_catalogue()
    for item in catalogue:
        refresh_item(item)
I've not been able to find any usable examples of this online. The one I found uses pipelines, which appear to be deprecated.
How would I do this in dagster?
Zach
02/28/2024, 10:15 PM
ron burgundy
02/28/2024, 10:48 PM
graph-asset or graph-multi-asset. I tried replacing @job with them but the job wouldn't run. This is what I ended up with:
from dagster import DynamicOut, DynamicOutput, job, op

@op
def refresh_item(item):
    ...  # some code

# Fan out: emit one dynamic output per catalogue entry
@op(out=DynamicOut())
def span_jobs():
    catalogue = load_catalogue()
    for i, specs in enumerate(catalogue):
        yield DynamicOutput(specs, mapping_key=str(i))

@job
def refresh_catalogue():
    jobs = span_jobs()
    _ = jobs.map(refresh_item)
Zach
02/28/2024, 10:51 PM
ron burgundy
02/28/2024, 10:57 PM
ron burgundy
02/28/2024, 11:01 PM
@op
def refresh_item(item):
    ...  # some code

@op(out=DynamicOut())
def span_jobs():
    catalogue = load_catalogue()
    for i, specs in enumerate(catalogue):
        yield DynamicOutput(specs, mapping_key=str(i))

@graph_asset
def refresh_catalogue():
    jobs = span_jobs()
    _ = jobs.map(refresh_item)

defs = Definitions(
    assets=[refresh_catalogue],
)
ron burgundy
02/28/2024, 11:04 PM
@graph_asset with @job and modify definitions as follows:
defs = Definitions(
    jobs=[refresh_catalogue]
)
I can see it in the UI for jobs, but not assets
ron burgundy
02/28/2024, 11:17 PM
@graph_asset to work no matter what I do
ron burgundy
02/28/2024, 11:21 PM
@graph_asset decorator on, it's triggering all kinds of assert failures inside my library. Like, why is dagster calling my code on init?
Zach
02/28/2024, 11:24 PM
@graph_asset
def refresh_catalogue():
    jobs = span_jobs()
    _ = jobs.map(refresh_item)
? What are the errors you're getting
Zach
02/28/2024, 11:24 PM
refresh_catalogue object?
Zach
02/28/2024, 11:25 PM
@graph, @job, or @graph_asset on init (not the underlying ops though), as those are indicators to dagster to compile a job or graph object out of the function. This can have unintended effects if you're doing vanilla python calls inside one of those definitions
ron burgundy
02/28/2024, 11:32 PM
jobs.map(refresh_item).collect()
dagster._core.errors.DagsterInvalidDefinitionError: @graph 'refresh_catalogue' returned problematic value of type <class 'dagster._core.definitions.composition.DynamicFanIn'>. Expected return value from invoked node or dict mapping output name to return values from invoked nodes
Zach
02/28/2024, 11:32 PM
Zach
02/28/2024, 11:33 PM
Zach
02/28/2024, 11:34 PM
ron burgundy
02/28/2024, 11:35 PM
@op and return an empty dataframe:
import pandas as pd

@op
def collector(stub):
    # The asset needs a value to materialize; an empty frame is a placeholder
    return pd.DataFrame()

@graph_asset
def refresh_catalogue():
    jobs = span_jobs()
    stub = jobs.map(refresh_item).collect()
    return collector(stub)
ron burgundy
02/28/2024, 11:37 PM
ron burgundy
02/28/2024, 11:38 PM
Zach
02/28/2024, 11:39 PM
ron burgundy
02/28/2024, 11:39 PM