Alex Hunsberger
09/04/2023, 8:26 AM@asset(partitions_def=HourlyPartitionsDefinition(start_date="2023-09-01-00:00"))
def my_hourly_asset(context):
dt_format_str = "%Y-%m-%d-%H:%M"
dt_range = context.partition_key_range
cur_dt = datetime.strptime(dt_range.start, dt_format_str)
end_dt = datetime.strptime(dt_range.end, dt_format_str)
while cur_dt <= end_dt:
<http://context.log.info|context.log.info>(f"Processing {cur_dt}")
partition = cur_dt.strftime(dt_format_str)
yield AssetMaterialization(
asset_key=f"my_hourly_asset-{partition}",
metadata={"parition.name": partition},
partition=partition)
cur_dt = cur_dt + timedelta(hours=1)
yield Output(None)
This raises an exception like the following when evaluating the Output:
File "/usr/local/lib/python3.7/site-packages/dagster/_core/storage/upath_io_manager.py", line 439, in handle_output
assert len(paths) == 1
But if I don't include the Output, or if I try simply returning a value or values, I get the following exception:
dagster._core.errors.dagsterstepoutputnotfounderror: core compute for op 'my_hourly_asset' did not return an output for non-optional output "result"
I've also tried to yield or return something like a string or a list of strings in the final Output, which also fails with the same error.
What is the correct way to do this in Dagster?Alex Hunsberger
09/04/2023, 3:03 PMjamie
09/05/2023, 2:05 PMAlex Hunsberger
09/06/2023, 1:53 AM