#announcements

Simon Späti

03/02/2021, 11:06 AM
Hi there, a question about sensors where `run_key` isn’t a filename but a list of filenames. Let’s say our pipeline processes a list of files rather than file-by-file, so we pass a list along. The sensor checks whether there are new files and packages them into lists of around 500 MB. Now the challenge: a file could be contained in different lists in case of re-processing. Is there a way the sensor could check whether a “file in a list” has already been processed? 👉 [more in thread]
We’re thinking of persisting a key and its list of files in Redis. That way, we can check in the sensor that files aren’t processed twice. But do you see a way to achieve the same with the built-in `run_key` concept? We were brainstorming around some sort of `run_key` that includes all file names (in our case, file IDs), but that is quite ugly. Has anyone had similar use cases, or does anyone have a hint on how to solve this? Highly appreciated!

alex

03/02/2021, 4:08 PM
Ya, I don’t think `run_key` deduping is going to work in this case; you will just have to use your own scheme like you mentioned above.
cc @prha

sandy

03/02/2021, 4:09 PM
One way could be to yield an AssetMaterialization for each file that's processed and query the set of asset materializations for gaps
But there's a potential race condition there if you submit a run and need to make a decision about whether to submit a second run before the first run has had the chance to record an AssetMaterialization
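A minimal sketch of that idea — the solid name and asset-key scheme are placeholders, and the `get_event_records`/`EventRecordsFilter` query is an assumption that only holds on newer Dagster versions:

from dagster import (
    AssetKey,
    AssetMaterialization,
    DagsterEventType,
    EventRecordsFilter,
    Output,
    solid,
)

@solid
def process_file(context, filename: str):
    # ... do the actual work on the file ...
    # Record that this file was handled; materializations are queryable later.
    yield AssetMaterialization(asset_key=AssetKey(["processed_files", filename]))
    yield Output(filename)

def is_processed(instance, filename: str) -> bool:
    # Files with no recorded materialization are the "gaps" the sensor fills.
    records = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey(["processed_files", filename]),
        ),
        limit=1,
    )
    return bool(records)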

prha

03/02/2021, 4:14 PM
You could also use tags to mark runs with their constituent files, and do tag-based run queries off of the instance (available on the `SensorExecutionContext`) before including a file.
This has the nice effect that you can dedup as soon as the run is created, and it doesn’t depend on completed execution of the pipeline.

Simon Späti

03/03/2021, 7:37 AM
Wow, thank you guys so much for your answers! The `AssetMaterialization` approach I see as less favoured, since I’d rather create an asset from the output of the sensor (a Delta table) than from the input files 🤔. Follow-up on the tags, @prha: where are these persisted? Are they part of the Dagit pod, meaning that if we re-deploy Dagit or the pod breaks the past tags are lost, or can they be persisted somewhere safe? I see this as an elegant solution with the `SensorExecutionContext`, as long as we’re not losing them. Thanks again!
I guess the tags are stored in RunStorage, is that correct? If so, that would be awesome, meaning they would live in our Postgres DB. Will check that! 😍

prha

03/03/2021, 5:15 PM
Yes, these are persisted in run storage, in postgres
Haven’t actually run this, but I expect it’d look something like this:
from dagster import RunRequest, SkipReason, sensor
from dagster.core.storage.pipeline_run import PipelineRunsFilter

@sensor(pipeline_name="my_pipeline")
def my_sensor(context):
    FILE_TAG_VALUE = "true"

    filenames = get_filenames_from_somewhere()

    to_request = []
    for filename in filenames:
        # Any existing run tagged with this file means it was already requested.
        runs = context.instance.get_runs(PipelineRunsFilter(tags={filename: FILE_TAG_VALUE}))
        if not runs:
            to_request.append(filename)

    if not to_request:
        yield SkipReason("no new files")
        return

    yield RunRequest(
        run_key=generate_key_from_files(to_request),
        run_config=generate_config_from_files(to_request),
        # Tag the run with each constituent file so future ticks can dedup on it.
        tags={filename: FILE_TAG_VALUE for filename in to_request},
    )
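For completeness, the helpers that snippet calls are undefined; purely hypothetical sketches might look like this (the directory, hashing scheme, and solid name are all assumptions):

import hashlib
import os

WATCH_DIR = "/data/incoming"  # hypothetical landing directory

def get_filenames_from_somewhere():
    # Placeholder: list whatever files have landed since the last tick.
    return sorted(os.listdir(WATCH_DIR))

def generate_key_from_files(filenames):
    # A stable run_key for the batch: a hash of the sorted file names.
    return hashlib.sha256("|".join(sorted(filenames)).encode()).hexdigest()

def generate_config_from_files(filenames):
    # Placeholder run config wiring the batch into a hypothetical solid.
    return {"solids": {"process_batch": {"config": {"filenames": filenames}}}}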

Simon Späti

03/04/2021, 8:10 AM
This is beautiful code, thanks @prha. We’ll check that and report back. Thanks so much for your and the others’ help!