Muhammad Jarir Kanji
07/03/2023, 7:13 PM

@asset(io_manager_def=MyUPathIOManager(base_path="s3://my-bucket/"))
def s3_asset(some_resource) -> pd.DataFrame:
    ...


@asset(non_argument_deps={"s3_asset"})
def warehouse_table() -> None:
    s3_path = ...  # THIS IS WHAT I NEED HELP WITH
    warehouse_client.execute(f"COPY some_table FROM {s3_path}")
I suppose if I already know how the MyUPathIOManager constructs its paths, I could recreate that logic in warehouse_table(), but that doesn't really work for an arbitrary IO manager, or for switching between different IO managers, because my path-creation logic would then depend on the choice of the upstream IO manager.

jamie
07/03/2023, 7:19 PM
In s3_asset you would manually store the data, and then in warehouse_table you would be able to recreate the path, since you were the one who made it in the first place.

Adam Bloom
07/03/2023, 8:12 PM

Muhammad Jarir Kanji
07/04/2023, 4:25 PM

from dagster import (
    AssetExecutionContext,
    AssetKey,
    DagsterEventType,
    EventRecordsFilter,
    asset,
)


def get_last_materialization_path(context: AssetExecutionContext, asset_key: AssetKey):
    events = context.instance.event_log_storage.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=asset_key,
            asset_partitions=None,
        ),
        limit=1,
        ascending=False,
    )
    if len(events) == 0:
        raise Exception(f"No asset materialization events found for {asset_key}")
    else:
        last_materialization = events[0]
        run_id = last_materialization.event_log_entry.run_id
        context.log.info(
            f"Found previous materialization of {asset_key} in run {run_id}"
        )
        materialization_path = (
            last_materialization
            .event_log_entry
            .dagster_event
            .event_specific_data
            .materialization
            .metadata["path"]
            .value
        )
        return materialization_path


@asset(non_argument_deps={"s3_asset"})
def warehouse_table(context):
    upstream_asset_key = context.asset_key_for_input("s3_asset")
    s3_path = get_last_materialization_path(context, upstream_asset_key)
    ...  # logic for COPYing data into warehouse
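The long attribute chain in that snippet works only if the upstream IO manager attached a "path" metadata entry to the materialization event. As a dependency-free sanity check of the unwrapping itself, here is a sketch using `SimpleNamespace` stand-ins; every class-like name below is a stand-in mirroring the shape of Dagster's event records, not Dagster's actual API:

```python
from types import SimpleNamespace

# Stand-ins mimicking the shape of the Dagster event record unwrapped above.
def make_record(run_id: str, path: str):
    metadata_entry = SimpleNamespace(value=path)          # like a PathMetadataValue
    materialization = SimpleNamespace(metadata={"path": metadata_entry})
    event_specific_data = SimpleNamespace(materialization=materialization)
    dagster_event = SimpleNamespace(event_specific_data=event_specific_data)
    entry = SimpleNamespace(run_id=run_id, dagster_event=dagster_event)
    return SimpleNamespace(event_log_entry=entry)

def last_materialization_path(records):
    # Same unwrapping as get_last_materialization_path, minus Dagster itself.
    if not records:
        raise Exception("No asset materialization events found")
    rec = records[0]
    return (
        rec.event_log_entry.dagster_event
        .event_specific_data.materialization
        .metadata["path"].value
    )
```

If the chain ever breaks (e.g. the IO manager stops emitting a "path" entry), the `metadata["path"]` lookup is the piece that raises a `KeyError`, so that is the natural place to add a defensive check.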
jamie
07/05/2023, 2:09 PM
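jamie's earlier suggestion (skip the IO manager, store the data yourself, and rebuild the same path deterministically downstream) can be sketched without Dagster at all. `BUCKET`, `s3_key_for`, and the parquet key layout below are all hypothetical; the point is a single path-building function shared by both assets:

```python
BUCKET = "my-bucket"  # hypothetical bucket name

def s3_key_for(asset_name: str) -> str:
    # Single source of truth for path construction, shared by both assets.
    return f"s3://{BUCKET}/assets/{asset_name}.parquet"

def store_s3_asset(df) -> str:
    # What s3_asset would do instead of returning a DataFrame to an IO manager.
    path = s3_key_for("s3_asset")
    # ... write df to `path` with boto3 / s3fs here ...
    return path

def warehouse_copy_statement() -> str:
    # What warehouse_table would do: rebuild the path, not look it up.
    path = s3_key_for("s3_asset")
    return f"COPY some_table FROM '{path}'"
```

The trade-off versus the event-log approach above is that this couples both assets to one hand-rolled path rule, but it needs no event-log queries and no assumptions about which metadata entries an IO manager emits.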