Jamie Lee
01/10/2023, 11:36 AM
dagster._core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor trigger_main_pipeline_on_prod_dms
File "/usr/local/lib/python3.8/site-packages/dagster/_grpc/impl.py", line 324, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.8/site-packages/dagster/_core/errors.py", line 206, in user_code_error_boundary
raise error_cls(
The above exception was caused by the following exception:
AttributeError: 'int' object has no attribute 'items'
File "/usr/local/lib/python3.8/site-packages/dagster/_core/errors.py", line 199, in user_code_error_boundary
yield
File "/usr/local/lib/python3.8/site-packages/dagster/_grpc/impl.py", line 324, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/sensor_definition.py", line 425, in evaluate_tick
result = list(self._evaluation_fn(context))
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/sensor_definition.py", line 594, in _wrapped_fn
for item in result:
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/multi_asset_sensor_definition.py", line 1101, in _fn
multi_asset_sensor_context = MultiAssetSensorEvaluationContext(
File "/usr/local/lib/python3.8/site-packages/dagster/_annotations.py", line 123, in __init__
undecorated_init(self, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/multi_asset_sensor_definition.py", line 253, in __init__
self._unpacked_cursor = MultiAssetSensorContextCursor(cursor, self)
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/multi_asset_sensor_definition.py", line 128, in __init__
for str_asset_key, cursor_list in loaded_cursor.items():
Nicolas Parot Alvarez
01/10/2023, 6:07 PM
from dagster import multi_asset_sensor, RunRequest, AssetSelection, asset, define_asset_job, Definitions
@asset
def asset1():
    """Return the base list ``[1, 2, 3]``."""
    return [1, 2, 3]
@asset
def asset2(asset1):
    """Return a new list: the ``asset1`` value followed by ``4``."""
    return asset1 + [4]
@asset
def asset3(asset2):
    """Return a new list: the ``asset2`` value followed by ``4``."""
    return asset2 + [4]
# Job "j12" is built from the selection of asset1 and asset2.
my_job12 = define_asset_job(
    name="j12",
    selection=AssetSelection.assets(asset1, asset2),
)
# Job "j3" is built from the selection of asset3 only.
my_job3 = define_asset_job(
    name="j3",
    selection=AssetSelection.assets(asset3),
)
@multi_asset_sensor(
    asset_selection=AssetSelection.assets(asset1, asset2),
    job=my_job3,
)
def asset_1_and_2_sensor(context):
    """Return a ``RunRequest`` once every monitored asset has a record.

    Reads ``context.latest_materialization_records_by_key()`` and, when every
    value in that mapping is truthy, advances all cursors and returns a
    ``RunRequest``. Otherwise returns ``None``.
    """
    asset_events = context.latest_materialization_records_by_key()
    if all(asset_events.values()):
        # NOTE(review): presumably advancing the cursors marks these records
        # as consumed so later ticks do not see them again -- confirm against
        # the Dagster docs for the installed version.
        context.advance_all_cursors()
        return RunRequest()
    return None
# Bundle the assets, jobs, and sensor from this snippet into one Definitions.
defs = Definitions(
    assets=[asset1, asset2, asset3],
    jobs=[my_job12, my_job3],
    sensors=[asset_1_and_2_sensor],
)
sandy
01/10/2023, 9:22 PM
claire
01/10/2023, 9:39 PM
The error suggests the stored cursor is an `int` cursor. My guess about what's happening is that you had some other type of sensor (e.g. an asset sensor) with the same name as your multi-asset sensor. When Dagster attempts to read the cursor from storage, that cursor will be your old asset sensor's cursor instead of a new cursor.
You could fix this by renaming your sensor to something new and unused, or you could reset the cursor:
dagster sensor cursor <sensor_name> --delete
Jamie Lee
01/11/2023, 4:39 PM