Zach P
10/07/2022, 3:58 PMclass TestMultiAssetSensor:
def test_routes(self):
context = build_multi_asset_sensor_context(
repository_def=dummy_repo,
asset_keys=[AssetKey("dummy_asset")],
cursor=json.dumps({"dummy_asset": ("202209", "abc")}),
)
for run_request in db_backend_upstream_sensor(context):
print(run_request)
assert False # To trigger PDB
Results in:
dagster._core.errors.DagsterInvariantViolationError: Attempted to initialize dagster instance, but no instance reference was provided.
context.latest_materialization_records_by_key()
, mentioning that no asset def was given.
debug_error_string = "{"created":"@1665166338.602104000","description":"Error received from peer unix:/var/folders/78/l6wpx70j495f9hd1qf1gvvr40000gn/T/tmpe3d1y2j2","file":"src/core/lib/surface/call.cc","file_line":967,"grpc_message":"Exception iterating responses: AssetKey(s) {'cross_repo_asset'} were selected, but no AssetsDefinition objects supply these keys. Make sure all keys are spelled correctly, and all AssetsDefinitions are correctly added to the repository.","grpc_status":2}"
I’ve checked the asset key is spelled correctly, and it’s added to the other repository, but not the one the sensor is in. I’d rather not duplicate assets by adding them to a bunch of repositories.
For now, it looks like I can just use several asset_sensors instead of one multi asset sensor, but I’d prefer to just use one 🙂@multi_asset_sensor(
asset_selection=AssetSelection.keys("cross_repo_asset")
)
and:
@multi_asset_sensor(
asset_keys=[AssetKey("cross_repo_asset")],
)
Neither seem to work 😞, same error.
Also probably worth noting I’m testing with the CLI dagster sensor preview my_Sensor --location <repo_my_sensor_is_in>
, I’ve also tried with --location <repo my asset is in>
, but get an error about no sensor being there.sandy
10/07/2022, 8:33 PMclaire
10/07/2022, 8:36 PM@multi_asset_sensor(asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")], job=the_job)
def a_and_b_sensor(context):
asset_events = context.latest_materialization_records_by_key()
if all(asset_events.values()):
context.advance_all_cursors()
return RunRequest(run_key=context.cursor, run_config={})
@repository
def my_repo():
return [asset_a, asset_b, a_and_b_sensor]
with instance_for_test() as instance:
materialize([asset_a, asset_b], instance=instance)
ctx = build_multi_asset_sensor_context(
asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")],
instance=instance,
repository_def=my_repo,
)
assert list(a_and_b_sensor(ctx))[0].run_config == {}
Zach P
10/07/2022, 8:38 PM