Chris Anderson
12/07/2022, 12:16 AM
`.collect()` and a dynamic op that's been turned into an asset, from within another asset that has it as a dependency? For example, I have:
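```python
import geopandas as gpd
from dagster import DynamicOut, DynamicOutput, op

@op(out=DynamicOut())
def observations(context) -> DynamicOutput[gpd.GeoDataFrame]:
    # yield observations in chunks, one DynamicOutput per chunk
    ...
```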
This op later gets turned into an asset, with its own group, via `AssetsDefinition.from_op`, and that asset is used across many jobs. In some jobs this dynamic behavior is beneficial and sought after, but in others I'd like to collect all the observations before proceeding, like below:
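```python
from dagster import AssetIn, AssetOut, multi_asset

@multi_asset(
    ins={"observations": AssetIn("observations")},
    outs={"collective_analysis": AssetOut()},
)
def collective_analysis(context, observations):
    # raises AttributeError: observations arrives here as a single GeoDataFrame chunk
    collected_obs = observations.collect()
    # do analysis with all observations collected into one dataframe
```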
Running this code right now throws the following error, and Dagster tries to split the `collective_analysis` asset materialization dynamically according to the original dynamic op:
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "collective_analysis"::AttributeError: 'GeoDataFrame' object has no attribute 'collect'
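As a rough plain-Python analogy (not Dagster's API; `run_mapped`, `run_collected` and the data are hypothetical): a step that consumes a dynamic output without collecting it runs once per chunk and only ever sees one chunk, which is consistent with `observations` arriving as a lone `GeoDataFrame` that has no `.collect()` attribute. `.collect()` lives on the output handle while the graph is being composed, and turns the per-chunk calls into a single call over a list.

```python
from typing import Callable, Dict, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_mapped(chunks: Dict[str, T], fn: Callable[[T], R]) -> Dict[str, R]:
    # Like a step downstream of a dynamic output: fn runs once per mapping key
    # and only ever sees that one chunk
    return {key: fn(chunk) for key, chunk in chunks.items()}


def run_collected(chunks: Dict[str, T], fn: Callable[[List[T]], R]) -> R:
    # Like a step fed by .collect(): fn runs once and sees a list of every chunk
    return fn(list(chunks.values()))


# Hypothetical chunks keyed by mapping key (lists stand in for GeoDataFrames)
chunks = {"0": [1, 2], "1": [3], "2": [4, 5, 6]}
per_chunk = run_mapped(chunks, len)
total = run_collected(chunks, lambda parts: sum(len(p) for p in parts))
```

Here `per_chunk` holds one result per mapping key, while `total` is computed once from all chunks together.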
owen
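12/07/2022, 12:33 AM
```python
from dagster import AssetsDefinition, GraphOut, graph

# observations, collective_analysis, something_parallel and combine_outputs are ops defined elsewhere
@graph(out={"other_asset": GraphOut(), "collective_analysis": GraphOut()})
def observations_and_other_things():
    obs = observations()
    # fan in: run the analysis once, over every chunk
    analysis = collective_analysis(obs.collect())
    # fan out per chunk with .map, then fan back in
    other_thing = combine_outputs(obs.map(something_parallel).collect())
    return {"other_asset": other_thing, "collective_analysis": analysis}

observations_and_other_things_asset = AssetsDefinition.from_graph(observations_and_other_things)
```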
It's possible that you could return `obs` directly from that graph as a third asset output, but this isn't supported behavior, and I'm not confident at all that it wouldn't break.
Chris Anderson
12/07/2022, 12:52 AM
dagster._core.errors.DagsterInvalidDefinitionError: Invalid dependencies: for node "observations" input "observations", the DagsterType "GeoDataFrame" does not support fanning in (MultiDependencyDefinition). Use the List type, since fanning in will result in a list.
owen
12/07/2022, 4:42 PM
`List[GeoDataFrame]` instead of `GeoDataFrame`