#ask-community

Muhammad Jarir Kanji

07/03/2023, 7:13 PM
How would I access the path for the latest materialization of an upstream asset from within the op of a downstream asset? The use case here is an ETL pipeline that moves data to a warehouse by first staging it in S3 (the upstream asset) and then `COPY`ing it into the warehouse.
@asset(io_manager_def=MyUPathIOManager(base_path="s3://my-bucket/"))
def s3_asset(some_resource) -> pd.DataFrame:
    ...

@asset(non_argument_deps={"s3_asset"})
def warehouse_table() -> None:

    s3_path = ... # THIS IS WHAT I NEED HELP WITH

    warehouse_client.execute(f"COPY some_table from {s3_path}")
I suppose if I already know how the `MyUPathIOManager` constructs its paths, I could recreate the logic in `warehouse_table()`, but that doesn't really work for an arbitrary IO Manager or for switching between different IO managers because my path creation logic is dependent on the choice of the upstream IO manager.

jamie

07/03/2023, 7:19 PM
hey @Muhammad Jarir Kanji this is a bit of a tricky situation, and you may need to weigh some trade-offs to get the result you're looking for. There isn't any interface or other mechanism that requires IO managers to factor out their path creation logic, but a lot of IO managers do factor it out into a separate function. If you have a specific set of IO managers that you might use, you can check whether they all factor the path logic into a function with the same name, in which case you could call that function in the downstream asset. Otherwise, you may want to look into not using IO managers at all. Instead, in `s3_asset` you would manually store the data, and then in `warehouse_table` you would be able to recreate the path, since you were the one who made it in the first place.
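A minimal sketch of the "no IO manager" option described above, assuming a hypothetical `staging_path` helper, bucket name, and file layout (none of these come from Dagster or from this thread). Both assets would call the same helper, so the downstream step never has to guess how the path was built:

```python
from datetime import date

# Hypothetical bucket and layout; swap in whatever convention your pipeline uses.
BUCKET = "s3://my-bucket"


def staging_path(asset_name: str, partition: date) -> str:
    """Single source of truth for where a staged file lives in S3."""
    return f"{BUCKET}/staging/{asset_name}/{partition.isoformat()}.csv"


def copy_statement(table: str, s3_path: str) -> str:
    """Build the warehouse load statement from an already-known path."""
    return f"COPY {table} FROM '{s3_path}'"


# The upstream asset would write its file to staging_path(...) itself,
# and the downstream asset would call the same helper to find it.
path = staging_path("s3_asset", date(2023, 7, 3))
print(copy_statement("some_table", path))
```

The trade-off is that the upstream asset takes over the storage work the IO manager would otherwise do, in exchange for a path that both sides can compute.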

Adam Bloom

07/03/2023, 8:12 PM
We have a similar use case and follow a pattern similar to what Jamie mentioned: we don't rely on IO managers. Furthermore, we only have one asset (the warehouse step), and the S3 file placement is just an intermediate part of that op. We always want them batched together and don't have a use case for the S3 file outside of the warehouse, so there's no need for it to also be an asset.
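A rough sketch of that single-step shape, with the S3 upload and the warehouse client passed in as plain callables. The `stage_and_copy` name, its parameters, and the stand-in callables are all hypothetical; in a real pipeline this body would live inside the one warehouse asset:

```python
from typing import Callable, List


def stage_and_copy(
    rows: List[str],
    s3_path: str,
    table: str,
    upload: Callable[[str, List[str]], None],
    execute: Callable[[str], None],
) -> str:
    """Stage rows at s3_path, then load them into the warehouse in one step."""
    upload(s3_path, rows)  # intermediate step, not tracked as its own asset
    execute(f"COPY {table} FROM '{s3_path}'")
    return s3_path


# Stand-ins that record calls so the sketch runs without S3 or a warehouse.
uploaded = {}
statements = []
stage_and_copy(
    rows=["a,1", "b,2"],
    s3_path="s3://my-bucket/staging/some_table.csv",
    table="some_table",
    upload=lambda path, data: uploaded.update({path: data}),
    execute=statements.append,
)
```

Because the path is chosen and consumed inside the same function, there is nothing to look up from a previous materialization.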

Muhammad Jarir Kanji

07/04/2023, 4:25 PM
@jamie and @Adam Bloom Thank you for the suggestions! Given that this is likely to be a common pattern, are there currently any plans to make materialization metadata (such as the path) available within downstream ops? For my part, I was able to put together the following, very 'hacky' solution based on this GitHub discussion around the output metadata access:
from dagster import AssetExecutionContext, AssetKey, DagsterEventType, EventRecordsFilter, asset

def get_last_materialization_path(context: AssetExecutionContext, asset_key: AssetKey):
    events = context.instance.event_log_storage.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=asset_key,
            asset_partitions=None,
        ),
        limit=1,
        ascending=False,
    )

    if len(events) == 0:
        raise Exception(f"No asset materialization events found for {asset_key}")
    else:
        last_materialization = events[0]
        run_id = last_materialization.event_log_entry.run_id
        context.log.info(
            f"Found previous materialization of {asset_key} in run {run_id}"
        )

    materialization_path = (
        last_materialization
        .event_log_entry
        .dagster_event
        .event_specific_data
        .materialization
        .metadata['path']
        .value
    )

    return materialization_path

@asset(non_argument_deps={"s3_asset"})
def warehouse_table(context):
    upstream_asset_key = context.asset_key_for_input("s3_asset")
    s3_path = get_last_materialization_path(context, upstream_asset_key)
 
    ... # logic for COPYing data into warehouse

jamie

07/05/2023, 2:09 PM
for accessing materialization metadata downstream, yes that is on our roadmap