James O'Toole
02/07/2024, 10:41 PM

Scout
02/07/2024, 10:41 PM
…`Definitions` call. Here's a basic sensor example from the documentation. It doesn't use a resource yet, but it's the starting point for adding one:

```python
import os

from dagster import RunRequest, SkipReason, sensor

# log_file_job and MY_DIRECTORY are placeholders defined elsewhere in your code
@sensor(job=log_file_job)
def my_directory_sensor_with_skip_reasons():
    has_files = False
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config={
                    "ops": {"process_file": {"config": {"filename": filename}}}
                },
            )
            has_files = True
    if not has_files:
        yield SkipReason(f"No files found in {MY_DIRECTORY}.")
```
To use a resource within the sensor function, add it as a parameter whose name matches the resource key and whose type annotation is the resource class. Then attach the resource to your `Definitions` call when setting up your repository. Here's a hypothetical example of how you might modify the sensor to include a resource (`MyResource` and `some_method` are placeholders):

```python
from dagster import SensorEvaluationContext, sensor

@sensor(job=log_file_job)
def my_directory_sensor_with_resources(context: SensorEvaluationContext, my_resource: MyResource):
    # my_resource is injected because the parameter name matches the resource key;
    # MyResource is a placeholder for your ConfigurableResource subclass
    resource_value = my_resource.some_method()
    ...
```
And when setting up your repository:

```python
from dagster import Definitions

defs = Definitions(
    jobs=[log_file_job],
    sensors=[my_directory_sensor_with_resources],
    resources={"my_resource": my_resource_definition},
)
```
In the `Definitions` call above, `my_resource_definition` would be the definition of the resource you want to use within your sensor. Remember to replace `log_file_job`, `my_resource`, `my_resource_definition`, and other placeholders with your actual job and resource names and definitions. For more detailed information and examples, see the [Sensors documentation](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#using-resources-in-sensors) on using resources in sensors.

James O'Toole
02/07/2024, 11:36 PM
```python
@resource
def callback_url_resource(init_context) -> str:
    """
    This resource fetches the job completion callback URL from the resource configuration.
    If the URL is not present in the configuration, it returns an empty string.
    """
    if (
        init_context.resource_config
        and "job_completion_callback_url" in init_context.resource_config.keys()
    ):
        return init_context.resource_config["job_completion_callback_url"]
    return ""
```
I would like to use this resource in the following sensor:
```python
@run_failure_sensor(
    default_status=DefaultSensorStatus.RUNNING
)
def failure_callback(context: RunFailureSensorContext, job_completion_callback_url: Optional[str], job_completion_callback_auth: Optional[str]) -> None:
    """
    Sensor triggered when a processing job fails
    """
    _send_callback(
        context,
        "FAILED",
        f"{context.dagster_run.job_name} failed. Error: {context.failure_event.message}",
        job_completion_callback_url,
        job_completion_callback_auth
    )
```
Scout
02/07/2024, 11:36 PM
…`callback_url_resource` within a sensor, you would need to initialize the resource with the required configuration outside of the job's execution context. The `build_resources` function lets you initialize resources outside of an execution context. Here's an example based on the documentation and GitHub issues:

```python
from dagster import DefaultSensorStatus, RunFailureSensorContext, build_resources, run_failure_sensor


@run_failure_sensor(default_status=DefaultSensorStatus.RUNNING)
def failure_callback(context: RunFailureSensorContext) -> None:
    """
    Sensor triggered when a processing job fails
    """
    # Initialize the resources outside of the job's execution context.
    # callback_auth_resource is a hypothetical resource analogous to callback_url_resource.
    with build_resources(
        {"callback_url": callback_url_resource, "callback_auth": callback_auth_resource}
    ) as resources:
        job_completion_callback_url = resources.callback_url
        job_completion_callback_auth = resources.callback_auth
        _send_callback(
            context,
            "FAILED",
            f"{context.dagster_run.job_name} failed. Error: {context.failure_event.message}",
            job_completion_callback_url,
            job_completion_callback_auth,
        )
```

In this example, `build_resources` initializes `callback_url_resource` and any other resources you list. The `with` statement ensures that the resources are properly cleaned up after use, and inside the block each initialized resource is available as an attribute named after its key. `build_resources` takes a dictionary mapping resource keys to resource definitions; `callback_url_resource` is assumed to be the resource you defined above, and `callback_auth_resource` a similar one for the auth value.

Keep in mind that `build_resources` is a workaround for initializing resources outside of a job's execution context and is less straightforward than using resources within a job. In particular, no configuration is passed here, so `callback_url_resource` will return an empty string; the configuration must be supplied (for example through the `resource_config` argument of `build_resources`) for it to return a URL. For more information, see the [`build_resources` API documentation](https://docs.dagster.io/_apidocs/resources#dagster.build_resources).

James O'Toole
02/08/2024, 12:12 AM

Scout
02/08/2024, 12:12 AM
…`callback_url_resource` function and get the return value within the sensor function, you would typically use the `build_resources` function to initialize the resource and then read the initialized value. However, since `callback_url_resource` returns a string based on its configuration, you need to make sure that configuration is available to the resource during initialization. Here's an example of how you might use `build_resources` to initialize a resource and access its return value within a sensor function:

```python
from dagster import (
    DefaultSensorStatus,
    ResourceDefinition,
    RunFailureSensorContext,
    build_resources,
    run_failure_sensor,
)

# A hardcoded stand-in for callback_url_resource, for illustration only
callback_url_resource_def = ResourceDefinition.hardcoded_resource("http://example.com/callback")


@run_failure_sensor(default_status=DefaultSensorStatus.RUNNING)
def failure_callback(context: RunFailureSensorContext) -> None:
    """
    Sensor triggered when a processing job fails
    """
    # Initialize the resources outside of the job's execution context
    with build_resources({"callback_url": callback_url_resource_def}) as resources:
        # The attribute holds the value the resource returned
        job_completion_callback_url = resources.callback_url
    # Now you can use job_completion_callback_url in your sensor logic
    _send_callback(
        context,
        "FAILED",
        f"{context.dagster_run.job_name} failed. Error: {context.failure_event.message}",
        job_completion_callback_url,
    )
```

In this example, `ResourceDefinition.hardcoded_resource` creates a resource definition with a hardcoded value for the sake of illustration. In practice, you would replace it with your actual resource definition and supply the configuration it needs. The `with build_resources(...)` block initializes the resources; inside it you can read each resource's value, and after the block you can use the retrieved value in your sensor logic. Because your actual `callback_url_resource` reads its value from configuration, make sure to provide that configuration when using `build_resources` so the resource is initialized correctly. For more information on resources, see the [Resources documentation](https://docs.dagster.io/concepts/resources).

James O'Toole
02/08/2024, 12:17 AM

Scout
02/08/2024, 12:17 AM
…`callback_url_resource` within a sensor function, you need to provide the necessary configuration when initializing the resource. Since `callback_url_resource` reads its configuration from `init_context`, you can use `build_init_resource_context` to construct an initialization context with the required configuration and then invoke the resource with it.

Here's an example of how you might do this:

```python
from dagster import (
    DefaultSensorStatus,
    RunFailureSensorContext,
    build_init_resource_context,
    resource,
    run_failure_sensor,
)


# Define your resource as you have it
@resource
def callback_url_resource(init_context) -> str:
    """
    This resource fetches the job completion callback URL from the resource configuration.
    If the URL is not present in the configuration, it returns an empty string.
    """
    if (
        init_context.resource_config
        and "job_completion_callback_url" in init_context.resource_config.keys()
    ):
        return init_context.resource_config["job_completion_callback_url"]
    return ""


# Define your sensor function
@run_failure_sensor(default_status=DefaultSensorStatus.RUNNING)
def failure_callback(context: RunFailureSensorContext) -> None:
    """
    Sensor triggered when a processing job fails
    """
    # Construct the initialization context with the required configuration
    init_context = build_init_resource_context(
        config={"job_completion_callback_url": "http://example.com/callback"}
    )
    # Invoke the resource function directly with that context to get its return value
    job_completion_callback_url = callback_url_resource(init_context)
    # Now you can use job_completion_callback_url in your sensor logic
    _send_callback(
        context,
        "FAILED",
        f"{context.dagster_run.job_name} failed. Error: {context.failure_event.message}",
        job_completion_callback_url,
    )
```
In this example, `build_init_resource_context` creates an `InitResourceContext` with the configuration that `callback_url_resource` needs. The `config` parameter is a dictionary that should match the structure expected by the resource's configuration schema. In this case, we're providing a hardcoded URL for the sake of illustration.

Once the context is built, calling the resource definition with it runs the resource function and returns its value, here the URL string. Because `callback_url_resource` returns a plain value rather than yielding one, there is no teardown logic to run afterward.
Please adjust the configuration dictionary to match the actual configuration structure your resource expects. If the configuration is dynamic and not known at sensor definition time, you'll need to find a way to provide it at runtime, possibly by fetching it from a…

Scout
02/08/2024, 12:18 AM

James O'Toole
02/08/2024, 12:20 AM
…`config={"job_completion_callback_url": "http://example.com/callback"}` will work for me, because I'm not hard-coding that value; it comes in via the call to the Dagster GraphQL API that triggers the job.

Scout
02/08/2024, 12:20 AM
…`job_completion_callback_url` comes from the job's run config, which is provided via a call to the Dagster GraphQL API, then the resource itself is never initialized inside a sensor or run failure sensor, so you can't get its return value there. You can, however, access information about the failed run from the `RunFailureSensorContext`. This context includes the `DagsterRun` object, which carries the run's config (`run_config`) and any tags that were set when the run was launched. If you can modify the process that triggers the job to include the `job_completion_callback_url` as a tag, you could then read it within the run failure sensor. Here's an example of how you might set a tag when triggering a job via the GraphQL API:

```graphql
mutation {
  launchPipelineExecution(
    executionParams: {
      runConfigData: {
        resources: {
          # ... your resource config
        }
      }
      mode: "default"
      selector: {
        pipelineName: "my_pipeline"
        # ... other selector fields
      }
      executionMetadata: {
        tags: [
          { key: "job_completion_callback_url", value: "http://example.com/callback" }
        ]
      }
    }
  ) {
    __typename
    ... on LaunchRunSuccess {
      run {
        runId
        pipeline {
          name
        }
      }
    }
  }
}
```

And then in your run failure sensor, you could access this tag like so:

```python
from dagster import RunFailureSensorContext, run_failure_sensor


@run_failure_sensor
def failure_callback(context: RunFailureSensorContext) -> None:
    """
    Sensor triggered when a processing job fails
    """
    # Access the tag from the failed run
    job_completion_callback_url = context.dagster_run.tags.get("job_completion_callback_url")
    # Now you can use job_completion_callback_url in your sensor logic
    if job_completion_callback_url:
        _send_callback(
            context,
            "FAILED",
            f"{context.dagster_run.job_name} failed. Error: {context.failure_event.message}",
            job_completion_callback_url,
        )
```
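Because the `DagsterRun` on the context exposes both `tags` and `run_config`, a sensor could check the tag first and fall back to the resource config the run was launched with. The helper below is a hypothetical sketch on plain mappings, so it can be tested without Dagster. It assumes the resource is registered under the key `job_completion_callback_url` with a config field of the same name; adjust both to your setup. Inside the sensor you would call `get_callback_url(context.dagster_run.tags, context.dagster_run.run_config)`.

```python
from typing import Any, Mapping, Optional

TAG_KEY = "job_completion_callback_url"
# Hypothetical resource key; adjust to match the key used in your Definitions
RESOURCE_KEY = "job_completion_callback_url"


def get_callback_url(tags: Mapping[str, str], run_config: Mapping[str, Any]) -> Optional[str]:
    """Return the callback URL from the run's tags, else from its resource config."""
    # Prefer an explicit tag set at launch time
    if tags.get(TAG_KEY):
        return tags[TAG_KEY]
    # Fall back to run_config["resources"][RESOURCE_KEY]["config"][TAG_KEY]
    resource_config = (
        run_config.get("resources", {}).get(RESOURCE_KEY, {}).get("config", {})
    )
    return resource_config.get(TAG_KEY) or None
```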
In the run failure sensor, `context.dagster_run.tags.get("job_completion_callback_url")` retrieves the `job_completion_callback_url` from the run's tags, assuming the job was launched with that tag set. This approach requires both that the process triggering the job includes the necessary information as a tag and that the run failure sensor is written to look for it.

James O'Toole
02/08/2024, 12:23 AM

Scout
02/08/2024, 12:23 AM
…`callback_url_resource` as a tag for the `materialize_all_assets_job`, you would need the value to be available at the time you define the job. Since resources are initialized at runtime with the run config, you cannot use a resource as a tag value at job definition time. However, if you have a known value for `job_completion_callback_url` that you want to set as a tag, you can add it to the `tags` dictionary when defining the job. Here's an example of how you might do that:

```python
from dagster import (
    MAX_RUNTIME_SECONDS_TAG,
    AssetSelection,
    Definitions,
    define_asset_job,
    load_assets_from_modules,
)

# Assumes `assets` is your assets module, GROUP_NAME is a constant you've defined,
# and callback_url_resource, callback_auth_resource, success_callback, and
# failure_callback are defined elsewhere
all_assets = load_assets_from_modules([assets], group_name=GROUP_NAME)

# Define your job with the necessary tags
materialize_all_assets_job = define_asset_job(
    name="haystacker_videos_job",
    selection=AssetSelection.all(),
    tags={
        MAX_RUNTIME_SECONDS_TAG: 1200,
        "job_completion_callback_url": "http://example.com/callback",  # Replace with your actual URL
        "job_completion_callback_auth": "auth_token",  # Replace with your actual auth token
    },
)

# Define your resources
resources = {
    "job_completion_callback_url": callback_url_resource,
    "job_completion_callback_auth": callback_auth_resource,
}

# Define your Definitions object
defs = Definitions(
    assets=all_assets,
    jobs=[materialize_all_assets_job],
    resources=resources,
    sensors=[success_callback, failure_callback],
)
```

In this example, the tags for `job_completion_callback_url` and `job_completion_callback_auth` are hardcoded. If these values are dynamic and not known until runtime, you need to provide them when the job is launched, for example by setting them in the run config when triggering the job or by using environment variables. If you need these values at runtime, you would typically read them within the ops that make up your job, where you can access resources and their configuration. A job definition can't set its tags from resource configuration, because resources are initialized with the run config, which is not available at job definition time.

James O'Toole
02/08/2024, 12:27 AM

Scout
02/08/2024, 12:27 AM
…`job_completion_callback_url` and any other dynamic values in the run config when triggering the job, you can access these values within the job's ops through the context object provided to each op. However, you cannot use these run config values as tags at job definition time, because they are only available at runtime. To use them, read them from the config inside an op and, if necessary, pass them along to other ops or systems that need them. Here's an example of how you might access these values within an op:

```python
from dagster import job, op


@op(config_schema={"job_completion_callback_url": str})
def my_op(context):
    # Access the run config value
    job_completion_callback_url = context.op_config["job_completion_callback_url"]
    # Use the value as needed within the op
    context.log.info(f"Callback URL: {job_completion_callback_url}")


@job
def my_job():
    my_op()


# Example of how you might execute the job with a run config
result = my_job.execute_in_process(
    run_config={
        "ops": {"my_op": {"config": {"job_completion_callback_url": "http://example.com/callback"}}}
    }
)
```

In this example, `my_op` is configured to accept a `job_completion_callback_url` from the run config, and you provide that value when you execute the job. If you want to use these values as tags for the job run itself, you need to set them when you trigger the job execution. This can be done via the GraphQL API, the Dagster UI, or the Dagster CLI, depending on how you trigger the job. Here's an example of how you might set tags via the GraphQL API:

```graphql
mutation {
  launchPipelineExecution(
    executionParams: {
      runConfigData: {
        resources: {
          # ... your resource config
        }
      }
      mode: "default"
      selector: {
        pipelineName: "my_pipeline"
        # ... other selector fields
      }
      executionMetadata: {
        tags: [
          { key: "job_completion_callback_url", value: "http://example.com/callback" }
          { key: "job_completion_callback_auth", value: "auth_token" }
        ]
      }
    }
  ) {
    __typename
    ... on LaunchRunSuccess {
      run {
        runId
        pipeline {
          name
        }
      }
    }
  }
}
```
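When a script sends this mutation, it's usually cleaner to pass `executionParams` as a GraphQL variable than to inline it. The hypothetical helper below builds that variables payload from a plain tag dictionary, converting it into the list of `{key, value}` objects that `executionMetadata.tags` uses. The function names and the minimal selector are assumptions; you would send the payload with whatever HTTP client you prefer, alongside a mutation declared with `$executionParams: ExecutionParams!`.

```python
import json
from typing import Any, Dict, List, Mapping


def to_execution_tags(tags: Mapping[str, str]) -> List[Dict[str, str]]:
    """Convert {"name": "value"} into [{"key": "name", "value": "value"}]."""
    return [{"key": key, "value": str(value)} for key, value in tags.items()]


def build_launch_variables(
    job_name: str, run_config: Mapping[str, Any], tags: Mapping[str, str]
) -> Dict[str, Any]:
    """Build the variables dict for a launch mutation that takes $executionParams."""
    return {
        "executionParams": {
            # Add location/repository selector fields as your deployment requires
            "selector": {"pipelineName": job_name},
            "runConfigData": dict(run_config),
            "executionMetadata": {"tags": to_execution_tags(tags)},
        }
    }


variables = build_launch_variables(
    "my_pipeline",
    {},
    {
        "job_completion_callback_url": "http://example.com/callback",
        "job_completion_callback_auth": "auth_token",
    },
)
print(json.dumps(variables, indent=2))
```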
In the mutation, the `executionMetadata.tags` field sets the tags for the run, including `job_completion_callback_url` and `job_completion_callback_auth`. These tags are then associated with the run and can be read by any sensors or monitoring tools you have set up to respond to run events.

James O'Toole
02/08/2024, 12:30 AM

Scout
02/08/2024, 12:30 AM
…`execute_pipeline` GraphQL mutation. The Python client provides a method to submit a job execution, and you can include tags as a parameter to this method. Here's an example of how you might use the `DagsterGraphQLClient` to submit a job execution with tags:

```python
from dagster_graphql import DagsterGraphQLClient, DagsterGraphQLClientError

# Instantiate the client with the appropriate host and port
client = DagsterGraphQLClient("localhost", port_number=3000)

# Define your tags as a plain dictionary; the client converts them for the GraphQL API
tags = {
    "job_completion_callback_url": "http://example.com/callback",
    "job_completion_callback_auth": "auth_token",
}

try:
    # Returns the new run's ID on success
    run_id = client.submit_job_execution(
        "my_job",
        run_config={},  # Your run config here
        tags=tags,
    )
    print(f"Job submitted successfully with run ID: {run_id}")
except DagsterGraphQLClientError as exc:
    # Raised when the launch fails, e.g. invalid run config or an unknown job
    print(f"Job submission failed: {exc}")
```

In this example, `submit_job_execution` submits the job execution and returns the run ID. The `tags` parameter is a plain dictionary of tag names to values; the client converts it into the key/value list that the GraphQL mutation expects. If the launch fails, the client raises `DagsterGraphQLClientError` instead of returning an error object. Method names and parameters can differ between versions of the `dagster-graphql` client, so check the documentation for the version you're using.

James O'Toole
02/08/2024, 3:37 PM

Scout
02/08/2024, 3:37 PM
…`RunRequest` object that is yielded by the sensor function. The `RunRequest` object can include tags, which are then applied to the job run. However, sensors themselves do not have direct access to the tags of a job run, because they are not tied to a specific run. If you want to access the tags from a job run within a sensor, you would typically do so when responding to an event, such as a run failure or success, using a run status sensor. The `RunFailureSensorContext` or `RunStatusSensorContext` provided to these sensors includes a `DagsterRun` object, which contains the tags that were set for the run. Here's an example of how you might access tags within a run failure sensor:

```python
from dagster import RunFailureSensorContext, run_failure_sensor


@run_failure_sensor
def my_failure_sensor(context: RunFailureSensorContext):
    # Access the tags from the failed run
    tags = context.dagster_run.tags
    # Access a specific tag value
    job_completion_callback_url = tags.get("job_completion_callback_url")
    # Use the tag value as needed
    if job_completion_callback_url:
        # Logic to handle the failure, possibly using the callback URL
        pass
```

In this example, `context.dagster_run.tags` is a mapping of all the tags that were set for the run, and its `get` method returns the value of a specific tag by key, or `None` if the tag isn't set. Note that this approach applies to run status sensors, which are triggered in response to job run events. In other types of sensors, you would need to make the information available by other means, such as storing it in an external system.

James O'Toole
02/08/2024, 8:14 PM
…`tags = context.dagster_run.tags`, and in what form should I expect them?

Scout
02/08/2024, 8:14 PM
…`tags` attribute of a `DagsterRun` object is a mapping in which both the keys and the values are strings, so a suitable type hint for the `tags` variable is `Mapping[str, str]` from `typing`. Here's how you can add the type hint to the `tags` variable in a run failure sensor:

```python
from typing import Mapping

from dagster import RunFailureSensorContext, run_failure_sensor


@run_failure_sensor
def my_failure_sensor(context: RunFailureSensorContext):
    # Type hint for the tags variable
    tags: Mapping[str, str] = context.dagster_run.tags
    # Access a specific tag value
    job_completion_callback_url = tags.get("job_completion_callback_url")
    # Use the tag value as needed
    if job_completion_callback_url:
        # Logic to handle the failure, possibly using the callback URL
        pass
```
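Because tag values are strings, anything numeric has to be parsed back. For instance, the `MAX_RUNTIME_SECONDS_TAG: 1200` set on the job earlier in this thread is stored on the run as a string rather than an int. A small hypothetical helper for reading an integer tag safely; the `max_runtime` key in the sample data is illustrative, not Dagster's actual tag name:

```python
from typing import Mapping, Optional


def get_int_tag(tags: Mapping[str, str], key: str) -> Optional[int]:
    """Parse an integer tag value, returning None if it is missing or not an integer."""
    raw = tags.get(key)
    if raw is None:
        return None
    try:
        return int(raw)
    except ValueError:
        return None


run_tags = {"max_runtime": "1200", "job_completion_callback_url": "http://example.com/callback"}
print(get_int_tag(run_tags, "max_runtime"))  # 1200
```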
In the sensor, `tags` is annotated as `Mapping[str, str]`, indicating string keys and string values; that's the form you can expect when you access `context.dagster_run.tags`.

James O'Toole
02/08/2024, 8:21 PM

Scout
02/08/2024, 8:21 PM

James O'Toole
02/08/2024, 8:50 PM
…`build_sensor_context`, how do I set mock values for `run_id` and `tags`? I'm accessing these values in my sensors via `context.dagster_run.run_id` and `context.dagster_run.tags`.

Scout
02/08/2024, 8:50 PM
…`build_sensor_context` to create a mock sensor context, you might expect to set mock values for `run_id` and `tags` by passing them as arguments. However, as of my knowledge cutoff in early 2023, `build_sensor_context` does not accept `run_id` or `tags` as parameters. To work around this limitation, you can create a `DagsterRun` object with the desired `run_id` and `tags`, and then use the `instance` and `run` parameters of `build_sensor_context` to associate this `DagsterRun` object with the sensor context. Here's an example of how you might do this:

```python
from dagster import DagsterInstance, DagsterRun, build_sensor_context

# Mock values for run_id and tags
mock_run_id = "mock_run_id"
mock_tags = {"key1": "value1", "key2": "value2"}

# Create a mock DagsterRun object with the mock run_id and tags
mock_dagster_run = DagsterRun(run_id=mock_run_id, tags=mock_tags)

# Create an ephemeral DagsterInstance
instance = DagsterInstance.ephemeral()

# Use build_sensor_context with the instance and run parameters
context = build_sensor_context(
    instance=instance,
    run=mock_dagster_run
)

# Now you can use context.dagster_run.run_id and context.dagster_run.tags in your tests
assert context.dagster_run.run_id == mock_run_id
assert context.dagster_run.tags == mock_tags
```

In this example, `DagsterRun` is used to create a mock run object with the specified `run_id` and `tags`. This mock run object is then passed to `build_sensor_context` along with an ephemeral `DagsterInstance`. The resulting context object will have its `dagster_run` attribute set to the mock run object, allowing you to access `run_id` and `tags` in your tests as you would in a real sensor execution. Note that implementation details may vary with your Dagster version, and the APIs may have changed after my knowledge cutoff date; refer to the latest Dagster documentation for accurate, up-to-date information.

James O'Toole
02/08/2024, 8:58 PM
…`build_sensor_context` doesn't take `run` as an argument. Could you adjust your code snippet?

Scout
02/08/2024, 8:58 PM
…`build_sensor_context` does not take a `run` argument. My apologies for the confusion. Instead, you can give the context a `dagster_run` by temporarily patching the sensor context class so that a `dagster_run` property returns a `DagsterRun` with the desired values. Here's an adjusted example of how you might do this:

```python
from unittest.mock import PropertyMock, patch

from dagster import DagsterRun, SensorEvaluationContext, build_sensor_context

# Mock values for run_id and tags
mock_run_id = "mock_run_id"
mock_tags = {"key1": "value1", "key2": "value2"}

# Create a mock DagsterRun object; "my_job" is a placeholder job name
mock_dagster_run = DagsterRun(job_name="my_job", run_id=mock_run_id, tags=mock_tags)

# Use build_sensor_context to create a sensor context
context = build_sensor_context()

# Patch a 'dagster_run' property onto the sensor context class;
# create=True lets patch add the attribute if the class doesn't define it
with patch.object(
    SensorEvaluationContext, "dagster_run", new_callable=PropertyMock, create=True
) as mock_run:
    mock_run.return_value = mock_dagster_run
    # Now you can use context.dagster_run.run_id and context.dagster_run.tags in your tests
    assert context.dagster_run.run_id == mock_run_id
    assert context.dagster_run.tags == mock_tags
    # Here you would invoke your sensor function using the patched context
    # my_sensor(context)
```
In the patching approach, `unittest.mock.patch.object` attaches a `dagster_run` property to the sensor context class that returns the mock `DagsterRun` object. This lets you simulate having a `run_id` and `tags` in the sensor context during testing. This is a simplified example for illustrative purposes; depending on the complexity of your sensor and your testing framework, you may need to adjust the pattern, so refer to the latest documentation and best practices for mocking and testing in Python.

Henry Tareque
02/09/2024, 6:43 PM

Scout
02/09/2024, 6:43 PM

Henry Tareque
02/09/2024, 6:44 PM

Scout
02/09/2024, 6:44 PM