Aaron T
03/22/2023, 3:44 AM
`TimeWindowPartitionMapping` for a `graph_asset`? Getting an [Errno 2] No such file or directory: 'dagster_home/storage/my_report_graph/{date}. Looking at the storage too, I can see the run of `my_report_graph` by the guid, unlike a regular asset, which is named.
@graph_asset(partitions_def=daily_partitions_def)
def my_report_graph():
    ...
    return df

@asset(
    partitions_def=daily_partitions_def,
    ins={"my_report_graph": AssetIn(partition_mapping=TimeWindowPartitionMapping())},
)
def my_time_window_asset():
    ...
    # do work
sandy
03/22/2023, 3:52 PM
Aaron T
03/22/2023, 4:22 PM
@graph_asset(partitions_def=daily_partitions_def)
def my_report_graph():
    def _for_each_switch(switch, logicom_token: LogicomToken):
        report_info = logicom_data.generate_logicom_report_op(logicom_token, switch)
        report_id = logicom_data.get_report_id_op(report_info)
        zip_filename = logicom_data.download_logicom_report_op(logicom_token, report_id)
        # filenames = [zip_filename]
        df = logicom_data.extract_csv_report_op(zip_filename)
        df = logicom_data.clean_data_op(df)
        return df

    logicom_token = logicom_data.get_token_op()
    switches = logicom_data.get_switches_op()
    results = switches.map(lambda switch: _for_each_switch(switch, logicom_token))
    return results
sandy
03/22/2023, 4:50 PM
it could stem from a dynamic output op?
ah - this is likely the issue. you'd need to do a `collect` first
Aaron T
03/22/2023, 4:52 PM
`collect` before `map`?
sandy
03/22/2023, 4:53 PM
after `map` - i.e. a `@graph_asset` can't return a dynamic output, so you need to `collect` it down to a non-dynamic output before returning.
this likely requires adding another op to your `graph_asset` that takes all the mapped outputs as input and returns a single non-mapped output.
Aaron T
03/22/2023, 4:55 PM
`return collect_op(results.collect())`? If I try returning `results.collect()` I get an invalid def error with a problematic return
sandy
03/22/2023, 4:56 PM
Aaron T
03/22/2023, 5:35 PM
`results.collect()` is producing a None type class. I basically just added an op that takes `dfs:list[pd.DataFrame]` and did `return collect_op(results.collect())` so, that must mean my `_for_each_switch` loop doesn't have the proper return?
DagsterTypeCheckDidNotPass: Type check failed for step input "dfs" - expected type "[DataFrame]". Description: Value of type <class 'NoneType'> failed type check for Dagster type DataFrame, expected value to be of Python type pandas.core.frame.DataFrame
sandy
03/22/2023, 6:01 PM
`my_time_window_asset`, it could be that `collect_op` isn't returning a DataFrame?
sandy
03/22/2023, 6:02 PM
`collect_op`, it could be that `_for_each_switch` isn't returning a DataFrame
Aaron T
03/22/2023, 6:03 PM
`_for_each_switch()` - https://dagster.slack.com/archives/C01U954MEER/p1679502133951639?thread_ts=1679456656.523949&cid=C01U954MEER
Aaron T
03/23/2023, 12:50 AM