# ask-community
a
Hi all, I'm trying to use the asset time-based partition feature, and running into a few issues. I want to:
• Be able to run partition materialization either as a single job (over a time range) or multiple jobs (each with a single date time), when launched from the UI
• Store the materialized assets in an IO manager (I assume in different paths per partition)
• Be able to tell Dagster which partitions materialized successfully or failed (by yielding something? events? or returning a complete list of the results that somehow maps to the partitions? and I would expect the result to be reflected in the UI in terms of which partitions show up as the color red or green for failed vs success)
Here's what I have so far:
from datetime import datetime, timedelta
from dagster import AssetMaterialization, HourlyPartitionsDefinition, Output, asset

@asset(partitions_def=HourlyPartitionsDefinition(start_date="2023-09-01-00:00"))
def my_hourly_asset(context):
    dt_format_str = "%Y-%m-%d-%H:%M"
    # Range of partition keys selected for this run (start == end for a single partition)
    dt_range = context.partition_key_range
    cur_dt = datetime.strptime(dt_range.start, dt_format_str)
    end_dt = datetime.strptime(dt_range.end, dt_format_str)
    # Walk the range one hour at a time, emitting one materialization per partition
    while cur_dt <= end_dt:
        context.log.info(f"Processing {cur_dt}")
        partition = cur_dt.strftime(dt_format_str)
        yield AssetMaterialization(
            asset_key=f"my_hourly_asset-{partition}",
            metadata={"partition.name": partition},
            partition=partition)
        cur_dt = cur_dt + timedelta(hours=1)
    yield Output(None)
This raises an exception like the following when evaluating the Output:
File "/usr/local/lib/python3.7/site-packages/dagster/_core/storage/upath_io_manager.py", line 439, in handle_output
    assert len(paths) == 1
But if I don't include the Output, or if I try simply returning a value or values, I get the following exception:
dagster._core.errors.DagsterStepOutputNotFoundError: core compute for op 'my_hourly_asset' did not return an output for non-optional output "result"
I've also tried to yield or return something like a string or a list of strings in the final Output, which also fails with the same error. What is the correct way to do this in Dagster?
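For reference, the `assert len(paths) == 1` failure looks consistent with a range run covering several partition keys while the IO manager expects exactly one output path. That reading is an assumption based on the traceback, not something confirmed in this thread. A minimal plain-Python sketch of how many hourly keys a range spans, using the same key format as the asset above (`hourly_keys` is a hypothetical helper, not a Dagster API):

```python
from datetime import datetime, timedelta
from typing import List

DT_FORMAT = "%Y-%m-%d-%H:%M"  # same key format as the asset above


def hourly_keys(start_key: str, end_key: str) -> List[str]:
    """Hypothetical helper: list every hourly key from start_key to end_key, inclusive."""
    cur = datetime.strptime(start_key, DT_FORMAT)
    end = datetime.strptime(end_key, DT_FORMAT)
    keys = []
    while cur <= end:
        keys.append(cur.strftime(DT_FORMAT))
        cur += timedelta(hours=1)
    return keys


# A four-hour range yields four keys, so more than one partition (and
# presumably more than one storage path) is involved in a single run.
print(hourly_keys("2023-09-01-00:00", "2023-09-01-03:00"))
```

When the start and end keys are equal, the list has a single entry, which matches the single-partition case that runs without errors.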
After some searching, my understanding is that much of this functionality is not implemented yet. Specifically, returning outputs for each partition may come with https://github.com/dagster-io/dagster/pull/14621, and that only covers one specific IO manager. It also doesn't solve the issue of reporting some successes and some failures. So at this point I think my path forward is to only allow one run per partition: if the user presses the button in the UI to run across a date range in a single run, I will raise a NotImplementedError in my asset. I hope this can be implemented generically across IO managers sometime soon, because the existence of a UI button to run multiple partitions in a single run implies this functionality comes standard.
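The single-partition guard described above could be sketched roughly as follows. This is only an illustration: `require_single_partition` is a hypothetical helper name, and it works on plain strings so it can be tried without Dagster installed.

```python
def require_single_partition(start_key: str, end_key: str) -> str:
    """Hypothetical guard: return the one partition key, or refuse a multi-partition range."""
    if start_key != end_key:
        raise NotImplementedError(
            f"Multi-partition runs are not supported yet (got {start_key} to {end_key}); "
            "launch one run per partition instead."
        )
    return start_key


# Inside the asset, the call would presumably mirror the code earlier in the thread:
#   key_range = context.partition_key_range
#   partition = require_single_partition(key_range.start, key_range.end)
print(require_single_partition("2023-09-01-00:00", "2023-09-01-00:00"))
```

Raising early keeps a range run from partially processing partitions before it hits the IO manager assertion.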
j
Hey @Alex Hunsberger - just want to check in: are you still having issues when you only run a single partition? I'm checking on the status of that linked PR to see if we can get it moving along again. Until then, I just want to make sure you aren't having any other issues.
a
Hi @jamie, thanks for checking in. As long as I only run a single partition at a time, I don't have any issues.