
Zach P

10/07/2022, 3:58 PM
How do I go about testing a multi asset sensor? I’m trying to build a test case that checks that the run requests are all valid, but I’m hitting issues pretty early on. Essentially, I have a dummy job, asset, and repo, and I’m trying to test them with my sensor, but I get an “Attempted to initialize dagster instance, but no instance reference was provided” error. However, I can’t use an ephemeral instance either. Am I missing something? 🤔 Code in thread
class TestMultiAssetSensor:
    def test_routes(self):
        context = build_multi_asset_sensor_context(
            repository_def=dummy_repo,
            asset_keys=[AssetKey("dummy_asset")],
            cursor=json.dumps({"dummy_asset": ("202209", "abc")}),
        )
        for run_request in db_backend_upstream_sensor(context):
            print(run_request)
        assert False # To trigger PDB
Results in:
dagster._core.errors.DagsterInvariantViolationError: Attempted to initialize dagster instance, but no instance reference was provided.
There doesn't seem to be any discussion of Dagster instances in the sensor testing docs, as far as I can see. I tried testing with an ephemeral Dagster instance despite the docs warning that it would cause an error, and it does in fact error :laugh-cry:
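Side note on the cursor in the test above: it is built with json.dumps from a dict whose value is a tuple. JSON has no tuple type, so the value is written as an array and comes back as a list when parsed. A minimal stdlib-only sketch of that round trip follows; the parse_cursor helper is a hypothetical name for illustration, not part of Dagster, and nothing here says how Dagster itself interprets the cursor.

```python
import json

# Same payload as the cursor in the test above. The tuple exists only on the
# Python side; JSON will store it as an array.
cursor = json.dumps({"dummy_asset": ("202209", "abc")})


def parse_cursor(raw: str) -> dict:
    """Hypothetical helper: decode a JSON cursor string back into a dict."""
    return json.loads(raw)


parsed = parse_cursor(cursor)

# The value comes back as a list, so convert before comparing against a tuple.
as_tuple = tuple(parsed["dummy_asset"])
```

When asserting on a cursor in a test, compare against a list, or convert back to a tuple first as shown.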
Also, I’m wondering if multi_asset_sensors are broken for cross-repository work. I’m trying to define one, but it fails when calling context.latest_materialization_records_by_key(), mentioning that no asset def was given.
debug_error_string = "{"created":"@1665166338.602104000","description":"Error received from peer unix:/var/folders/78/l6wpx70j495f9hd1qf1gvvr40000gn/T/tmpe3d1y2j2","file":"src/core/lib/surface/call.cc","file_line":967,"grpc_message":"Exception iterating responses: AssetKey(s) {'cross_repo_asset'} were selected, but no AssetsDefinition objects supply these keys. Make sure all keys are spelled correctly, and all AssetsDefinitions are correctly added to the repository.","grpc_status":2}"
I’ve checked the asset key is spelled correctly, and it’s added to the other repository, but not the one the sensor is in. I’d rather not duplicate assets by adding them to a bunch of repositories. For now, it looks like I can just use several asset_sensors instead of one multi asset sensor, but I’d prefer to just use one 🙂
I’ve also tried using two different methods to pass assets to the sensor:
@multi_asset_sensor(
    asset_selection=AssetSelection.keys("cross_repo_asset")
)
and:
@multi_asset_sensor(
    asset_keys=[AssetKey("cross_repo_asset")],
)
Neither seems to work 😞, same error. It's also probably worth noting that I’m testing with the CLI:
dagster sensor preview my_Sensor --location <repo_my_sensor_is_in>
I’ve also tried with --location <repo my asset is in>, but I get an error about no sensor being there.

sandy

10/07/2022, 8:33 PM
@jamie @claire - mind chiming in?

claire

10/07/2022, 8:36 PM
Hi Zach. Apologies, the asset selection bug is a regression introduced in the latest release and will be resolved in the next release. You can use instance_for_test to test a multi asset sensor, something like:
# asset_a, asset_b, and the_job are assumed to be defined elsewhere.
@multi_asset_sensor(asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")], job=the_job)
def a_and_b_sensor(context):
    asset_events = context.latest_materialization_records_by_key()
    # Only request a run once every monitored asset has a materialization.
    if all(asset_events.values()):
        context.advance_all_cursors()
        return RunRequest(run_key=context.cursor, run_config={})

@repository
def my_repo():
    return [asset_a, asset_b, a_and_b_sensor]

# instance_for_test provides the instance that the sensor context needs.
with instance_for_test() as instance:
    materialize([asset_a, asset_b], instance=instance)
    ctx = build_multi_asset_sensor_context(
        asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")],
        instance=instance,
        repository_def=my_repo,
    )
    assert list(a_and_b_sensor(ctx))[0].run_config == {}

Zach P

10/07/2022, 8:38 PM
Thanks Claire!