# ask-ai
j
Hi, I'm trying to figure out how to do something like this using a sensor, but consuming the filename from an asset, rather than an op. How do I pass the filename to asset1? I couldn't find the answer in the docs.
@asset
def asset1(context):
   filename = context.op_config["filename"]
   # materialize with the filename

@asset
def asset2(asset1):
   # materialize 2nd asset based on asset1

asset_job = define_asset_job("asset_job", "*")


@sensor(job=asset_job)
def materializes_asset_sensor():
    yield RunRequest(
        run_key=filename,
        run_config={
            "ops": {"import_file": {"config": {"filename": filename}}}
        },
    )
s
In the example you've provided, you're looking to create a sensor that triggers a job based on a filename that is associated with an asset. However, the code snippet seems to be mixing concepts from ops and assets, which are different abstractions in Dagster.

To clarify, assets in Dagster represent a unit of data that your system produces or consumes, and they can be materialized by jobs or ops. Sensors, on the other hand, are a way to trigger jobs based on external events, such as the materialization of an asset.

If you want to trigger a job based on the materialization of an asset and pass the filename from that materialization to another asset, you would typically use an asset sensor. An asset sensor checks for new AssetMaterialization events for a particular asset key and can be used to kick off a job that computes downstream assets.

Here's a simplified example based on the documentation that demonstrates how to define an asset sensor that triggers a job and passes the filename from an asset materialization:
from dagster import AssetKey, EventLogEntry, RunRequest, SensorEvaluationContext, asset_sensor

@asset_sensor(asset_key=AssetKey("my_table"), job=my_job)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    assert asset_event.dagster_event and asset_event.dagster_event.asset_key
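    # Assumes the upstream materialization logged a "filename" metadata entry;
    # on recent Dagster versions, metadata is a dict of MetadataValue objects.
    filename = asset_event.dagster_event.event_specific_data.materialization.metadata["filename"].value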
    yield RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                "import_file": {
                    "config": {"filename": filename}
                }
            }
        },
    )
In this example, my_asset_sensor is an asset sensor that listens for materializations of the asset with key my_table. When a materialization occurs, it extracts the filename from the metadata of the materialization event and uses it to configure the import_file op within the job my_job. If my_job is an asset job, the key under "ops" must be the name of the asset that reads the config (asset1 in your snippet) rather than import_file.

Please note that the actual implementation will depend on how you've structured your assets and jobs, and how you're storing the filename in the materialization metadata. The example above assumes that the filename is stored in the metadata of the AssetMaterialization event under the key "filename".

If you need to test your asset sensor, the documentation also provides guidance on constructing a context object using a test instance that can log AssetMaterialization events, and then directly invoking your asset sensor with that context to evaluate the results.

Remember to replace my_job with the actual job you want to trigger and adjust the metadata access according to how you store the filename in the materialization event.
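To make the shape of the run config concrete, here is a minimal sketch in plain Python, with no Dagster import, of the dictionary a sensor would hand to an asset job. The helper name build_asset_run_config and the sample filename are hypothetical, used only for illustration. The sketch assumes the asset reads its config through context.op_config, as asset1 does in the question.

```python
from typing import Any, Dict


def build_asset_run_config(asset_name: str, filename: str) -> Dict[str, Any]:
    """Hypothetical helper: build run config targeting a single asset's config."""
    # In an asset job, per-asset config sits under "ops", keyed by the asset's
    # name (not by the name of an unrelated op such as "import_file").
    return {"ops": {asset_name: {"config": {"filename": filename}}}}


# Example: config that asset1 would read via context.op_config["filename"].
run_config = build_asset_run_config("asset1", "2024-01-01.csv")
print(run_config)
```

Inside the sensor, this dictionary would be passed along as RunRequest(run_key=filename, run_config=run_config). For asset1 to accept it, the asset would likely also need to declare the field, for example with @asset(config_schema={"filename": str}).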
j
How do I pass a value from a sensor to an asset job?