#ask-community

Yevhen Samoilenko

08/25/2022, 4:42 PM
Hi! Is there any way to find out, from within an IO manager's load_input method, whether an upstream asset was materialized during the current run? It would be useful when only some of the assets are selected for materialization but one of the multi-assets uses non-selected assets as inputs. Something like this:
def load_input(self, context: InputContext) -> str:
    materialized = context.upstream_output.materialized

    if materialized:
        return 'materialized'

    return ''

Saul Burgos

08/25/2022, 9:02 PM
I came across this code earlier. It is not mine, but it might help:
from dagster import RunRequest, sensor
from dagster.core.storage.pipeline_run import PipelineRunStatus, RunsFilter

@sensor(job=do_the_second_thing)
def thing_sensor(context):
    # fetch successful runs of the upstream job, most recently updated first
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name="do_the_first_thing",
            statuses=[PipelineRunStatus.SUCCESS],
            updated_after=..., # can also filter by timestamp to do more efficient fetching
        ),
        order_by="update_timestamp",
        ascending=False,
    )
    for run_record in run_records:
        yield RunRequest(
            run_key=run_record.run_id,  # avoid double firing for the same run
        )

chris

08/25/2022, 9:08 PM
Hey Yevhen - kind of involved, but you can do this by polling the instance directly for the asset records corresponding to the current run id from within your load_input method. Something like:
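def load_input(self, context: InputContext):
    # my_asset_key is a placeholder for the upstream AssetKey;
    # get_asset_records expects a list of keys
    for record in context.instance.get_asset_records([my_asset_key]):
        if record.asset_entry.last_run_id == context.run_id:
            # this means that the asset was materialized in the current run
            return 'materialized'
    return ''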
I feel like there should be an easier way to do this, so I submitted an issue
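For anyone who wants to try the check in isolation, here is a minimal sketch that factors the comparison from the load_input snippet into a small helper. FakeAssetEntry, FakeAssetRecord and materialized_in_run are hypothetical names made up for illustration; the stand-in classes only mimic the record.asset_entry.last_run_id attribute path used above and are not Dagster classes.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class FakeAssetEntry:
    """Hypothetical stand-in exposing only the last_run_id attribute."""
    last_run_id: Optional[str]


@dataclass
class FakeAssetRecord:
    """Hypothetical stand-in mirroring the record.asset_entry attribute path."""
    asset_entry: FakeAssetEntry


def materialized_in_run(records: Iterable, run_id: str) -> bool:
    """Return True if any record's last run id equals the given run id."""
    return any(r.asset_entry.last_run_id == run_id for r in records)


# Two made-up records: one last touched by run "run-a", one never run.
records = [
    FakeAssetRecord(FakeAssetEntry(last_run_id="run-a")),
    FakeAssetRecord(FakeAssetEntry(last_run_id=None)),
]
print(materialized_in_run(records, "run-a"))
print(materialized_in_run(records, "run-b"))
```

Inside a real load_input, the records would presumably come from context.instance.get_asset_records(...) as in the snippet above, and run_id from the current run, so the helper itself stays free of any Dagster dependency.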

Yevhen Samoilenko

08/26/2022, 7:59 AM
Thanks! It would be super helpful!