# ask-ai
t
I have an `asset_sensor` that watches for materializations of `asset_a`, and should then trigger `asset_b` to run via a job. `asset_a` is a dbt asset. `asset_b` is a SQL query that should query the same partition as was used in `asset_a`. How can I pass partition information from `asset_a` to the sensor, and then to the job that runs `asset_b`?
s
To pass partition information from `asset_a` to `asset_b` through an `asset_sensor`, you can follow these steps:

1. **Materialize `asset_a` with partition information**: Ensure that when `asset_a` is materialized, it includes partition information. This can be done by specifying the partition key when you materialize the asset (see the partitioned-asset sketch after the code block below).
2. **Create an `asset_sensor` for `asset_a`**: Define an `asset_sensor` that watches for the materialization of `asset_a`. When `asset_a` is materialized, the sensor will be triggered.
3. **Access partition information in the sensor**: In the `asset_sensor`, you can access the partition information from the materialization event. The documentation provides an example of how to access the partition key from the run record within a sensor:
```python
from dagster import DagsterRunStatus, RunsFilter, sensor

@sensor(job=job2)
def job2_sensor(context):
    # Find successful runs of the upstream job.
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name="job1",
            statuses=[DagsterRunStatus.SUCCESS],
        )
    )
    for run_record in run_records:
        # The partition key is recorded on the run as a tag.
        partition_key = run_record.dagster_run.tags["dagster/partition"]
        # Now you have the partition_key that you can use to trigger asset_b
```
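For step 1, partition information is attached automatically when the asset itself is partitioned. Your `asset_a` is a dbt asset, but as a minimal stand-in, here is a sketch of a daily-partitioned asset; the `daily_partitions` name and start date are illustrative assumptions:

```python
from dagster import DailyPartitionsDefinition, asset

# Illustrative partitions definition; adjust to your actual schedule.
daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily_partitions)
def asset_a(context):
    # context.partition_key is recorded on the materialization event,
    # which is what the asset sensor will later read.
    context.log.info(f"Materializing partition {context.partition_key}")
```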
4. **Trigger `asset_b` with the partition key**: Use the partition key obtained in the sensor to configure the job that runs `asset_b`. You can pass this partition key as part of the `run_config` or through any other mechanism that `asset_b` requires to run with the correct partition context.
5. **Define the job for `asset_b`**: Ensure that the job which runs `asset_b` is capable of accepting the partition information and using it to run the SQL query against the correct partition (a sketch of such a job follows the sensor example below).

Here is a conceptual example of how you might set up the sensor and job:
```python
from dagster import asset_sensor, RunRequest, SensorExecutionContext

@asset_sensor(asset_key="asset_a", job=job_for_asset_b)
def asset_a_sensor(context: SensorExecutionContext):
    materialization_event = context.latest_materialization()
    if materialization_event:
        partition_key = materialization_event.partition
        run_config = {"resources": {"my_db": {"config": {"partition": partition_key}}}}
        yield RunRequest(run_key=context.cursor, run_config=run_config)
```
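For step 5, the job on the other side needs to read that `run_config`. A minimal sketch, assuming a `my_db` resource whose only config is the partition string; the resource and op names are illustrative, and the SQL is a placeholder:

```python
from dagster import job, op, resource

@resource(config_schema={"partition": str})
def my_db(init_context):
    # In a real setup this would return a database client scoped to the
    # partition; here it simply returns the partition string from run_config.
    return init_context.resource_config["partition"]

@op(required_resource_keys={"my_db"})
def run_asset_b_query(context):
    partition = context.resources.my_db
    # Placeholder for the real query against your warehouse.
    context.log.info(f"SELECT * FROM asset_b_source WHERE partition = '{partition}'")

@job(resource_defs={"my_db": my_db})
def job_for_asset_b():
    run_asset_b_query()
```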
In this example, `job_for_asset_b` should be a job that is configured to run `asset_b` with the provided `run_config`. The `run_config` should be structured in a way that `asset_b` can use the partition information to query the correct partition. Please note that the exact implementation details may vary based on your specific setup and how `asset_a` and `asset_b` are defined and materialized. The provided example is a high-level guide and may need to be adjusted to fit your use case.
t
@Scout You're giving me outdated instructions, there is no such thing as a SensorExecutionContext.
s
I apologize for the confusion. You are correct: `SensorExecutionContext` is outdated and has been removed; in current versions of Dagster the sensor context is `SensorEvaluationContext`, and an `asset_sensor` function also receives the triggering materialization event directly as a second argument. Here's how you can pass partition information from `asset_a` to `asset_b` using an asset sensor:

1. **Define an asset sensor**: Create an asset sensor that triggers when `asset_a` is materialized. The sensor can access the partition information from the materialization event.
2. **Extract partition information**: In the sensor, extract the partition information from the event.
3. **Pass partition information to the job**: Use the extracted partition information to create a `RunRequest` for the job that will run `asset_b`. You can pass the partition information as part of the `run_config` or tags.

Here's a simplified example of how you might define the sensor:
```python
from dagster import AssetKey, EventLogEntry, RunRequest, SensorEvaluationContext, asset_sensor

@asset_sensor(asset_key=AssetKey("asset_a"), job=job_for_asset_b)
def asset_a_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    # The partition key of the triggering materialization, if asset_a is partitioned.
    partition = asset_event.dagster_event.partition
    if partition:
        run_config = {"resources": {"my_resource": {"config": {"partition": partition}}}}
        yield RunRequest(run_key=f"{asset_event.run_id}:{partition}", run_config=run_config)
```
In this example, `job_for_asset_b` should be a job that is configured to run `asset_b` with the provided `run_config`. The `run_config` should be structured in a way that `asset_b` can use the partition information to query the correct partition. Please ensure that you are using the latest documentation and API references for Dagster, as the platform is actively developed and APIs may evolve.
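As an aside: in recent Dagster versions, if `job_for_asset_b` is itself partitioned with the same `PartitionsDefinition` as `asset_a`, you can skip the `run_config` plumbing and pass the partition key directly on the `RunRequest`. A minimal sketch, assuming the job is partitioned (names are illustrative):

```python
from dagster import AssetKey, EventLogEntry, RunRequest, SensorEvaluationContext, asset_sensor

@asset_sensor(asset_key=AssetKey("asset_a"), job=job_for_asset_b)
def asset_a_partition_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    partition = asset_event.dagster_event.partition
    if partition:
        # Launches job_for_asset_b for the same partition that asset_a materialized.
        # Requires job_for_asset_b to be defined with a matching partitions_def.
        yield RunRequest(run_key=f"{asset_event.run_id}:{partition}", partition_key=partition)
```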