#ask-community

Aaron T

03/22/2023, 3:44 AM
Also, is there any way to have a `TimeWindowPartitionMapping` for a `graph_asset`? Getting an `[Errno 2] No such file or directory: 'dagster_home/storage/my_report_graph/{date}`. Looking at the storage too, I can see the run of `my_report_graph` by the guid, unlike a regular asset, which is named.
@graph_asset(partitions_def=daily_partitions_def)
def my_report_graph():
    ...
    return df

@asset(
    partitions_def=daily_partitions_def,
    ins={ "my_report_graph" : AssetIn(partition_mapping=TimeWindowPartitionMapping()) }
)
def my_time_window_asset():
    ...
    # do work

sandy

03/22/2023, 3:52 PM
I would expect this to work. Would you be able to provide a full code snippet that triggers the unexpected behavior? Is the above enough to do it?

Aaron T

03/22/2023, 4:22 PM
@sandy sure. Could it stem from a dynamic output op? Maybe I have a bad return value? Here is the full `graph_asset`:
@graph_asset(partitions_def=daily_partitions_def)
def my_report_graph():
    def _for_each_switch(switch, logicom_token:LogicomToken):
        report_info = logicom_data.generate_logicom_report_op(logicom_token, switch)
        report_id = logicom_data.get_report_id_op(report_info)
        zip_filename = logicom_data.download_logicom_report_op(logicom_token, report_id)
        # filenames = [zip_filename]
        df = logicom_data.extract_csv_report_op(zip_filename)
        df = logicom_data.clean_data_op(df)
        return df

    logicom_token = logicom_data.get_token_op()
    switches = logicom_data.get_switches_op()
    results = switches.map(lambda switch: _for_each_switch(switch, logicom_token))
    return results

sandy

03/22/2023, 4:50 PM
it could stem from a dynamic output op?
Ah, this is likely the issue. You'd need to do a `collect` first.

Aaron T

03/22/2023, 4:52 PM
`collect` before `map`?

sandy

03/22/2023, 4:53 PM
After `map`, i.e. a `@graph_asset` can't return a dynamic output, so you need to `collect` it down to a non-dynamic output before returning. This likely requires adding another op to your `graph_asset` that takes all the mapped outputs as input and returns a single non-mapped output.

Aaron T

03/22/2023, 4:55 PM
I see, so `return collect_op(results.collect())`? If I try returning `results.collect()` directly, I get an invalid definition error about a problematic return.

sandy

03/22/2023, 4:56 PM
Exactly.

Aaron T

03/22/2023, 5:35 PM
My `results.collect()` is producing a `NoneType` value. I basically just added an op that takes `dfs: list[pd.DataFrame]` and did `return collect_op(results.collect())`, so that must mean my `_for_each_switch` loop doesn't have the proper return?
DagsterTypeCheckDidNotPass: Type check failed for step input "dfs" - expected type "[DataFrame]". Description: Value of type <class 'NoneType'> failed type check for Dagster type DataFrame, expected value to be of Python type pandas.core.frame.DataFrame

sandy

03/22/2023, 6:01 PM
What step is that error occurring in? If it's the step for `my_time_window_asset`, it could be that `collect_op` isn't returning a DataFrame.
If it's the step for `collect_op`, it could be that `_for_each_switch` isn't returning a DataFrame.

Aaron T

03/22/2023, 6:03 PM
The latter is more likely. Here is how I am returning my supposed DataFrame from `_for_each_switch()`: https://dagster.slack.com/archives/C01U954MEER/p1679502133951639?thread_ts=1679456656.523949&cid=C01U954MEER
In the end, I persisted, and found a missing return statement from an op... I appreciate the time you took today in order to help me. Thank you.