# ask-community

James Barnden

12/16/2022, 6:00 PM
Hi people, I'm trying to test out multi-asset sensors by setting up a couple of dummy jobs which do nothing but yield asset keys (`dummy` and `dummy_2`), then having a third job with a sensor for the former. Both assets generate just fine, but the job with the sensor fails with the error message "Exception iterating responses: No asset with AssetKey(['dummy']) found in repository". Any ideas here please?
dummy_1.py
```python
from dagster import AssetMaterialization, In, MetadataEntry, Nothing, Out, job, op, AssetKey, asset
from modes import annotations, job_executor_def, prod_resources

@op(
    name="dummy_op",
    ins={
        "ready": In(dagster_type=Nothing),
    },
    out=Out(dagster_type=Nothing),
    tags=annotations,
)
def dummy_op(context):
    # The asset key is only logged at runtime, so Dagster can't know
    # about it until this op has actually been run.
    yield AssetMaterialization(
        asset_key="dms_dummy_job_completed",
        metadata_entries=[
            MetadataEntry.text(label="my_label", text="words"),
        ],
    )

@job(
    resource_defs=prod_resources,
    name="dms_dummy_job",
    executor_def=job_executor_def,
    tags=annotations,
)
def dms_dummy_job():
    dummy_op()
```
sensor setup
```python
from dagster import AssetKey, RunRequest, multi_asset_sensor

@multi_asset_sensor(
    asset_keys=[
        AssetKey("dms_dummy_job_completed"),
        AssetKey("dms_dummy_job_2_completed"),
    ],
    job=main_pipeline_sensor_dummy_job,  # defined elsewhere in the repo
    name="asset_a_and_b_sensor",
)
def asset_a_and_b_sensor(context):
    # A multi_asset_sensor evaluation function takes only the sensor
    # context; the (context, asset_event) signature belongs to @asset_sensor.
    return RunRequest(
        run_key=context.cursor,
    )
```

jamie

12/16/2022, 6:34 PM
when you got this error, had you run both ops that materialize the dummy assets? since you have ops yielding asset materializations, dagster won't know the keys for the assets until the op has been run. when the multi asset sensor runs, it tries to get information about all of the assets it's monitoring, so if dagster doesn't "know" about one (i.e. if the op that makes it hasn't been run) then you'll get an error

James Barnden

12/20/2022, 1:11 PM
Yep - both ops that generate my 2 assets have been run multiple times. So this exists - yet when I check the sensor's status:
```
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = "Exception iterating responses: No asset with AssetKey(['dms_dummy_job_completed']) found in repository"
    debug_error_string = "{"created":"@1671541756.097647006","description":"Error received from peer ipv4:172.18.0.3:4000","file":"src/core/lib/surface/call.cc","file_line":966,"grpc_message":"Exception iterating responses: No asset with AssetKey(['dms_dummy_job_completed']) found in repository","grpc_status":2}"
>
```

jamie

12/20/2022, 5:03 PM
ok let me try to replicate and i’ll see what i can find

James Barnden

12/20/2022, 6:57 PM
Thank you! We've just changed from using the `@op` and `@job` wrappers to just using `@asset`, like this:
```python
@asset
def dummy_op(context):
    context.log_event(
        AssetMaterialization(
            asset_key="dms_dummy_job_completed",
            metadata_entries=[MetadataEntry.text(label="my_label", text="words")],
        )
    )
```
So in Dagit I'm no longer seeing jobs, but they're in the default asset group (reading up on asset groups now as this is pretty new to me). We have the old asset keys and new asset keys as shown here - they seem to materialise differently in ways that I don't really understand... but the multi asset sensor does now work with this change when looking for `dummy_op` and `dummy_op_2`. Seems like the op/job setup didn't actually materialise anything?

jamie

12/20/2022, 7:27 PM
> Seems like the op/job setup didn't actually materialise anything?

yeah that's basically correct. In an op, when you log an `AssetMaterialization` you're basically telling dagster "hey, I manually materialized this data, please keep track of it like an asset". So for example, if you had an op that may upload 10 files to buckets, or may upload 100 files, and you want Dagster to keep track of each of those files like assets, then you could log an AssetMaterialization for each. In general, using `@asset` is the recommended approach, unless you're in one of the more complicated cases.
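A minimal sketch of that pattern (the op name, file list, and `upload_to_bucket` helper are hypothetical, not from the thread):

```python
from dagster import AssetKey, AssetMaterialization, Output, op

def upload_to_bucket(filename: str) -> str:
    # Hypothetical stand-in for a real upload; returns the destination path.
    return f"s3://my-bucket/{filename}"

@op
def upload_files(context):
    # The set of files isn't known until runtime, so we log one
    # AssetMaterialization per uploaded file instead of defining @assets.
    for filename in ["file_1.csv", "file_2.csv"]:  # could be 10 or 100 files
        path = upload_to_bucket(filename)
        yield AssetMaterialization(
            asset_key=AssetKey(["uploads", filename.replace(".", "_")]),
            metadata={"path": path},
        )
    # an op that yields events must also yield its declared Output
    yield Output(None)
```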
> they seem to materialise differently in ways that I don't really understand

can you elaborate on this? I may be able to help clear up some of the confusion. what behavior are you seeing / what would you expect to see?

James Barnden

12/20/2022, 7:40 PM
I see, that’s a really clear way of describing it! 🙏
> can you elaborate on this?

Yup, in my screenshot above I interpret that as Dagster saying it knows about 4 asset keys that could exist: 2 are from my first attempt with the op/job wrappers, 2 are from wrapping in `@asset`. I had previously defined `dms_dummy_job_completed` by:
```python
@op(
    name="dummy_op",
    ins={
        "ready": In(dagster_type=Nothing),
    },
    out=Out(dagster_type=Nothing),
    tags=annotations,
)
def dummy_op(context):
    yield AssetMaterialization(
        asset_key="dms_dummy_job_completed",
        metadata_entries=[
            MetadataEntry.text(label="my_label", text="words"),
        ],
    )

@job(
    resource_defs=prod_resources,
    name="dms_dummy_job",
    executor_def=job_executor_def,
    tags=annotations,
)
def dms_dummy_job():
    dummy_op()
```
Launching Dagit and telling this job to run would succeed, so I'd expect an empty asset and the corresponding `dms_dummy_job_completed` asset key that I could feed to the sensor - but the sensor did not find that asset key. Now we have:
```python
@asset
def dummy_op(context):
    context.log_event(
        AssetMaterialization(
            asset_key="dms_dummy_job_completed",
            metadata_entries=[MetadataEntry.text(label="my_label", text="words")],
        )
    )
```
I'd expect this to also generate an empty asset with the same asset key name of `dms_dummy_job_completed`. Instead of running it as a job, I can view the asset and tell it to materialize, which does indeed work, and my sensor in turn recognises `dms_dummy_job_completed` and then runs.
I’m logging off for today, I really appreciate the help!

jamie

12/20/2022, 8:02 PM
ok i see. So when you use the `@asset` decorator, the name of the asset is actually the name of the function you decorate, so in your case the asset would be named `dummy_op`. Since you're also logging the AssetMaterialization, you're additionally telling dagster that you've manually created some other asset called `dms_dummy_job_completed`. So if you just wanted an asset called `dms_dummy_job_completed` you would do
```python
@asset
def dms_dummy_job_completed():
    ...
```
without the call to `context.log_event`. In the body of the function you have your code to make your asset, and then you can either return it from the function and let an IO manager take care of storing the asset wherever you want it stored, or you can manually store the asset somewhere and then return None (however, if you do this it'll be harder to use the asset in a downstream asset - I can elaborate on this more if it would be useful)
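A minimal sketch of those two options (the data and file path here are made up for illustration):

```python
import csv

from dagster import asset

@asset
def dms_dummy_job_completed():
    # Option 1: return the value and let the configured IO manager persist
    # it; downstream assets can load it by taking this asset as an input.
    return [{"my_label": "words"}]

@asset
def manually_stored_asset(context) -> None:
    # Option 2: persist the data yourself and return None. Dagster still
    # records the materialization, but downstream assets can't load the
    # value through an IO manager.
    rows = [{"my_label": "words"}]
    with open("/tmp/manually_stored_asset.csv", "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["my_label"])
        writer.writeheader()
        writer.writerows(rows)
    context.log.info("wrote /tmp/manually_stored_asset.csv")
```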

James Barnden

01/05/2023, 1:40 PM
Thank you once again (and happy new year 🙂). I'd like to clarify my understanding of ops vs assets and when each would be appropriate. To me they seem like two sides of the same coin, with an op being a process and an asset being the output of the process - which would imply an output cannot exist without a process, and that makes it difficult for me to understand the difference between using them. From what I understand so far:
• `@asset` decorators are preferred, and best used to define a single output OR multiple outputs (e.g. data tables) that would generally be considered 'one asset' (e.g. a database of many tables has been refreshed). Assets can depend on other assets, which builds an implicit dependency, and asset materialisation happens in the correct order (see the sketch below).
• `@op` decorators would be useful when defining a more complex process which may produce multiple assets through a similar process, where those assets are either then required to be considered separately, or not at all (for whatever reason)?
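A toy sketch of the implicit-dependency point (the asset names here are invented):

```python
from dagster import asset

@asset
def raw_events():
    # Upstream asset: some source data.
    return [1, 2, 3]

@asset
def daily_summary(raw_events):
    # Dagster infers the dependency from the parameter name, so raw_events
    # is always materialized before daily_summary.
    return sum(raw_events)
```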

jamie

01/05/2023, 3:46 PM
in a lot of ways assets and ops are really similar, like you mentioned. For me, one of the easiest ways to think about the difference is on more of a "principle" level instead of a code level. In the mental model, assets are the data table, or jupyter notebook, or whatever the output is. This output will be persisted over time, but you'll likely want to update its contents as new data come in (i.e. add new rows to your table or re-execute your notebook). Obviously there's a task that's required to create the output, but the main thing we care about in the mental model is the output, not the task.

Conversely, ops are focused on the tasks, and the task could produce a data table, or it could send an email, or whatever else you may want to do. The main thing we care about in the mental model is the task that is executed. For the email example, we probably wouldn't want to write that as an asset, since we're likely going to be rerunning the task lots of times to send emails to different people, and an asset is meant to continuously update the same persistent object.

In a more practical sense, behind the scenes an asset is a wrapper around an op that is more specific and has extra features. By having more requirements around what an asset is and how it behaves, we can write more complex and helpful features (for example, being able to rematerialize a single asset without materializing all of its parents). If you want to go more in depth on this, i talk through designing a data pipeline with a task-based approach vs an asset-based approach, and some of the reasons we lean toward the asset-based approach, in the first part of this webinar
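As a rough illustration of that split (the asset, op, and job names here are invented):

```python
from dagster import asset, job, op

@asset
def users_table():
    # Asset mental model: we care about this persistent object; each
    # materialization updates the same logical table.
    return [{"id": 1, "name": "Ada"}]

@op
def send_welcome_email(context):
    # Op mental model: we care about performing the task; each run is an
    # action (sending an email), not an update to one persistent object.
    context.log.info("sending welcome email")

@job
def welcome_email_job():
    send_welcome_email()
```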

James Barnden

01/05/2023, 5:34 PM
I think the mental model approach and that webinar cracked it for me! Great webinar too - I ended up watching the whole thing and it's incredibly well presented. Thanks for all your time and the resources 🙏