https://dagster.io/ logo
#ask-community
Title
# ask-community
m

Maksym Domariev

07/29/2022, 5:04 AM
Hi, please help me with confusion... https://docs.dagster.io/concepts/ops-jobs-graphs/dynamic-graphs I'm in a graph section in the docs, reading about graph, and yet , every code example is about job Does Dynamic Graph supports @graph annotation or I should work with job?
o

owen

07/29/2022, 11:59 PM
hi @Maksym Domariev! A job is just a graph with resources defined. If you're not using resources, then for the most part,
Copy code
@graph
def my_graph():
    # some code
will function identically to
Copy code
@job
def my_job():
    # some code
So dynamic outputs will work with the @graph annotation in the same way as in the @job examples
m

Maksym Domariev

07/30/2022, 12:05 AM
thanks, I think you have a bug This doesn't work : @op(required_resource_keys={"druid_db_client"}, out=DynamicOut()) def load_threads(context, timeline): yield some_heavy_request(timeline) @graph(out={"pages": GraphOut()}) def load_threads_graph(timeline): return load_threads(timeline).collect() @job(config=my_offset_partitioned_config, resource_defs=resource_defs) def executute_timeseries_query(): load_threads_graph(get_query_timeframe())
but this works : @job(config=my_offset_partitioned_config, resource_defs=resource_defs) def executute_timeseries_query_job(): query_threads(load_threads(get_query_timeframe()).collect())
o

owen

07/30/2022, 12:08 AM
if you replace
Copy code
@graph(out={"pages": GraphOut()})
def load_threads_graph(timeline):
    return load_threads(timeline).collect()
with
Copy code
@graph
def load_threads_graph(timeline):
    return load_threads(timeline).collect()
I think your original example would work (see thread below for an explanation)
m

Maksym Domariev

07/30/2022, 5:31 AM
@graph
def load_threads_graph(timeline):
return load_threads(timeline).collect()
gives me the same error
o

owen

08/01/2022, 9:48 PM
@Maksym Domariev Ah I see what you're talking about now, sorry about the confusion. Right now, it seems like we don't allow the return value of a graph (or job) to be the the result of a
.collect()
call. I filed an issue here: https://github.com/dagster-io/dagster/issues/9139. If you're not doing anything with the output once you call collect(), there's actually no need to make the call (collect() just will bundle all the outputs into a list that you can pass into a downstream op), so you could consider just not returning anything from your graph, i.e.
Copy code
@graph
def load_threads_graph(timeline):
    load_threads(timeline)
this is definitely confusing and unexpected behavior
anyway, let me know if you run into issues
2 Views