#ask-community

Chris Anderson

12/07/2022, 12:16 AM
Is there a way to call .collect() on a dynamic op that's been turned into an asset, from within another asset that has it as a dependency? For example, I have
@op(out=DynamicOut())
def observations(context) -> DynamicOutput[gpd.GeoDataFrame]:
    # yield observations in chunks
This op later gets turned into an asset with its own group via AssetsDefinition.from_op, and that asset is used across many jobs. In some jobs this dynamic behavior is beneficial and sought after, but in others I'd like to collect all the information before proceeding, like below
@asset(
    ins={'observations': AssetIn('observations')},
    outs={'collective_analysis': AssetOut()}
)
def collective_analysis(context, observations):
    collected_obs = observations.collect()
    # do analysis with all observations collected into one dataframe
Running this code right now throws the following error and tries to split the collective_analysis asset materialization dynamically according to the original dynamic op:
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "collective_analysis"::AttributeError: 'GeoDataFrame' object has no attribute 'collect'

owen

12/07/2022, 12:33 AM
hi @Chris Anderson! I'm actually slightly surprised that DynamicOutputs work with AssetsDefinition.from_op (at the very least, I would expect the default io_manager implementation to write each dynamic output for that asset to the same location, which would cause issues for downstream assets loading those outputs, as they would continually be overwritten). Do you have a custom IOManager set up to handle this? Or perhaps my priors are wrong on this. Regardless, the general pattern we recommend for combining assets + dynamic outputs is more along the lines of limiting the dynamic nature of the asset to the body of a single graph. So you could do something like:
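@graph(out={"other_asset": GraphOut(), "collective_analysis": GraphOut()})
def observations_and_other_things():
    obs = observations()
    analysis = collective_analysis(obs.collect())
    # run something_parallel once per dynamic chunk, then fan the results back in
    other_thing = combine_outputs(obs.map(something_parallel).collect())
    # multiple graph outputs are returned as a dict keyed by output name
    return {"other_asset": other_thing, "collective_analysis": analysis}

AssetsDefinition.from_graph(observations_and_other_things)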
It's possible that you could return obs directly from that graph as a third asset output, but this isn't supported behavior, and I'm not confident at all that it wouldn't break.

Chris Anderson

12/07/2022, 12:52 AM
Your first comment made me dig deeper. What I'm doing is taking an existing job that previously handled an expensive computation and porting it over to use dynamic outputs, so the entire pipeline already exists. It seems that what you explained is happening: the same chunk is being passed to all copies of the remainder of the pipeline. So it "works", just not in the expected way, which is a small wrench in my plans 😬. I may end up going forward with your example and restructuring things in that case, so thank you for the help, I appreciate it.
Are there any examples of having a dynamic graph return chunks of pandas dataframes? I'm currently stuck on rewriting a graph into one with an error of
dagster._core.errors.DagsterInvalidDefinitionError: Invalid dependencies: for node "observations" input "observations", the DagsterType "GeoDataFrame" does not support fanning in (MultiDependencyDefinition). Use the List type, since fanning in will result in a list.

owen

12/07/2022, 4:42 PM
I believe you just need to annotate the type of the thing that consumes observations as List[GeoDataFrame] instead of GeoDataFrame.