https://dagster.io/ logo
#ask-community
Title
# ask-community
r

Robert Wade

04/06/2023, 8:35 PM
I’m trying to use a multi_asset_sensor with partitioned assets in order to execute a partitioned job and it is failing. Both my assets are DailyPartitioned. The job is also DailyPartitioned. When the sensor (which I lifted directly from https://docs.dagster.io/concepts/partitions-schedules-sensors/asset-sensors verbatim, except for the name of the downstream job) ticks every 30 seconds, it provides this error:
Copy code
dagster._core.errors.DagsterInvalidInvocationError: All assets must be partitioned and shared the same partitions definition
Which they do. Thanks in advance.
s

sandy

04/06/2023, 8:38 PM
Do they DailyPartitionsDefinitions have the same start date? Would you be able to share a stack trace?
r

Robert Wade

04/06/2023, 8:40 PM
yes. Exact same date.
Copy code
^[[32m2023-04-07 08:56:00 -0700^[[0m - dagster.daemon.SensorDaemon - ^[[34mINFO^[[0m - Checking for new runs for sensor: trigger_daily_asset_if_both_upstream_partitions_materialized
^[[32m2023-04-07 08:56:00 -0700^[[0m - dagster.daemon.SensorDaemon - ^[[34mERROR^[[0m - ^[[31mSensor daemon caught an error for sensor trigger_daily_asset_if_both_upstream_partitions_materialized^[[0m
Traceback (most recent call last):
  File "/opt/anaconda3/envs/etl7/lib/python3.9/site-packages/dagster/_daemon/sensor.py", line 507, in _process_tick_generator
    yield from _evaluate_sensor(
  File "/opt/anaconda3/envs/etl7/lib/python3.9/site-packages/dagster/_daemon/sensor.py", line 570, in _evaluate_sensor
    sensor_runtime_data = code_location.get_external_sensor_execution_data(
  File "/opt/anaconda3/envs/etl7/lib/python3.9/site-packages/dagster/_core/host_representation/code_location.py", line 856, in get_external_sensor_execution_data
    return sync_get_external_sensor_execution_data_grpc(
  File "/opt/anaconda3/envs/etl7/lib/python3.9/site-packages/dagster/_api/snapshot_sensor.py", line 78, in sync_get_external_sensor_execution_data_grpc
    raise DagsterUserCodeProcessError.from_error_info(result.error)
dagster._core.errors.DagsterUserCodeProcessError: dagster._core.errors.DagsterInvalidInvocationError: All assets must be partitioned and share the same partitions definition
 
Stack Trace:
  File "/opt/anaconda3/envs/etl7/lib/python3.9/site-packages/dagster/_grpc/impl.py", line 360, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/opt/anaconda3/envs/etl7/lib/python3.9/site-packages/dagster/_core/definitions/sensor_definition.py", line 555, in evaluate_tick
    result = list(self._evaluation_fn(context))
  File "/opt/anaconda3/envs/etl7/lib/python3.9/site-packages/dagster/_core/definitions/sensor_definition.py", line 732, in _wrapped_fn
    for item in result:
  File "/opt/anaconda3/envs/etl7/lib/python3.9/site-packages/dagster/_core/definitions/multi_asset_sensor_definition.py", line 1073, in _fn
    result = materialization_fn(multi_asset_sensor_context)
  File "/Users/k30818/PycharmProjects/horizon-rdf/horizon_rdf/etl/location2/l2_assets.py", line 56, in trigger_daily_asset_if_both_upstream_partitions_materialized
    ) in context.latest_materialization_records_by_partition_and_asset().items():
  File "/opt/anaconda3/envs/etl7/lib/python3.9/site-packages/dagster/_core/definitions/multi_asset_sensor_definition.py", line 560, in latest_materialization_records_by_partition_and_asset
    raise DagsterInvalidInvocationError(
I have a sample running successfully (again, exact same code except for asset and job names) of a multi asset sensor but in this example all assets are in the same code location. So it appears that the error I reported definitely relates to the sensor reacting to assets from a different code location.
s

sandy

04/18/2023, 4:09 PM
are you using a
SourceAsset
to represent the asset that's in a different code location? if so, is there a
partitions_def
on that
SourceAsset
?
r

Robert Wade

04/20/2023, 6:41 PM
Here is the definition of the sensor:
Copy code
l1_second_asset_var = SourceAsset(key=AssetKey("l1_second_asset"),
                                  partitions_def=DailyPartitionsDefinition(start_date='2023-03-01'))
l1_third_asset_var = SourceAsset(key=AssetKey("l1_third_asset"),
                                 partitions_def=DailyPartitionsDefinition(start_date='2023-03-01'))


@multi_asset_sensor(
    monitored_assets=[
        AssetKey("l1_second_asset"),
        AssetKey("l1_third_asset"),
    ],
    job=location2_job,

)
To answer your questions: 1. Yes, I represent both assets that exist within a different code location as SourceAssets. 2. Both SourceAssets have the exact same partitions_def as everything else. 3. A multi_asset_sensor takes AssetKeys, not SourceAssets, so I reference the same AssetKey names.
s

sandy

04/21/2023, 12:45 AM
This is definitely unexpected. I am going to try this out
I was able to reproduce this, and it did not behave as I expected. Thanks for sticking with it on this. I filed an issue to track this: https://github.com/dagster-io/dagster/issues/13834. @claire - are you able to take a look at this?
r

Robert Wade

04/21/2023, 3:04 PM
Thank you
c

claire

04/21/2023, 4:47 PM
yep, I can take the issue. Will fix it when I get the chance
r

Robert Wade

05/02/2023, 6:22 PM
Hi @claire I see you fixed this issue. How will I know when this is released?
c

claire

05/02/2023, 6:23 PM
should be released this week (either tomorrow or thursday) in 1.3.3.
r

Robert Wade

05/02/2023, 6:23 PM
thank you!
7 Views