# ask-community
How can I programmatically terminate / skip the materialization of an asset based on a condition?
or is it considered best practice to have something along these lines:
def my_asset(context) -> Union[pd.DataFrame, None]:
	if condition:
		return pd.DataFrame({'foo':[1,2,3]})
	return None
(i.e. return None) from the Asset. And ensure that the IO manager knows how to deal with None (i.e. not do anything)?
however it looks like no-op runs still show up
how can I prevent these from showing up?
Or would it be at least possible to filter out NO-OP materializations (perhaps based on specific tags)
for these NO-OP materializations, would you still want to update assets that are downstream of the non-updated asset?
as no fresher data arrived there is nothing to update
makes perfect sense. for this sort of use case you can leverage the conditional execution capability. basically, if an output is marked as "not required", and it is not emitted during a computation, downstream computation will be skipped. unfortunately, the @asset decorator doesn't allow you to set that attribute of the underlying output, but the multi_asset decorator does, so a hacky solution would be to do something where you create a multi asset that only creates a single asset:
@multi_asset(outs={"my_asset": Out(is_required=False)})
def my_asset(context):
	if condition:
		yield Output(pd.DataFrame({'foo':[1,2,3]}), name="my_asset")
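The skip behaviour described above can be illustrated with a small toy model in plain Python. This is only a sketch of the idea and not Dagster's API: `fetch_new_rows` and `run_toy_graph` are hypothetical names, and the event strings are labels chosen for illustration. The point is that an upstream step which finishes without yielding its optional output causes the downstream step to be skipped rather than failed.

```python
from typing import Iterator, List, Tuple


def fetch_new_rows(has_fresh_data: bool) -> Iterator[List[int]]:
    """Toy upstream step: yields its output only when fresh data arrived."""
    if has_fresh_data:
        yield [1, 2, 3]
    # Yielding nothing models an optional ("not required") output
    # that was simply not emitted during this computation.


def run_toy_graph(has_fresh_data: bool) -> List[Tuple[str, str]]:
    """Run the upstream step, then the downstream step only if an output exists."""
    events: List[Tuple[str, str]] = []

    outputs = list(fetch_new_rows(has_fresh_data))
    # The upstream step itself completes either way.
    events.append(("upstream", "STEP_SUCCESS"))

    if not outputs:
        # No output was emitted, so the downstream step is skipped.
        events.append(("downstream", "STEP_SKIPPED"))
        return events

    total = sum(outputs[0])
    events.append(("downstream", f"STEP_SUCCESS total={total}"))
    return events


if __name__ == "__main__":
    print(run_toy_graph(has_fresh_data=False))
    print(run_toy_graph(has_fresh_data=True))
```

In this toy model the run with no fresh data still leaves a record of the skipped downstream step, which is the observability property asked about in the thread.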
Interesting thoughts - this might actually work. I wonder though if I can somewhere (perhaps regular runs) observe that this was skipped?
i.e. to be able to show/validate that the job was actually running (and no fresher data was found)
For the outs = my_asset part: could this, like in the normal SDA world, directly take the name of the (fake) @asset/@multi_asset annotated function?
Also, it does not support the metadata field. Am I overlooking another attribute with similar functionality?
re: the seeing that computation was skipped, that will show up in the run view in dagit (and there will be a STEP_SKIPPED entry in the event log). I'm not actually 100% sure what a skipped asset would look like in the asset graph view (cc @claire if you know off the top of your head).
re: the metadata, if you put the same metadata dictionary that was in the asset decorator into the out (i.e. Out(is_required=False, metadata=...)), this will have the same end result
I just tested your suggestion. When totally dropping the materialization it works. Together with sandy was just created to not lose the overview / observability, i.e. to have a single place for the asset to quickly view all of its runs (even if they did not materialize)
I think at least with regards to the details it looks like what I would have expected
Good to know - but was referring to input metadata.
But with the explicit approach I guess this could also be solved
looks like this was a false flag and it does not work. (the markdown part)
To me it looks like
still works just like it did before - so for the output metadata it looks like I would be covered
hm what do you mean by input metadata? if you have an asset defined like:
@asset(..., metadata={...})
def my_asset():
    return 1
then this (under the hood) will create an op like this:
@op(out={"result": Out(metadata={...})})
def my_asset():
    return 1
I see - then it should work (and I probably should rethink if it is a hack what I am doing with the metadata).
Then the only thing which remains open for me is: the name of the asset needs to be specified 3 times, i.e. 1) as the function name, 2) in the dict for the outs and 3) in the yielded Output event. Can I somehow reduce this to only refer to the name once?
the name of the function has no bearing on the name of the asset with the multi-asset decorator (which reduces it down to two), but I think you'd still need to specify it those two times unfortunately
ok - understood. Thanks.