#ask-community

li liang

04/27/2023, 9:35 AM
Hi team! I've run into a problem. The sensor-with-resource example from docs_snippets works:
from dagster import (
    sensor,
    RunRequest,
    SensorEvaluationContext,
    ConfigurableResource,
    job,
    Definitions,
    RunConfig,
)
import requests
from typing import List

class UsersAPI(ConfigurableResource):
    url: str

    def fetch_users(self) -> List[str]:
        return requests.get(self.url).json()

@job
def process_user():
    ...

@sensor(job=process_user)
def process_new_users_sensor(
    context: SensorEvaluationContext,
    users_api: UsersAPI,
):
    last_user = int(context.cursor) if context.cursor else 0
    users = users_api.fetch_users()

    num_users = len(users)
    for user_id in users[last_user:]:
        yield RunRequest(
            run_key=user_id,
            tags={"user_id": user_id},
        )

    context.update_cursor(str(num_users))

defs = Definitions(
    jobs=[process_user],
    sensors=[process_new_users_sensor],
    resources={"users_api": UsersAPI(url="https://my-api.com/users")},
)
But if I add an op that uses the same resource:
from dagster import (
    sensor,
    RunRequest,
    SensorEvaluationContext,
    ConfigurableResource,
    job,
    op,
    Definitions,
    RunConfig,
)
import requests
from typing import List

class UsersAPI(ConfigurableResource):
    url: str

    def fetch_users(self) -> List[str]:
        return ['1']
        #return requests.get(self.url).json()

@op
def op1(
    users_api: UsersAPI,
):
    ...

@job
def process_user():
    op1()

@sensor(job=process_user)
def process_new_users_sensor(
    context: SensorEvaluationContext,
    users_api: UsersAPI,
):
    last_user = int(context.cursor) if context.cursor else 0
    users = users_api.fetch_users()

    num_users = len(users)
    for user_id in users[last_user:]:
        yield RunRequest(
            run_key=user_id,
            tags={"user_id": user_id},
        )

    context.update_cursor(str(num_users))

defs = Definitions(
    jobs=[process_user],
    sensors=[process_new_users_sensor],
    resources={
        "users_api": UsersAPI(url="https://my-api.com/users"),
    },
)
I get this error when the sensor is evaluated:
dagster._core.errors.DagsterInvalidDefinitionError: Resource with key 'users_api' required by sensor 'process_new_users_sensor' was not provided.
  File "\venv\dagster\lib\site-packages\dagster\_grpc\impl.py", line 375, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "\venv\dagster\lib\site-packages\dagster\_core\definitions\sensor_definition.py", line 659, in evaluate_tick
    result = list(self._evaluation_fn(context))
  File "\venv\dagster\lib\site-packages\dagster\_core\definitions\sensor_definition.py", line 938, in _wrapped_fn
    for item in result:
  File "\venv\dagster\lib\site-packages\dagster\_core\definitions\sensor_definition.py", line 927, in _wrapped_fn
    resource_args_populated = validate_and_get_resource_dict(
  File "\venv\dagster\lib\site-packages\dagster\_core\definitions\sensor_definition.py", line 387, in validate_and_get_resource_dict
    raise DagsterInvalidDefinitionError(

ben

04/27/2023, 5:55 PM
thanks for the report - I can reproduce this error. a fix is up which should go out with our next release

li liang

04/28/2023, 12:55 AM
Will this be fixed in the 1.3.2 release?

Abhishek Agrawal

05/01/2023, 12:08 AM
Hey @ben, I am facing a similar issue but am not sure if it's a bug or an error in my code. I have a sensor where I am passing run_config like this. Previously it was a JSON-style dict, but now it's Pythonic:
yield RunRequest(
    run_key=file_name,
    run_config=RunConfig(
        ops={
            "job_resource": DataSourceSensorConfig(
                customer_code_with_suffix=customer_code_with_suffix
            ),
        },
    ),
)
In the @job that this sensor is supposed to trigger, I was previously using:
@job(
    resource_defs={
        "customer_code_with_suffix": make_values_resource()
    }
)
def run_job():
    run_op()
I have replaced this now with -
@job
def run_job():
    run_op()


defs = Definitions(
    jobs=[run_job],
    resources={
        "job_resource": DataSourceSensorConfig.configure_at_launch()
    },
)
This is the DataSourceSensorConfig class:
class DataSourceSensorConfig(ConfigurableResource):
    customer_code_with_suffix: str
However, this is giving an error -
dagster._core.errors.DagsterUserCodeProcessError: dagster._core.errors.DagsterInvalidConfigError: Error in config for job run_job
  Error 1: Received unexpected config entry "job_resource" at path root:ops. Expected: "{ run_op: { config?: Any inputs: { job_resource: { json: { path: String } pickle: { path: String } value: Any } } } }".
  Error 2: Missing required config entry "run_op" at path root:ops. Sample config for missing entry: {'run_op': {'inputs': {'job_resource': '<selector>'}}}
I am not entirely sure how to pass the config to the job so that it can be used by all the ops within the job. Could you help?

ben

05/02/2023, 10:49 PM
Hi Abhishek, would you be able to share the code snippet that you were using before, including the RunRequest? Just trying to figure out exactly how this should translate.
@li liang this will be out with this week’s 1.3.3 release

Abhishek Agrawal

05/02/2023, 11:06 PM
Thanks for your reply @ben! I was preparing a run_config dict in def _sensor() like this -
@sensor(
    job=job_to_run,
    name="bucket_sensor"
)
def _sensor():
    ....
    ....
    run_config = {
        'resources': {
            'customer_code_with_suffix': {
                'config': customer_code_with_suffix
            }
        }
    }
    yield RunRequest(
        run_key=file_name,
        run_config=run_config
    )
    return _sensor
in my job code, I had -
@op(required_resource_keys={"customer_code_with_suffix"})
def op_run(context):
    customer_code_with_suffix = context.resources.customer_code_with_suffix
    ....
    ....
    return

@job(
    resource_defs={
        "customer_code_with_suffix": make_values_resource()
    }
)
def job_run():
    op_run()
When I had to pass more values, I added more fields to the dict in the @sensor code in the same format, then used them in the @job by first adding them to the resource_defs dict, passing them to the @op, and consuming them via context in the code. Does this help?
Hey @ben , did you get a chance to think about this?
@owen - tagging you as it seems Ben is on leave, guessing from his status. Could you help?

owen

05/09/2023, 6:27 PM
hi @Abhishek Agrawal -- looks like two things are going on here. first, to configure job_resource, you'll want something like
RunConfig(resources={
    "job_resource": DataSourceSensorConfig(customer_code_with_suffix=customer_code_with_suffix),
})
second, it looks like dagster is interpreting your job_resource argument to your op as an input, rather than a resource, which is likely happening because you don't have a type annotation there (it should look basically like this: https://docs.dagster.io/guides/dagster/migrating-to-pythonic-resources-and-config#step-3-using-the-resource-in-assets)