li liang
04/27/2023, 9:35 AM
from dagster import (
sensor,
RunRequest,
SensorEvaluationContext,
ConfigurableResource,
job,
Definitions,
RunConfig,
)
import requests
from typing import List
class UsersAPI(ConfigurableResource):
    """Dagster resource that retrieves the list of users over HTTP."""

    # Endpoint queried by fetch_users(); expected to answer with a JSON body.
    url: str

    def fetch_users(self) -> List[str]:
        """Fetch and return the decoded JSON body served at ``self.url``.

        Raises:
            requests.HTTPError: the endpoint answered with a 4xx/5xx status.
            requests.Timeout: no response arrived within the timeout.
        """
        # requests applies no timeout by default, so without one a stalled
        # endpoint would block the caller (here: a sensor tick) indefinitely.
        response = requests.get(self.url, timeout=10)
        # Fail loudly on an error status instead of decoding an error payload
        # as if it were the user list.
        response.raise_for_status()
        return response.json()
@job
def process_user():
    """Placeholder job targeted by the sensor; it contains no ops yet."""
    ...
@sensor(job=process_user)
def process_new_users_sensor(
    context: SensorEvaluationContext,
    users_api: UsersAPI,
):
    """Yield one ``RunRequest`` per user that appeared since the last tick.

    The sensor cursor holds, as a string, how many users had been fetched on
    the previous evaluation; an unset cursor is treated as zero.
    """
    already_seen = int(context.cursor) if context.cursor else 0
    all_users = users_api.fetch_users()

    # Everything past the previously recorded count is considered new.
    for new_user in all_users[already_seen:]:
        yield RunRequest(run_key=new_user, tags={"user_id": new_user})

    # Record the total fetched so the next tick starts after these users.
    context.update_cursor(str(len(all_users)))
# Code-location definitions: the job, the sensor that launches it, and the
# resource bound to the "users_api" key that the sensor asks for by
# parameter name.
defs = Definitions(
    jobs=[process_user],
    sensors=[process_new_users_sensor],
    # Plain URL: the surrounding "<...>" was chat auto-link markup, not part
    # of the address, and would make the request fail.
    resources={"users_api": UsersAPI(url="https://my-api.com/users")},
)
But if I add an op that uses the same resource:
from dagster import (
sensor,
RunRequest,
SensorEvaluationContext,
ConfigurableResource,
job,
op,
Definitions,
RunConfig,
)
import requests
from typing import List
class UsersAPI(ConfigurableResource):
    """Dagster resource for listing users; currently stubbed for the repro."""

    # Endpoint the real implementation would query (unused by the stub).
    url: str

    def fetch_users(self) -> List[str]:
        """Return a fixed single-user list instead of calling the API."""
        stub_users = ['1']
        return stub_users
        # Real implementation, disabled for this reproduction:
        # return requests.get(self.url).json()
@op
def op1(users_api: UsersAPI):
    """Placeholder op whose only purpose is to request the ``users_api`` resource."""
    ...
@job
def process_user():
    """Job made of the single op ``op1``."""
    op1()
@sensor(job=process_user)
def process_new_users_sensor(
    context: SensorEvaluationContext,
    users_api: UsersAPI,
):
    """Launch ``process_user`` once for each user not seen on the prior tick.

    The cursor stores the number of users fetched last time (as a string);
    when no cursor is set, processing starts from the first user.
    """
    offset = int(context.cursor) if context.cursor else 0
    fetched = users_api.fetch_users()

    # Users beyond the stored offset have not been requested yet.
    for uid in fetched[offset:]:
        yield RunRequest(run_key=uid, tags={"user_id": uid})

    # Persist the new high-water mark for the next evaluation.
    context.update_cursor(str(len(fetched)))
# Code-location definitions. The "users_api" key is requested by parameter
# name from both the sensor and op1.
defs = Definitions(
    jobs=[process_user],
    sensors=[process_new_users_sensor],
    resources={
        # Plain URL: the surrounding "<...>" was chat auto-link markup, not
        # part of the address.
        "users_api": UsersAPI(url="https://my-api.com/users"),
    },
)
there will be an error:
dagster._core.errors.DagsterInvalidDefinitionError: Resource with key 'users_api' required by sensor 'process_new_users_sensor' was not provided.
File "\venv\dagster\lib\site-packages\dagster\_grpc\impl.py", line 375, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "\venv\dagster\lib\site-packages\dagster\_core\definitions\sensor_definition.py", line 659, in evaluate_tick
result = list(self._evaluation_fn(context))
File "\venv\dagster\lib\site-packages\dagster\_core\definitions\sensor_definition.py", line 938, in _wrapped_fn
for item in result:
File "\venv\dagster\lib\site-packages\dagster\_core\definitions\sensor_definition.py", line 927, in _wrapped_fn
resource_args_populated = validate_and_get_resource_dict(
File "\venv\dagster\lib\site-packages\dagster\_core\definitions\sensor_definition.py", line 387, in validate_and_get_resource_dict
raise DagsterInvalidDefinitionError(
ben
04/27/2023, 5:55 PM
li liang
04/28/2023, 12:55 AM
Abhishek Agrawal
05/01/2023, 12:08 AM
yield RunRequest(
run_key=file_name,
run_config=RunConfig(ops={
"job_resource": DataSourceSensorConfig( customer_code_with_suffix=customer_code_with_suffix),
},
)
)
In the @job this sensor is supposed to trigger, I was earlier using
# Legacy form: the value is supplied through a job-level values resource.
@job(resource_defs={"customer_code_with_suffix": make_values_resource()})
def run_job():
    """Job made of the single op ``run_op``."""
    run_op()
I have replaced this now with -
@job
def run_job():
    """Job made of the single op ``run_op``; declares no resources itself."""
    run_op()
# The "job_resource" key is left unconfigured here (configure_at_launch), so
# its values have to be supplied in the run config of each run.
defs = Definitions(
    jobs=[run_job],
    resources={"job_resource": DataSourceSensorConfig.configure_at_launch()},
)
This is the DataSourceSensorConfig class:
class DataSourceSensorConfig(ConfigurableResource):
    # Resource with a single string field; in this thread the value is
    # meant to be provided by the sensor for each run it requests.
    customer_code_with_suffix: str
However, this is giving an error -
dagster._core.errors.DagsterUserCodeProcessError: dagster._core.errors.DagsterInvalidConfigError: Error in config for job run_job
Error 1: Received unexpected config entry "job_resource" at path root:ops. Expected: "{ run_op: { config?: Any inputs: { job_resource: { json: { path: String } pickle: { path: String } value: Any } } } }".
Error 2: Missing required config entry "run_op" at path root:ops. Sample config for missing entry: {'run_op': {'inputs': {'job_resource': '<selector>'}}}
I am not entirely sure how to pass the config to the job so that it can be used by all the ops within the job.
Could you help?
ben
05/02/2023, 10:49 PM
RunRequest? Just trying to figure out exactly how this should translate
Abhishek Agrawal
05/02/2023, 11:06 PM
run_config
dict in def _sensor() like this -
@sensor(
job=job_to_run,
name="bucket_sensor"
)
def _sensor():
....
....
run_config = {
'resources': {
'customer_code_with_suffix': {
'config': customer_code_with_suffix
}
}
}
yield RunRequest(
run_key=file_name,
run_config=run_config
)
return _sensor
in my job code, I had -
@op(required_resource_keys={"customer_code_with_suffix"})
def op_run(context):
customer_code_with_suffix = context.resources.customer_code_with_suffix
....
....
return
# The values resource lets the run config supply a raw value under this key.
@job(resource_defs={"customer_code_with_suffix": make_values_resource()})
def job_run():
    """Job made of the single op ``op_run``."""
    op_run()
resource_defs dict and then passing it to the @op and consuming via context in the code.
Does this help?
owen
05/09/2023, 6:27 PM
job_resource, you'll want something like
RunConfig(resources={"job_resource": DataSourceSensorConfig( customer_code_with_suffix=customer_code_with_suffix),
})
job_resource argument to your op as an input, rather than a resource, which is likely happening because you don't have a type annotation there (it should look basically like this: https://docs.dagster.io/guides/dagster/migrating-to-pythonic-resources-and-config#step-3-using-the-resource-in-assets)