#ask-community

Rubén Briones

03/22/2023, 9:11 AM
Is there any way to access the asset materialization output inside an asset_sensor? I have this code that lets me access the metadata, but I don't know how to access the output itself (the dataframe returned by the asset):
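```python
import pandas as pd
from dagster import (
    AssetKey,
    EventLogEntry,
    Output,
    SensorEvaluationContext,
    asset,
    asset_sensor,
)


@asset
def foo():
    return Output(
        pd.DataFrame({'data1': ['A', 'B'], 'data2': ['AA', 'BB']}),
        metadata={'meta1': 1, 'meta2': 2}
    )


# my_job is defined elsewhere
@asset_sensor(asset_key=AssetKey("foo"), job=my_job)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    # The materialization event carries the metadata, but not the output value
    asset_materialization = asset_event.dagster_event.event_specific_data.materialization
    metadata = {entry.label: entry.entry_data for entry in asset_materialization.metadata_entries}
    ...
```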
I want to return a RunRequest for each row in the dataframe returned by the foo asset.

jamie

03/22/2023, 3:00 PM
hey @Rubén Briones accessing the output value of an asset in a sensor isn’t a use case i’ve seen before, and in general i don’t think it’s something we recommend/support. What are you trying to accomplish? there might be another way to achieve the same thing

Rubén Briones

03/22/2023, 4:16 PM
Hi Jamie, each day we receive a pd.DataFrame with a list of items (rows are items, and columns are their characteristics), and we have modeled that as an asset. Now we want to run an asset/job/op that, for each row of the dataframe, checks whether the item exists in our database and, if not, creates it (inserts a new row in the database's items table).
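The per-row "check, then insert if missing" step described above could look roughly like the sketch below. It is only an illustration: the `items` table, its `item_id` and `name` columns, the `insert_missing_items` helper, and the use of an in-memory SQLite database are all assumptions, not details from the thread. With pandas, a list of row dicts like `rows` can be obtained from a dataframe via `df.to_dict("records")`.

```python
import sqlite3


def insert_missing_items(conn, rows):
    """Insert each row whose item_id is not yet in the (hypothetical) items table.

    Returns the list of item_ids that were newly created.
    """
    created = []
    for row in rows:
        exists = conn.execute(
            "SELECT 1 FROM items WHERE item_id = ?", (row["item_id"],)
        ).fetchone()
        if exists is None:
            conn.execute(
                "INSERT INTO items (item_id, name) VALUES (?, ?)",
                (row["item_id"], row["name"]),
            )
            created.append(row["item_id"])
    conn.commit()
    return created


# Hypothetical setup: an in-memory database that already contains item "A".
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (item_id TEXT PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO items VALUES ('A', 'existing item')")

# Stand-in for the daily dataframe, one dict per row.
rows = [{"item_id": "A", "name": "AA"}, {"item_id": "B", "name": "BB"}]
created = insert_missing_items(conn, rows)
```

Because the whole dataframe can be handled in one loop like this, the work may not need one run per row at all.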

Vinnie

03/22/2023, 4:18 PM
Not to hijack but this seems like a textbook use case for dynamic partitions. I’d probably model each row as a dynamic partition and then you can kick off separate runs

jamie

03/22/2023, 4:49 PM
Ok based on that, here’s my idea for what you might do:
1. Have a daily partitioned asset for the dataframe you receive each day. If you need to do work to fetch the dataframe, you could do that in the asset itself, but if the dataframe just “appears” in some external storage then you could model this as an observable source asset instead.
2. Have a downstream asset (also daily partitioned) that depends on the asset from 1. This asset will do the analysis, create DB entries, etc.
3. Use the asset reconciliation sensor to monitor both of these assets. Any time the asset from 1 is updated, it will automatically kick off a run of the asset from 2.

Relevant docs:
https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions
https://docs.dagster.io/concepts/partitions-schedules-sensors/asset-sensors#asset-reconciliation-sensors
https://docs.dagster.io/concepts/assets/asset-observations#observable-source-assets