David
07/13/2022, 12:02 AM
After upgrading from 0.15.2 to 0.15.5, our @run_status_sensor annotated method is failing with dagster._check.CheckError: Member of list mismatches type (details in thread)
2022-07-12 17:00:09 +0900 - dagster.daemon.SensorDaemon - ERROR - Sensor daemon caught an error for sensor on_graph_success : dagster.core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor on_graph_success
Stack Trace:
  File "/Users/dpursehouse/git/project/.venv/lib/python3.10/site-packages/dagster/grpc/impl.py", line 284, in get_external_sensor_execution
    with user_code_error_boundary(
  File "/Users/dpursehouse/git/project/.homebrew/Caskroom/miniforge/base/envs/nodejs-16.6.1+python-3.10.0/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/dpursehouse/git/project/.venv/lib/python3.10/site-packages/dagster/core/errors.py", line 191, in user_code_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
dagster._check.CheckError: Member of list mismatches type. Expected (<class 'dagster.core.definitions.run_request.SkipReason'>, <class 'dagster.core.definitions.run_request.RunRequest'>, <class 'dagster.core.definitions.run_request.PipelineRunReaction'>). Got DagsterRun(pipeline_name='pipeline_name', run_id='b8ed68e9-144e-4209-b736-e4ff158e010d', run_config={'resources': {'task_data': {'config': {}}}}, mode='default', asset_selection=None, solid_selection=None, solids_to_execute=None, step_keys_to_execute=None, status=<DagsterRunStatus.SUCCESS: 'SUCCESS'>, tags={'.dagster/grpc_info': '{"host": "localhost", "port": 4266}', '.dagster/scheduled_execution_time': '2022-07-12T07:35:00+00:00', 'dagster/schedule_name': 'pipeline_every_five_minutes'}, root_run_id=None, parent_run_id=None, pipeline_snapshot_id='eb8ce46ea6dcc07b761a6d3f089b6f7d0df81c34', execution_plan_snapshot_id='35c1ae10b8ffbbf04fc6b4a739eedab59ac3b975', external_pipeline_origin=ExternalPipelineOrigin(external_repository_origin=ExternalRepositoryOrigin(repository_location_origin=GrpcServerRepositoryLocationOrigin(host='localhost', port=4266, socket=None, location_name='project.pipelines.repository', use_ssl=None), repository_name='project'), pipeline_name='pipeline'), pipeline_code_origin=PipelinePythonOrigin(pipeline_name='pipeline', repository_origin=RepositoryPythonOrigin(executable_path='/Users/dpursehouse/git/project/.venv/bin/python', code_pointer=PackageCodePointer(module='project.pipelines.repository', attribute='project', working_directory='/Users/dpursehouse/git/project'), container_image=None, entry_point=['dagster'], container_context={}))) of type <class 'dagster.core.storage.pipeline_run.DagsterRun'>.
Stack Trace:
  File "/Users/dpursehouse/git/project/.venv/lib/python3.10/site-packages/dagster/core/errors.py", line 184, in user_code_error_boundary
    yield
  File "/Users/dpursehouse/git/project/.venv/lib/python3.10/site-packages/dagster/grpc/impl.py", line 289, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/Users/dpursehouse/git/project/.venv/lib/python3.10/site-packages/dagster/core/definitions/sensor_definition.py", line 373, in evaluate_tick
    check.is_list(result, (SkipReason, RunRequest, PipelineRunReaction))
  File "/Users/dpursehouse/git/project/.venv/lib/python3.10/site-packages/dagster/_check/__init__.py", line 862, in is_list
    return _check_iterable_items(obj, of_type, "list")
  File "/Users/dpursehouse/git/project/.venv/lib/python3.10/site-packages/dagster/_check/__init__.py", line 1597, in _check_iterable_items
    raise CheckError(
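The failing call in the trace, check.is_list(result, (SkipReason, RunRequest, PipelineRunReaction)), verifies that every member of the sensor's result list is an instance of one of the allowed types. The following is a minimal sketch of that kind of check, assuming hypothetical stand-in classes in place of Dagster's own. It is not the library's implementation, only an illustration of why a DagsterRun member gets rejected.

```python
from dataclasses import dataclass
from typing import Any, List, Tuple


class CheckError(Exception):
    """Stand-in for dagster._check.CheckError (hypothetical, for illustration)."""


# Hypothetical stand-ins for the Dagster classes named in the trace.
@dataclass
class SkipReason:
    skip_message: str


@dataclass
class RunRequest:
    run_key: str


@dataclass
class DagsterRun:
    run_id: str


@dataclass
class PipelineRunReaction:
    pipeline_run: DagsterRun


def is_list(obj: Any, of_type: Tuple[type, ...]) -> List[Any]:
    """Return obj if it is a list whose members all match of_type, else raise."""
    if not isinstance(obj, list):
        raise CheckError(f"Object {obj!r} is not a list.")
    for item in obj:
        if not isinstance(item, of_type):
            raise CheckError(
                "Member of list mismatches type. "
                f"Expected {of_type}. Got {item!r} of type {type(item)}."
            )
    return obj


ALLOWED = (SkipReason, RunRequest, PipelineRunReaction)
run = DagsterRun(run_id="b8ed68e9-144e-4209-b736-e4ff158e010d")

# A list holding the wrapper type passes the check.
is_list([PipelineRunReaction(run)], ALLOWED)

# A list holding the bare run fails it, as in the error above.
try:
    is_list([run], ALLOWED)
except CheckError as err:
    print(err)
```

Under these assumptions, the error in the log means that a bare DagsterRun, rather than a PipelineRunReaction wrapping it, ended up as a member of the result list that evaluate_tick validates.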
The check receives a DagsterRun while it expects one of the other types: check.is_list(result, (SkipReason, RunRequest, PipelineRunReaction)). It is 0.15.3 that introduces this issue.
yuhan
07/13/2022, 8:30 PM
David
07/13/2022, 11:01 PM
@run_status_sensor(
    pipeline_run_status=DagsterRunStatus.SUCCESS,
    minimum_interval_seconds=10,
    default_status=DefaultSensorStatus.RUNNING,
)
def on_graph_success(
    context: RunStatusSensorContext,
) -> SkipReason | PipelineRunReaction:
    """On success."""
    # ...
    # Redacted code here does some stuff in our database, not using any dagster APIs
    # ...
    return PipelineRunReaction(context.dagster_run)
yuhan
07/13/2022, 11:26 PM
David
07/15/2022, 5:25 AM
We have a @graph that we are configuring to run on a schedule like this (returned in the schedules field of the return value from our @repository method):
"graph_every_five_minutes": ScheduleDefinition(
    name="graph_every_five_minutes",
    description="Process every five minutes.",
    job=graph_every_five_minutes_job,
    cron_schedule="*/5 * * * *",
    execution_timezone="US/Pacific",
    run_config=dagster.build_run_config(),
    default_status=DefaultScheduleStatus.RUNNING,
),
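The cron expression "*/5 * * * *" in the definition above fires on every minute that is a multiple of five. As a rough sketch of that arithmetic only, the hypothetical helper below (next_five_minute_tick is not a Dagster API) computes the next such boundary after a given moment. It ignores time zones and daylight-saving transitions and makes no claim about how the Dagster scheduler computes ticks.

```python
from datetime import datetime, timedelta


def next_five_minute_tick(now: datetime) -> datetime:
    """Return the first five-minute boundary strictly after `now`."""
    # Drop seconds and microseconds so we work on whole minutes.
    floored = now.replace(second=0, microsecond=0)
    # Minutes remaining until the next multiple of five (always 1 to 5).
    minutes_to_add = 5 - floored.minute % 5
    return floored + timedelta(minutes=minutes_to_add)


print(next_five_minute_tick(datetime(2022, 7, 12, 7, 31, 10)))
```

The dagster/scheduled_execution_time tag in the error message, 2022-07-12T07:35:00+00:00, falls on one of these boundaries.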
graph_every_five_minutes_job is the result of calling to_job() on the @graph, and dagster.build_run_config() is a locally implemented method that sets up some resource configs.
yuhan
07/15/2022, 6:10 AM
David
07/15/2022, 6:45 AM
sandy
07/19/2022, 11:59 PM
jamie
07/20/2022, 1:06 PM
David
07/20/2022, 11:05 PM
jamie
07/21/2022, 6:12 PM
David
07/21/2022, 11:48 PM