https://dagster.io/ logo
#ask-ai
Title
# ask-ai
j

James O'Toole

02/07/2024, 10:41 PM
@Scout how can i access and use resources within a sensor function?
s

Scout

02/07/2024, 10:41 PM
To access and use resources within a sensor function in Dagster, you can specify resource dependencies by annotating the resource as a parameter to the sensor's function. Resources are provided by attaching them to your
Definitions
call. Here's an example from the documentation that illustrates how to use resources in sensors:
Copy code
python @sensor(job=log_file_job) def my_directory_sensor_with_skip_reasons():     has_files = False     for filename in os.listdir(MY_DIRECTORY):         filepath = os.path.join(MY_DIRECTORY, filename)         if os.path.isfile(filepath):             yield RunRequest(                 run_key=filename,                 run_config={                     "ops": {"process_file": {"config": {"filename": filename}}}                 },             )             has_files = True     if not has_files:         yield SkipReason(f"No files found in {MY_DIRECTORY}.")
In the above example, if you wanted to use a resource within the sensor function, you would annotate the resource as a parameter to the function. Then, you would attach the resource to your
Definitions
call when setting up your repository. Here's a hypothetical example of how you might modify the sensor to include a resource:
Copy code
python @sensor(job=log_file_job) def my_directory_sensor_with_resources(context, my_resource):     # You can now use my_resource within your sensor logic     resource_value = my_resource.some_method()     ...
And when setting up your repository:
Copy code
python from dagster import Definitions  defs = Definitions(     jobs=[log_file_job],     sensors=[my_directory_sensor_with_resources],     resources={'my_resource': my_resource_definition}, )
In this setup,
my_resource_definition
would be the definition of the resource you want to use within your sensor. Remember to replace
log_file_job
,
my_resource
,
my_resource_definition
, and other placeholders with your actual job and resource names and definitions. For more detailed information and examples, you can refer to the [Sensors documentation](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#using-resources-in-sensors) on how to use resources in sensors.
j

James O'Toole

02/07/2024, 11:36 PM
@Scout what about resources that make use of values passed in via the job config? here is one such resource:
Copy code
@resource
def callback_url_resource(init_context) -> str:
    """
    This resource fetches the job completion callback URL from the resource configuration.
    If the URL is not present in the configuration, it returns an empty string.
    """
    if (
        init_context.resource_config
        and "job_completion_callback_url" in init_context.resource_config.keys()
    ):
        return init_context.resource_config["job_completion_callback_url"]
    return ""
i would like to use this resource in the following sensor:
Copy code
@run_failure_sensor(
    default_status=DefaultSensorStatus.RUNNING
)
def failure_callback(context: RunFailureSensorContext, job_completion_callback_url: Optional[str], job_completion_callback_auth: Optional[str]) -> None:
    """
    Sensor triggered when a processing job fails
    """
    _send_callback(
        context,
        "FAILED", 
        f"{context.dagster_run.job_name} failed. Error: {context.failure_event.message}",
        job_completion_callback_url,
        job_completion_callback_auth
    )
s

Scout

02/07/2024, 11:36 PM
In Dagster, resources that rely on values passed via job config are typically initialized during job execution, where the job's run config provides the necessary configuration for the resource. However, sensors, including run failure sensors, are not directly associated with a specific job's run config. Instead, they are triggered by events such as job failures and do not have access to the job's run config at the time of their execution. To use a resource like
callback_url_resource
within a sensor, you would need to initialize the resource with the required configuration outside of the job's execution context. This can be done using the
build_resources
function, which allows you to initialize resources outside of the context of execution. Here's an example based on the documentation and GitHub issues:
Copy code
python from dagster import build_resources, run_failure_sensor, RunFailureSensorContext  @run_failure_sensor(     default_status=DefaultSensorStatus.RUNNING ) def failure_callback(context: RunFailureSensorContext) -> None:     """     Sensor triggered when a processing job fails     """     # Initialize the resources outside of the job's execution context     with build_resources(         {"callback_url": callback_url_resource}     ) as resources:         job_completion_callback_url = resources.callback_url         job_completion_callback_auth = resources.callback_auth  # Assuming you have a similar resource for auth      _send_callback(         context,         "FAILED",          f"{context.dagster_run.job_name} failed. Error: {context.failure_event.message}",         job_completion_callback_url,         job_completion_callback_auth     )
In this example,
build_resources
is used to initialize the
callback_url_resource
and any other resources you might need. The
with
statement ensures that the resources are properly cleaned up after use. The initialized resources are then available within the block and can be used to retrieve the necessary configuration values. Please note that the
build_resources
function requires a dictionary of resource keys to resource definitions. In the example above,
callback_url_resource
is assumed to be a resource definition that you have defined elsewhere in your code. Keep in mind that the
build_resources
approach is a workaround for initializing resources outside of the job's execution context and may not be as straightforward as using resources within a job. Additionally, you should ensure that the resource configuration is available and correctly set up for the
build_resources
call to work as expected. For more information on using resources and the
build_resources
function, you can refer to the [Resources documentation](https://docs.dagster.io/_apidocs/resources#dagster.build_resources).
j

James O'Toole

02/08/2024, 12:12 AM
@Scout how can i then execute the callback_url_resource function such that i can get the return value within the sensor function?
s

Scout

02/08/2024, 12:12 AM
To execute the
callback_url_resource
function and get the return value within the sensor function, you would typically use the
build_resources
function to initialize the resource and then access its attributes or methods. However, since
callback_url_resource
is expected to return a string based on the provided configuration, you need to ensure that the configuration is available to the resource during initialization. Here's an example of how you might use
build_resources
to initialize the
callback_url_resource
and access its return value within a sensor function:
Copy code
python from dagster import build_resources, run_failure_sensor, RunFailureSensorContext, ResourceDefinition  # Assuming callback_url_resource is defined elsewhere in your code as a ResourceDefinition callback_url_resource_def = ResourceDefinition.hardcoded_resource("<http://example.com/callback>")  @run_failure_sensor(     default_status=DefaultSensorStatus.RUNNING ) def failure_callback(context: RunFailureSensorContext) -> None:     """     Sensor triggered when a processing job fails     """     # Initialize the resources outside of the job's execution context     with build_resources(         {"callback_url": callback_url_resource_def}     ) as resources:         # Access the return value of the callback_url_resource function         job_completion_callback_url = resources.callback_url      # Now you can use job_completion_callback_url in your sensor logic     _send_callback(         context,         "FAILED",          f"{context.dagster_run.job_name} failed. Error: {context.failure_event.message}",         job_completion_callback_url     )
In this example,
ResourceDefinition.hardcoded_resource
is used to create a resource definition with a hardcoded value for the sake of illustration. In practice, you would replace this with the actual resource definition that takes the necessary configuration. The
with build_resources(...)
block initializes the resources, and within this block, you can access the initialized resource's return value or any methods it provides. After the block, you can use the retrieved value in your sensor logic. Please note that the actual implementation of your
callback_url_resource
may require specific configuration to be passed during initialization. Make sure to provide the necessary configuration when using
build_resources
to ensure that the resource is initialized correctly. For more information on resources and how to use them, you can refer to the [Resources documentation](https://docs.dagster.io/concepts/resources).
j

James O'Toole

02/08/2024, 12:17 AM
@Scout in my case callback_url_resource is a function that fetches a value from the job's init_context: @resource def *callback_url_resource*(_init_context_) -> str: """ This resource fetches the job completion callback URL from the resource configuration. If the URL is not present in the configuration, it returns an empty string. """ if ( _init_context_.resource_config and "job_completion_callback_url" in _init_context_.resource_config.keys() ): return _init_context_.resource_config["job_completion_callback_url"] return "" how should i proceed to access its return value?
s

Scout

02/08/2024, 12:17 AM
To access the return value of the
callback_url_resource
within a sensor function, you need to provide the necessary configuration when initializing the resource. Since the
callback_url_resource
depends on the
init_context
to fetch the configuration, you can use
build_init_resource_context
to construct the initialization context with the required configuration. Here's an example of how you might do this:
Copy code
python
from dagster import build_init_resource_context, run_failure_sensor, RunFailureSensorContext, resource

# Define your resource as you have it
@resource
def callback_url_resource(init_context) -> str:
    """
    This resource fetches the job completion callback URL from the resource configuration.
    If the URL is not present in the configuration, it returns an empty string.
    """
    if (
        init_context.resource_config
        and "job_completion_callback_url" in init_context.resource_config.keys()
    ):
        return init_context.resource_config["job_completion_callback_url"]
    return ""

# Define your sensor function
@run_failure_sensor(
    default_status=DefaultSensorStatus.RUNNING
)
def failure_callback(context: RunFailureSensorContext) -> None:
    """
    Sensor triggered when a processing job fails
    """
    # Construct the initialization context with the required configuration
    init_context = build_init_resource_context(
        resources={"callback_url": callback_url_resource},
        config={"job_completion_callback_url": "<http://example.com/callback>"}
    )
    
    # Initialize the resource and get the return value
    with init_context.resources.callback_url() as job_completion_callback_url:
        # Now you can use job_completion_callback_url in your sensor logic
        _send_callback(
            context,
            "FAILED", 
            f"{context.dagster_run.job_name} failed. Error: {context.failure_event.message}",
            job_completion_callback_url
        )
In this example,
build_init_resource_context
is used to create an
InitResourceContext
with the necessary configuration for the
callback_url_resource
. The
config
parameter is a dictionary that should match the structure expected by the resource's configuration schema. In this case, we're providing a hardcoded URL for the sake of illustration. Once the context is built, you can initialize the resource and access its return value. The
with
statement ensures that any teardown logic defined in the resource is executed after the block. Please adjust the configuration dictionary to match the actual configuration structure your resource expects. If the configuration is dynamic and not known at sensor definition time, you'll need to find a way to provide the dynamic configuration at runtime, possibly by fetching it from a
n external source or using environment variables.
j

James O'Toole

02/08/2024, 12:20 AM
@Scout i don't think
config={"job_completion_callback_url": "<http://example.com/callback>"}
will work for me because i'm not hard-coding that value—it's comes in via the call to the dagster graphql api that triggers the job
s

Scout

02/08/2024, 12:20 AM
If the value for
job_completion_callback_url
comes from the job's run config, which is provided via a call to the Dagster GraphQL API, then you won't be able to access this value directly within a sensor or run failure sensor because these sensors are not tied to a specific job run and do not have access to the run config. However, you can access information about the failed run from the
RunFailureSensorContext
. This context includes the
DagsterRun
object, which contains tags that may have been set during the run. If you can modify the process that triggers the job to include the
job_completion_callback_url
as a tag, you could then access it within the run failure sensor. Here's an example of how you might set a tag when triggering a job via the GraphQL API:
Copy code
graphql mutation {   launchPipelineExecution(     executionParams: {       runConfigData: {         resources: {           ...         }         ...       }       mode: "default"       selector: {         pipelineName: "my_pipeline"         ...       }       executionMetadata: {         tags: [           { key: "job_completion_callback_url", value: "<http://example.com/callback>" }         ]       }     }   ) {     run {       runId       pipeline {         name       }     }   } }
And then in your run failure sensor, you could access this tag like so:
Copy code
python @run_failure_sensor def failure_callback(context: RunFailureSensorContext) -> None:     """     Sensor triggered when a processing job fails     """     # Access the tag from the context     job_completion_callback_url = context.dagster_run.tags.get("job_completion_callback_url")      # Now you can use job_completion_callback_url in your sensor logic     if job_completion_callback_url:         _send_callback(             context,             "FAILED",              f"{context.dagster_run.job_name} failed. Error: {context.failure_event.message}",             job_completion_callback_url         )
In this example,
context.dagster_run.tags.get("job_completion_callback_url")
is used to retrieve the
job_completion_callback_url
from the run's tags. This assumes that the job was triggered with the appropriate tag set. Please note that this approach requires that the process triggering the job includes the necessary information as a tag, and that the run failure sensor is designed to look for this tag.
j

James O'Toole

02/08/2024, 12:23 AM
@Scout how would i update the code below to set callback_url_resource as a tag? resources = { "job_completion_callback_url": callback_url_resource, "job_completion_callback_auth": callback_auth_resource, } all_assets = load_assets_from_modules([assets], _group_name_=GROUP_NAME) materialize_all_assets_job = define_asset_job( name="haystacker_videos_job", selection=AssetSelection.all(), # auto-terminate run after 20 minutes; we can toggle this up or down as needed tags={MAX_RUNTIME_SECONDS_TAG: 1200}, ) defs = Definitions( assets=all_assets, jobs=[materialize_all_assets_job], resources=resources, sensors=[success_callback, failure_callback] )
s

Scout

02/08/2024, 12:23 AM
To set
callback_url_resource
as a tag for the
materialize_all_assets_job
, you would need to ensure that the value you want to set as a tag is available at the time you define the job. Since resources are typically initialized at runtime with the run config, you cannot directly use a resource as a tag value at job definition time. However, if you have a known value for the
job_completion_callback_url
that you want to set as a tag, you can do so by adding it to the
_tags_
dictionary when defining the job. Here's an example of how you might do that:
Copy code
python from dagster import define_asset_job, AssetSelection, MAX_RUNTIME_SECONDS_TAG, Definitions, load_assets_from_modules  # Assuming callback_url_resource and callback_auth_resource are defined elsewhere # and GROUP_NAME is a constant you've defined  # Define your assets all_assets = load_assets_from_modules([assets], _group_name_=GROUP_NAME)  # Define your job with the necessary tags materialize_all_assets_job = define_asset_job(     _name_="haystacker_videos_job",     _selection_=AssetSelection.all(),     _tags_={         MAX_RUNTIME_SECONDS_TAG: 1200,         "job_completion_callback_url": "<http://example.com/callback>",  # Replace with your actual URL         "job_completion_callback_auth": "auth_token",  # Replace with your actual auth token     }, )  # Define your resources resources = {     "job_completion_callback_url": callback_url_resource,     "job_completion_callback_auth": callback_auth_resource, }  # Define your Definitions object defs = Definitions(     _assets_=all_assets,     _jobs_=[materialize_all_assets_job],     _resources_=resources,     _sensors_=[success_callback, failure_callback] )
In this example, the tags for
job_completion_callback_url
and
job_completion_callback_auth
are hardcoded. If these values are dynamic and not known until runtime, you would need to find a way to provide these values at runtime, such as by setting them in the run config when triggering the job or by using environment variables. If you need to fetch these values dynamically at runtime and cannot hardcode them, you would typically handle this within the operations (ops) that make up your job, where you can access resources and their configurations. Sensors and jobs do not have the ability to dynamically fetch and set tags based on resource configurations because resources are initialized with the run config, which is not available at job definition time.
j

James O'Toole

02/08/2024, 12:27 AM
@Scout i'll have these values in the run config when triggering the job
s

Scout

02/08/2024, 12:27 AM
If you have the values for
job_completion_callback_url
and any other dynamic values in the run config when triggering the job, you can access these values within the job's operations (ops) by using the context object provided to each op. However, you cannot directly use these run config values as tags at the job definition time because they are only available at runtime. To use these values as tags, you would need to extract them from the run config within an op and then, if necessary, pass them along to other ops or systems that require them. Here's an example of how you might access these values within an op:
Copy code
python from dagster import op, job, RunConfig  @op(config_schema={"job_completion_callback_url": str}) def my_op(context):     # Access the run config value     job_completion_callback_url = context.op_config["job_completion_callback_url"]     # Use the value as needed within the op     <http://context.log.info|context.log.info>(f"Callback URL: {job_completion_callback_url}")  @job def my_job():     my_op()  # Example of how you might execute the job with a run config result = my_job.execute_in_process(     run_config=RunConfig(         {"ops": {"my_op": {"config": {"job_completion_callback_url": "<http://example.com/callback>"}}}}     ) )
In this example,
my_op
is configured to accept a
job_completion_callback_url
from the run config. When you execute the job, you provide the run config with the necessary values. If you want to use these values as tags for the job run itself, you would need to set them when you trigger the job execution. This can be done via the GraphQL API, the Dagit UI, or the Dagster CLI, depending on how you are triggering the job. Here's an example of how you might set tags via the GraphQL API:
Copy code
graphql mutation {   launchPipelineExecution(     executionParams: {       runConfigData: {         resources: {           ...         }         ...       }       mode: "default"       selector: {         pipelineName: "my_pipeline"         ...       }       executionMetadata: {         tags: [           { key: "job_completion_callback_url", value: "<http://example.com/callback>" },           { key: "job_completion_callback_auth", value: "auth_token" }         ]       }     }   ) {     run {       runId       pipeline {         name       }     }   } }
In this GraphQL mutation, the
executionMetadata.tags
field is used to set the tags for the run, including the
job_completion_callback_url
and
job_completion_callback_auth
. These tags will then be associated with the run and can be accessed within any sensors or monitoring tools that you have set up to respond to job run events.
j

James O'Toole

02/08/2024, 12:30 AM
@Scout how do i pass in tags if submitting a job via the graphql python client?
s

Scout

02/08/2024, 12:30 AM
When submitting a job execution through the Dagster GraphQL Python client, you can pass tags as part of the
execute_pipeline
GraphQL mutation. The Python client provides a method to submit a job execution, and you can include tags as a parameter to this method. Here's an example of how you might use the
DagsterGraphQLClient
to submit a job execution with tags:
Copy code
python from dagster_graphql import DagsterGraphQLClient  # Instantiate the client with the appropriate URL and headers client = DagsterGraphQLClient("localhost", port_number=3000)  # Define your tags tags = {     "job_completion_callback_url": "<http://example.com/callback>",     "job_completion_callback_auth": "auth_token" }  # Convert tags to the format expected by the GraphQL API formatted_tags = [{"key": key, "value": value} for key, value in tags.items()]  # Submit the job execution with tags response = client.submit_pipeline_execution(     pipeline_name="my_pipeline",     run_config={...},  # Your run config here     tags=formatted_tags )  # Check the response if response.success:     run_id = response.data["launchPipelineExecution"]["run"]["runId"]     print(f"Job submitted successfully with run ID: {run_id}") else:     print("Job submission failed")     print(response.errors)
In this example,
submit_pipeline_execution
is the method used to submit a job execution. The
tags
parameter is provided as a list of dictionaries, each containing a "key" and a "value" that correspond to the tag's name and value, respectively. Please note that the actual method name and parameters might differ depending on the version of the Dagster GraphQL Python client you are using. Always refer to the specific documentation for your version to ensure you are using the correct method and parameter names.
j

James O'Toole

02/08/2024, 3:37 PM
@Scout how do i access tag values in a sensor function?
s

Scout

02/08/2024, 3:37 PM
To access tag values in a sensor function, you can use the
RunRequest
object that is yielded by the sensor function. The
RunRequest
object can include tags, which are then passed to the job run. However, sensors themselves do not have direct access to the tags of a job run because they are not tied to a specific job run instance. If you want to access the tags from a job run within a sensor, you would typically do so in the context of responding to an event, such as a run failure or success, using a run status sensor. The
RunFailureSensorContext
or
RunSuccessSensorContext
provided to these sensors includes a
DagsterRun
object, which contains the tags that were set for the run. Here's an example of how you might access tags within a run failure sensor:
Copy code
python from dagster import run_failure_sensor, RunFailureSensorContext  @run_failure_sensor def my_failure_sensor(context: RunFailureSensorContext):     # Access the tags from the failed run     tags = context.dagster_run.tags      # Access a specific tag value     job_completion_callback_url = tags.get("job_completion_callback_url")      # Use the tag value as needed     if job_completion_callback_url:         # Logic to handle the failure, possibly using the callback URL         pass
In this example,
context.dagster_run.tags
is a dictionary containing all the tags that were set for the run. You can use the
get
method to retrieve the value of a specific tag by its key. Please note that this approach is for run status sensors, which are triggered in response to job run events. If you need to access tags in a different type of sensor, you would need to ensure that the necessary information is available to the sensor through other means, such as by setting it in an external system or passing it through the sensor's context.
j

James O'Toole

02/08/2024, 8:14 PM
@Scout what type hint should i use with
tags = context.dagster_run.tags
, and in what form should i expect them?
s

Scout

02/08/2024, 8:14 PM
In Dagster, the
tags
attribute of a
DagsterRun
object is a dictionary where both the keys and values are strings. The type hint for the
tags
variable would be
Dict[str, str]
. This means you should expect the tags to be in the form of a dictionary with string keys and string values. Here's how you can add the type hint to the
tags
variable in the context of a run failure sensor:
Copy code
python from typing import Dict from dagster import run_failure_sensor, RunFailureSensorContext  @run_failure_sensor def my_failure_sensor(context: RunFailureSensorContext):     # Type hint for the tags variable     tags: Dict[str, str] = context.dagster_run.tags      # Access a specific tag value     job_completion_callback_url = tags.get("job_completion_callback_url")      # Use the tag value as needed     if job_completion_callback_url:         # Logic to handle the failure, possibly using the callback URL         pass
In this example,
tags
is annotated with the type
Dict[str, str]
, indicating that it is a dictionary with string keys and string values. When you access the tags using
context.dagster_run.tags
, you can expect them to be in this format.
j

James O'Toole

02/08/2024, 8:21 PM
@Scout if i'm triggering a job manually via the dagit UI, how do i set tag values?
s

Scout

02/08/2024, 8:21 PM
To set tag values when triggering a job manually via the Dagit UI, you can follow these steps: 1. Navigate to the Launchpad for the job you want to execute. This can typically be done by selecting the job from the Jobs list in Dagit and then clicking on the Launchpad tab. 2. In the Launchpad, you will see a section for Tags. Click on the + Add Tag button to add a new tag. 3. Enter the Key and Value for your tag. The key must be a string, and the value must also be a string or a JSON that is serializable to a string. 4. You can add multiple tags by repeating the process of clicking + Add Tag and entering the key-value pairs for each tag. 5. Once you have added all the desired tags, you can proceed to configure the rest of the run, such as the run config and any other settings. 6. When you are ready to trigger the job, click the Launch Run button at the bottom of the Launchpad. The tags you set will be attached to the run, and you can view them later in the Runs page in the UI. These tags will also be accessible within any sensors or monitoring tools that you have set up to respond to job run events. Remember that tags are a way to attach arbitrary information to a run, and some features of Dagster can be controlled using the tags attached to a run.
j

James O'Toole

02/08/2024, 8:50 PM
@Scout if i'm writing unit tests that use
build_sensor_context
, how do i set mock values for
run_id
and
tags
? i'm accessing these values in my sensors via
_context_.dagster_run.run_id
and
_context_.dagster_run.tags
s

Scout

02/08/2024, 8:50 PM
When writing unit tests for sensors in Dagster and using
build_sensor_context
to create a mock sensor context, you can set mock values for
run_id
and
tags
by passing them as arguments to
build_sensor_context
. However, as of my knowledge cutoff date in early 2023,
build_sensor_context
does not directly accept
run_id
or
tags
as parameters. To work around this limitation, you can create a
DagsterRun
object with the desired
run_id
and
tags
, and then use the
instance
and
run
parameters of
build_sensor_context
to associate this
DagsterRun
object with the sensor context. Here's an example of how you might do this:
Copy code
python from dagster import build_sensor_context, DagsterRun, DagsterInstance  # Mock values for run_id and tags mock_run_id = "mock_run_id" mock_tags = {"key1": "value1", "key2": "value2"}  # Create a mock DagsterRun object with the mock run_id and tags mock_dagster_run = DagsterRun(run_id=mock_run_id, tags=mock_tags)  # Create an ephemeral DagsterInstance instance = DagsterInstance.ephemeral()  # Use build_sensor_context with the instance and run parameters context = build_sensor_context(     instance=instance,     run=mock_dagster_run )  # Now you can use context.dagster_run.run_id and context.dagster_run.tags in your tests assert context.dagster_run.run_id == mock_run_id assert context.dagster_run.tags == mock_tags
In this example,
DagsterRun
is used to create a mock run object with the specified
run_id
and
tags
. This mock run object is then passed to
build_sensor_context
along with an ephemeral instance of
DagsterInstance
. The resulting context object will have the
dagster_run
attribute set to the mock run object, allowing you to access
run_id
and
tags
in your tests as you would in a real sensor execution. Please note that the actual implementation details may vary based on the version of Dagster you are using, and the APIs may have changed after my knowledge cutoff date. Always refer to the latest Dagster documentation for the most accurate and up-to-date information.
j

James O'Toole

02/08/2024, 8:58 PM
@Scout it looks like
build_sensor_context
doesn't take
run
as an argument. could you adjust your code snippet?
s

Scout

02/08/2024, 8:58 PM
You are correct that
build_sensor_context
does not take a
run
argument directly. My apologies for the confusion. Instead, you can set the
run_id
and
tags
by creating a subclass of
SensorExecutionContext
and overriding the
dagster_run
property to return a
DagsterRun
with the desired values. Here's an adjusted example of how you might do this:
Copy code
python from dagster import build_sensor_context, DagsterRun, SensorExecutionContext from unittest.mock import PropertyMock, patch  # Mock values for run_id and tags mock_run_id = "mock_run_id" mock_tags = {"key1": "value1", "key2": "value2"}  # Create a mock DagsterRun object with the mock run_id and tags mock_dagster_run = DagsterRun(run_id=mock_run_id, tags=mock_tags)  # Use build_sensor_context to create a sensor context context = build_sensor_context()  # Patch the 'dagster_run' attribute of the sensor context to return the mock DagsterRun with patch.object(SensorExecutionContext, 'dagster_run', new_callable=PropertyMock) as mock_run:     mock_run.return_value = mock_dagster_run     # Now you can use context.dagster_run.run_id and context.dagster_run.tags in your tests     assert context.dagster_run.run_id == mock_run_id     assert context.dagster_run.tags == mock_tags     # Here you would invoke your sensor function using the patched context     # my_sensor(context)
In this example,
unittest.mock.patch.object
is used to patch the
dagster_run
attribute of the sensor context to return the mock
DagsterRun
object. This allows you to simulate having a
run_id
and
tags
in the sensor context during testing. Please note that this is a simplified example for illustrative purposes. Depending on the complexity of your sensor and the specifics of your testing framework, you may need to adjust this pattern to fit your needs. Always refer to the latest documentation and best practices for mocking and testing in Python.
h

Henry Tareque

02/09/2024, 6:43 PM
@Scout Is there a way for me to trigger a dagster run from an external service?
s

Scout

02/09/2024, 6:43 PM
There was an error processing your request. Please try again.
h

Henry Tareque

02/09/2024, 6:44 PM
@Scout Is there a way for me to trigger a dagster run from an external service?
s

Scout

02/09/2024, 6:44 PM
There was an error processing your request. Please try again.
11 Views