# announcements
Hi there, a question about sensors where `run_key` isn't a filename but a list of filenames. Let's say our pipeline processes a list of files rather than file-by-file, so we pass a list along. The sensor checks for new files and packages them into lists of around 500 MB. Now the challenge: a file could be contained in different lists in case of re-processing. Is there a way the sensor could check whether a "file in a list" has already been processed?
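The packing step described above ("package them into a list of files around 500 MB") could be sketched as a greedy batcher, assuming each file comes with a known size in bytes (the function and parameter names here are invented for illustration):

```python
def batch_files(files, max_bytes=500 * 1024 * 1024):
    """Greedily pack (name, size_in_bytes) pairs into batches of at most ~max_bytes.

    A single file larger than max_bytes still gets its own batch.
    """
    batches, current, current_size = [], [], 0
    for name, nbytes in files:
        # Start a new batch when adding this file would overflow the current one
        if current and current_size + nbytes > max_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += nbytes
    if current:
        batches.append(current)
    return batches
```

Each batch would then become one run request, which is what makes per-file dedup (rather than per-batch `run_key` dedup) necessary.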
We're now thinking of persisting a key and its list of files in Redis; that way, the sensor can check that files aren't processed twice. But do you see a way to achieve the same with the built-in `run_key` concept? We were brainstorming around some sort of `run_key` that includes all file names (in our case file_ids), but that is quite ugly. Has anyone had a similar use case, or a hint on how to solve this? Highly appreciated!
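The Redis idea mentioned above might look roughly like this: keep the processed file ids in a Redis set and drop any id that has been seen before. This is only a sketch; `client` is assumed to be a redis-py-style client exposing `sismember`/`sadd`, and the key name `processed_file_ids` is invented:

```python
def filter_unprocessed(client, file_ids, key="processed_file_ids"):
    """Return only the file ids not seen before, marking them as seen.

    `client` is any object with redis-py-style sismember/sadd methods.
    """
    new_ids = [fid for fid in file_ids if not client.sismember(key, fid)]
    if new_ids:
        # SADD is idempotent, so re-adding an id on retry is harmless
        client.sadd(key, *new_ids)
    return new_ids
```

Marking ids as seen at sensor time (rather than after the run completes) avoids double-submission, at the cost of having to clean up the set if a run fails and the files should be retried.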
Ya, I don't think `run_key` deduping is going to work in this case; you'll just have to use your own scheme like the one you mentioned above
cc @prha
One way could be to yield an AssetMaterialization for each file that's processed and query the set of asset materializations for gaps
But there's a potential race condition there if you submit a run and need to make a decision about whether to submit a second run before the first run has had the chance to record an AssetMaterialization
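The materialization-based check above reduces to "which files in this batch have no recorded materialization yet." Here is a plain-Python model of that logic; in Dagster the `event_log` would be the instance's recorded `AssetMaterialization` events, but it is modeled as a list of file ids here to keep the sketch self-contained and to show the race window:

```python
def files_needing_runs(batch, event_log):
    """Files in the incoming batch with no recorded materialization (the 'gaps')."""
    materialized = set(event_log)
    return [fid for fid in batch if fid not in materialized]

# The race: if a run for "a" has been submitted but hasn't recorded its
# materialization yet, a second sensor evaluation still sees "a" as a gap
# and may submit a duplicate run for it.
```

This is why tag-based dedup at run-creation time (described next) closes the window that materialization-based dedup leaves open.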
You could also use tags to mark runs with their constituent files, and do tag-based run queries against the instance (available on the sensor `context`) before including a file.
Has the nice effect that you can dedup as soon as the run is created, and doesn't depend on the pipeline having completed execution
Wow, thank you guys so much for your answers! The `AssetMaterialization` approach isn't my favourite, as I'd rather create an asset from the output of the sensor (a delta table) than from the input files 🤔. Follow-up to @prha: where are these tags persisted? Are they part of the Dagit pod, meaning that if we re-deploy Dagit or the pod breaks, the past tags are lost? Or can they be persisted somewhere safe? I see this as an elegant solution with the tags, as long as we're not losing them. Thanks again!
I guess the tags are stored in RunStorage, is that correct? If so, that would be awesome, meaning they'd live in our Postgres DB. Will check that! 😍
Yes, these are persisted in run storage, in postgres
Haven’t actually run this, but I expect it’d look something like this:
```python
from dagster import RunRequest, SkipReason, sensor
from dagster.core.storage.pipeline_run import PipelineRunsFilter

@sensor(pipeline_name="my_pipeline")
def my_sensor(context):
    FILE_TAG_VALUE = "true"

    filenames = get_filenames_from_somewhere()

    to_request = []
    for filename in filenames:
        # Any existing run tagged with this filename means it was already requested
        runs = context.instance.get_runs(PipelineRunsFilter(tags={filename: FILE_TAG_VALUE}))
        if not runs:
            to_request.append(filename)

    if not to_request:
        yield SkipReason("no new files")
        return

    yield RunRequest(
        run_key=None,
        tags={filename: FILE_TAG_VALUE for filename in to_request},
    )
```
This is beautiful code, thanks @prha. We'll check that and report back. Thanks so much for your and the others' help!