# ask-community
r
Hi all, I'm new to Dagster and I'm experimenting with it for my application. I have a periodic refresh job that needs to refresh multiple items, and the graph has to be built at runtime. Basically, I need to spawn multiple sub-jobs from a loop, so that I can tell whether a particular item's refresh failed. Something like this, but it doesn't work:
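```python
from dagster import asset

@asset
def refresh_item(item):
    ...  # some code

@asset
def refresh_catalogue():
    catalogue: list[MyItem] = load_catalogue()

    # Intended: spawn one tracked sub-job per item (does not work as written)
    for item in catalogue:
        refresh_item(item)
```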
I've not been able to find any usable examples of this online; the one I found uses `pipelines`, which appear to be deprecated. How would I do this in Dagster?
z
Graph assets + dynamic graphs OR partitions, depending on whether the list you're fanning out over grows and shrinks or not
r
Thanks for the links! The dynamic graph link worked like a charm 😄. I followed the example on the page, but I'm not sure how I should include `@graph_asset` or `@graph_multi_asset`. I tried replacing `@job` with them, but the job wouldn't run. This is what I ended up with:
```python
from dagster import DynamicOut, DynamicOutput, job, op

@op
def refresh_item(item):
    ...  # some code

@op(out=DynamicOut())
def span_jobs():
    catalogue = load_catalogue()

    # Emit one dynamic output per item; each fans out to its own step
    for i, specs in enumerate(catalogue):
        yield DynamicOutput(specs, mapping_key=str(i))

@job
def refresh_catalogue():
    jobs = span_jobs()
    _ = jobs.map(refresh_item)
```
z
If you want it to be an asset, you can use `@graph_asset` and then either just materialize the asset or turn it into an asset job, which runs much like an op-based job.
r
Sorry, what do you mean by materialize the asset?
I tried updating the code to use asset definitions, but it doesn't appear to work:
```python
from dagster import Definitions, DynamicOut, DynamicOutput, graph_asset, op

@op
def refresh_item(item):
    ...  # some code

@op(out=DynamicOut())
def span_jobs():
    catalogue = load_catalogue()

    for i, specs in enumerate(catalogue):
        yield DynamicOutput(specs, mapping_key=str(i))

@graph_asset
def refresh_catalogue():
    jobs = span_jobs()
    # Note: nothing is returned from this graph asset
    _ = jobs.map(refresh_item)

defs = Definitions(
    assets=[refresh_catalogue],
)
```
If I replace `@graph_asset` with `@job` and modify the definitions as follows:
```python
defs = Definitions(
    jobs=[refresh_catalogue],
)
```
I can see it in the UI under jobs, but not under assets.
Yeah, I can't seem to get `@graph_asset` to work no matter what I do.
I don't know what this library is doing... when I put the `@graph_asset` decorator on, it triggers all kinds of assertion failures inside my library. Why is Dagster calling my code on init?
z
Is your graph asset just this?
```python
@graph_asset
def refresh_catalogue():
    jobs = span_jobs()
    _ = jobs.map(refresh_item)
```
What errors are you getting, and where are you using the `refresh_catalogue` object?
Dagster executes the body of anything decorated with `@graph`, `@job`, or `@graph_asset` when the definitions load (but not the underlying ops), because those decorators tell Dagster to compile a job or graph object out of the function. This can have unintended effects if you're making plain Python calls inside one of those definitions.
r
Here's the error I'm getting when I return `jobs.map(refresh_item).collect()`:
```
dagster._core.errors.DagsterInvalidDefinitionError: @graph 'refresh_catalogue' returned problematic value of type <class 'dagster._core.definitions.composition.DynamicFanIn'>. Expected return value from invoked node or dict mapping output name to return values from invoked nodes
```
z
Ah yeah, that's one oddity about graph assets: because it's an asset, you have to return something from the graph, namely the output of an invoked op.
Dagster tries to detect errors like this, which would be problematic at execution time, early during load.
Reading between the lines of your examples, it seems that you don't really care whether it's modeled as an asset, so you can just use `@graph` or `@job` with dynamic graphs if you don't want to figure out the graph asset thing.
r
OK, I got it to work after reading https://github.com/dagster-io/dagster/issues/15664, but it's like 🤦‍♂️. I have to create a dummy `@op` function that returns an empty DataFrame:
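```python
import pandas as pd
from dagster import graph_asset, op

@op
def collector(stub):
    # Dummy op: a graph asset must return an op's output
    return pd.DataFrame()

@graph_asset
def refresh_catalogue():
    jobs = span_jobs()
    stub = jobs.map(refresh_item).collect()
    return collector(stub)
```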
Yeah, it's not super important that it's an asset; it would just be nice to have the visualization that comes with assets.
But it looks like it doesn't show the individual sub-tasks anyway, even though it appears under the Assets tab.
z
Maybe a bit pedantic, but it doesn't really have to be a fake DataFrame; you could return an empty dict or something like that too. But yeah, it's kind of weird having to return something if you didn't plan on relying on IO managers.
r
Makes sense. Anyway, thanks for your help