Peter Lim
02/29/2024, 4:55 PM
Scout
02/29/2024, 4:55 PM
`CodeLocationSelector` and `JobSelector` identifiers to specify the code location and job you want to monitor. Here's how you can do it, according to the documentation:

1. Use `CodeLocationSelector` to monitor jobs in a different code location. For example, to monitor any job in a code location named "defs" when it succeeds, you can define a sensor like this:
```python
from dagster import CodeLocationSelector, DagsterRunStatus, run_status_sensor


@run_status_sensor(
    monitored_jobs=[CodeLocationSelector(location_name="defs")],
    run_status=DagsterRunStatus.SUCCESS,
)
def code_location_a_sensor():
    # When any job in the "defs" code location succeeds, this sensor will trigger
    send_slack_alert()
```
2. Use `JobSelector` if you want to monitor a specific job in another code location. For instance, to monitor a job named "data_update" in a repository named "code_location_a" within the "defs" code location, you can define a sensor like this:
```python
from dagster import JobSelector, run_failure_sensor


@run_failure_sensor(
    monitored_jobs=[
        JobSelector(
            location_name="defs",
            repository_name="code_location_a",
            job_name="data_update",
        )
    ],
)
def code_location_a_data_update_failure_sensor():
    # When the data_update job in code_location_a fails, this sensor will trigger
    send_slack_alert()
```
3. If you want to monitor every job in your Dagster instance, set `monitor_all_code_locations=True` on the sensor decorator. Note that `monitor_all_code_locations` cannot be combined with jobs specified via `monitored_jobs`.
```python
from dagster import DagsterRunStatus, run_status_sensor


@run_status_sensor(
    monitor_all_code_locations=True,
    run_status=DagsterRunStatus.SUCCESS,
)
def sensor_monitor_all_code_locations():
    # When any job in the Dagster instance succeeds, this sensor will trigger
    send_slack_alert()
```
Remember to replace `send_slack_alert()` with your actual alerting or notification logic. It is only a placeholder marking where the code that should run when the sensor triggers belongs. These examples are based on the documentation and should help you set up sensors that monitor jobs across different code locations in Dagster.
Peter Lim
02/29/2024, 6:51 PM
Scout
02/29/2024, 6:51 PM
`asset_sensor`. An `asset_sensor` is designed to trigger a job run when there is a materialization of a specific asset. Here's how you can define an `asset_sensor` that monitors one asset key and launches a job:
```python
from dagster import AssetKey, EventLogEntry, RunRequest, SensorEvaluationContext, asset_sensor


# my_job is the job to trigger, defined elsewhere in your code location.
@asset_sensor(asset_key=AssetKey("my_asset"), job=my_job)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    # Logic to determine whether to trigger the job based on asset_event.
    # For example, you could check the event's timestamp, metadata, etc.
    should_trigger = True  # Replace with your condition
    if should_trigger:
        yield RunRequest(
            # As in the Dagster docs example, the sensor's cursor (which advances
            # as materializations are processed) serves as the run_key.
            run_key=context.cursor,
            run_config={
                # Define the run configuration here
            },
        )
```
In the example above, `my_job` is the job that will be triggered when the sensor detects a materialization event for the asset with the key `my_asset`. The `RunRequest` is yielded only if `should_trigger` is true; replace that placeholder with your own logic for deciding when to trigger the job. If you want a sensor to target an asset selection instead of a job, use the `asset_selection` parameter on the `@sensor` decorator to specify which assets each requested run should materialize. Here's an example:
```python
from dagster import AssetSelection, RunRequest, sensor


@sensor(asset_selection=AssetSelection.keys("asset1", "asset2"))
def my_asset_selection_sensor(context):
    # Logic to determine whether to launch a run that materializes asset1 and asset2
    should_trigger = True  # Replace with your condition
    if should_trigger:
        yield RunRequest(
            run_key="unique_run_key",  # Replace with a unique identifier for the run
            run_config={
                # Define the run configuration here
            },
        )
```
In this example, the sensor targets a selection of assets (`"asset1"` and `"asset2"`). When the sensor's evaluation logic determines that a run should be launched, it yields a `RunRequest` with a unique `run_key` and the necessary `run_config`. Please note that `asset_selection` does not define what the sensor monitors: it specifies which assets the runs requested by the sensor will materialize, so no separate job is needed. What the sensor watches is determined by the evaluation logic inside the sensor function.
Peter Lim
02/29/2024, 7:10 PM
Scout
02/29/2024, 7:10 PM
`run_status_sensor`. The `run_status_sensor` is designed to react to the status of a job run and can be used to trigger another job or to perform actions such as sending alerts. It has no built-in functionality for materializing assets directly. However, there was a discussion in a GitHub issue about enabling a `multi_asset_sensor` to submit runs for an asset selection instead of a job, which suggests there is interest in such a feature; it would allow a sensor to trigger the materialization of a specific set of assets. If you want to materialize a specific set of assets in response to a job's run status, you would typically create a job that materializes those assets and then use `run_status_sensor` to trigger that job. Here's a conceptual example of how you might set that up:
```python
from dagster import DagsterRunStatus, RunRequest, SkipReason, run_status_sensor

# Assume you have a job that materializes a specific set of assets
# (for example, one created with define_asset_job); left as a placeholder here.
asset_materialization_job = ...


@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=asset_materialization_job,
)
def asset_materialization_sensor(context):
    # Skip runs of the asset materialization job itself; otherwise each of its
    # successes would trigger it again, producing an infinite loop of runs.
    if context.dagster_run.job_name == asset_materialization_job.name:
        return SkipReason("Triggering run was the asset materialization job itself")

    # Define the run configuration for the asset materialization job
    run_config = {
        # Configuration that specifies which assets to materialize
    }
    return RunRequest(run_key=None, run_config=run_config)
```
In this example, when a job run succeeds, the sensor first checks whether that run came from `asset_materialization_job` itself. If it did not, the sensor triggers `asset_materialization_job` with a run configuration that materializes the desired assets. Please note that triggering the materialization of a selection of assets directly from a `run_status_sensor`, without involving a job, would require a feature that was not available in Dagster as of my last update. For current capabilities and potential new features, please refer to the latest Dagster documentation or the Dagster GitHub repository.
Peter Lim
02/29/2024, 7:20 PM
Scout
02/29/2024, 7:20 PM
`dagster` module.
```python
from dagster import sensor, RunRequest
```
2. Define Your Sensor: Use the `@sensor` decorator to define your sensor function. The function can optionally take a `context` parameter, an instance of `SensorEvaluationContext`, which provides methods and properties for accessing the Dagster instance and information about the current sensor evaluation.
```python
@sensor(job=my_job)
def my_sensor(context):
    # Your sensor logic here
    ...
```
3. Implement Sensor Logic: Inside the sensor function, implement the logic that determines when to trigger a job run. This could be based on time, external events, changes in a database, new files in a directory, or any other condition relevant to your use case.
4. Yield RunRequest: If the conditions are met, yield a `RunRequest` object to trigger a job run. The `RunRequest` can include a `run_key` to uniquely identify the run and a `run_config` to provide configuration for the job.
```python
# Inside the body of the sensor function:
if should_trigger_run:  # Replace with your condition
    yield RunRequest(
        run_key="unique_run_key",  # Replace with a unique identifier for the run
        run_config={
            # Define the run configuration here
        },
    )
```
Here's a complete example of a sensor that triggers a job run when a certain file exists:
```python
import os

from dagster import sensor, RunRequest


@sensor(job=my_job)
def file_sensor(context):
    filepath = "/path/to/my/file.txt"
    if os.path.exists(filepath):
        yield RunRequest(
            # Unique identifier for the run. Dagster launches at most one run per
            # run_key for a given sensor, so this constant key triggers only once.
            run_key="file_exists_run",
            run_config={
                # Define the run configuration here
                "ops": {
                    "my_op": {
                        "config": {
                            "filename": filepath
                        }
                    }
                }
            },
        )
```
In this example, `my_job` is the job the sensor triggers when the file at `/path/to/my/file.txt` exists. The `run_config` is a dictionary that configures the job; here it passes the filename to be processed to the op `my_op` within the job.
Remember to replace `/path/to/my/file.txt` with the actual path to the file you want to monitor, and `my_job` with the actual job you want to trigger.
Please note that the sensor will run at a default interval of 30 seconds unless you…
Scout
02/29/2024, 7:21 PM
…`minimum_interval_seconds` parameter on the `@sensor` decorator. You can adjust this interval based on your monitoring needs.