Can you point me out to any example or docs, for t...
# ask-community
f
Can you point me to any examples or docs for this use case? I have a folder A with different files, and I want to process each file and put the processed file in a different folder B. If a new file is added to folder A, a job should run to process this new file. So far in Dagster I've only seen examples where you have one big file, table, etc. to process.
j
hi @Francesco Piccoli if you write an `op` that can move one file from folder A to folder B, you can set up a sensor that will start a run of the op for every file in folder A https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#defining-a-sensor you won’t need to worry about moving files twice: the `run_key` in the `RunRequest` ensures that duplicate `run_key`s don’t actually get run. It’s described more here https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#idempotence-and-cursors
f
ok thanks, that's exactly what I was looking for. Now I just need to read 😉 thanks @jamie
do you know if I can find the same example from the docs somewhere on GitHub, just to be sure I organize the code as needed (folder structure)? The docs snippets don't show the overall structure.
j
these specific snippets aren’t part of a larger project, but we do have an example project with sensors. we also have this guide about structuring dagster projects https://docs.dagster.io/guides/dagster/recommended-project-structure example project: https://github.com/dagster-io/dagster/tree/master/examples/project_fully_featured - sensor folder
f
ok great, you pointed me again to exactly what I was looking for. Two times in a row seems impressive to me, thanks again @jamie
now that I see how to do folder to folder A -> B, I need to repeat the process for other folder pairs: A1 -> B1, A2 -> B2. Do I need to repeat the code and logic for each pair, or is there some mechanism to abstract this and stay DRY?
j
I’d recommend doing something like this (partial pseudocode and may have syntax errors)
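import os
import shutil

from dagster import Config, RunRequest, job, op, sensor

class MoveFolderConfig(Config):
    filename: str
    current_dir: str
    destination_dir: str

@op
def move_file(config: MoveFolderConfig):
    # use the attributes of config to move the file
    source = os.path.join(config.current_dir, config.filename)
    shutil.move(source, os.path.join(config.destination_dir, config.filename))

@job
def move_file_job():
    move_file()

@sensor(job=move_file_job)
def move_folders():
    folder_pairs = [("A", "B"), ("A1", "B1"), ("A2", "B2")]
    for source_dir, destination_dir in folder_pairs:
        for filename in os.listdir(source_dir):
            yield RunRequest(
                # prefix with the folder so equal filenames in different folders get distinct keys
                run_key=f"{source_dir}/{filename}",
                run_config={
                    "ops": {"move_file": {"config": {
                        "filename": filename,
                        "current_dir": source_dir,
                        "destination_dir": destination_dir,
                    }}}
                },
            )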
f
ok that works for me! So the two things to keep in mind are: 1. sensor-decorated functions are re-evaluated every x seconds, and a request with the same run_key does not get run twice 2. you can give parameters to the ops your sensor runs by passing them in a config object
I'm not sure why we need a job in this context, since the sensor and ops seem to do the "job" just on their own
j
yeah so wrapping the op in a job is just a dagster concept thing. We don’t execute plain ops, we only execute jobs. This is because most jobs are made of lots of ops, but in your case it’ll probably just be one op
f
ok but the sensor seems to be aware only of ops, not jobs:
run_config={
    "ops": {"move_file":
so how does Dagster know it needs to execute a job?
j
dagster executes the job specified in the `job` parameter of the sensor decorator. the run config is to pass along those configuration values to the `move_file` op within the job
f
yes now I see, I forgot about the decorator parameter. So the sensor is bound to a specific job, and the RunRequest somehow communicates with this job and tells it to run just a specific op with specific parameters
j
yep! The RunRequest actually runs the entire job, and the run_config passes those configuration values to the op
f
ok, and in Dagster is there no way, like for assets, to say not to run certain ops if they're not needed?
j
no it’ll run the whole job, so you should only put the ops you need in the job the sensor runs
f
Thanks! I'm really learning a lot from this thread!
If I understand correctly from the above, there are two ways to describe how to pass arguments to ops via a request: 1. via config_schema 2. by subclassing the Config class. What are the pros and cons of the two approaches, or are they equivalent? And are there other ways besides these two?
j
basically config_schema is the “old” way of doing things, and Config classes are a replacement that allows for better typing, and you don’t have to reach into the context to get the values
but yeah, config is the only way to pass values from a request to an op
f
should I consider config_schema more as legacy then and just use Config?
j
yes
f
thanks, that's very useful to know. Maybe it's worth pointing out, since the docs today use config_schema
j
yes we’re still in the process of migrating all of our docs to the new APIs
f
if I can give a suggestion: maybe having some flag that signals to the reader that a docs page is in the process of changing could be a good trade-off between the effort of continuously changing the docs and the importance of informing users of changes.
j
yeah i can pass that along. do you mind linking the pages where you were seeing config_schema as well?
f
Yes of course, if I can be useful somehow after all your help, that just makes me happy 🙂 https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#defining-a-sensor
we mentioned before that once I process an item with a certain key, it won't be processed again at the next sensor evaluation. But what if I want to reprocess this key? Is there a way to manage more complex scenarios where you want to decide what to process or not? If there are any docs I should read that I didn't find, please don't hesitate to share.
j
we don’t have a built in way to do that, but you could always manually start a run of the job with the file. or you could keep track of which runs to kick off by maintaining a file of which files have been moved, or checking the destination file or something
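As one possible reading of the "checking the destination file" idea, here is a plain-Python sketch with no Dagster calls. The helper name `pending_files` is hypothetical, and it assumes a processed file keeps the same name in the destination folder.

```python
import os
from typing import List


def pending_files(source_dir: str, destination_dir: str) -> List[str]:
    """Return files in source_dir with no same-named file in destination_dir."""
    if os.path.isdir(destination_dir):
        done = set(os.listdir(destination_dir))
    else:
        done = set()
    return sorted(
        name
        for name in os.listdir(source_dir)
        if os.path.isfile(os.path.join(source_dir, name)) and name not in done
    )
```

With this approach, deleting a file from the destination folder makes it show up as pending again. If the sensor keeps yielding a fixed `run_key` per file, Dagster would still skip keys it has already seen, so a sketch like this probably only makes sense when the run key is omitted or changed for files that should be reprocessed.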
f
where is the list of keys stored? I mean, let's say I want to manipulate this set, can I? For example, say I want to reset it to empty.
j
i can ask around and see!
f
I have tried to scale up the solution to the problem I described and that you helped me build. For small data all seems good, but when I try to process a bit more I get the following behaviour: 1. each individual run seems slow (taking 1s when the op should probably take ms) 2. after processing some keys it gives me this error https://sqlalche.me/e/14/e3q8
to give you an idea of the data quantity: a folder contains about 50k files and it stops at 195. Each file could be from a few kB to 1 MB, I would say about 20 kB on average