Hello! Has anyone set up a working version of a scheduler that can build/edit the pipeline before executing it? I have written the functionality required to produce the entire pipeline (solids, solid configs, dependencies), but I'd like to be able to adjust it when the scheduler runs. Let me know 👍
By adjust I mean edit the solid_defs and dependencies.
Hi Thomas - the scheduler loads your pipeline code every time that it runs, so I think it already works the way that you’re hoping. Edit: Misunderstood what you meant by scheduler
Hi Thomas (take two 🙂 ) - Dagster will recreate your pipeline from code every time it launches a run, which should pick up changes to the solid definitions and dependencies (we don't really have a single component that maps to a 'scheduler' in the Airflow sense - each pipeline run happens in its own process)
Maybe here's a better description: I have a function that generates three things: a list of solids, a solids config, and a dependency config. This function queries an external data source to figure out exactly what solids are built. This external data source needs to be re-examined every time the scheduler goes off, so that the pipeline can be "re-generated". An example of what I tried to do (which doesn't work but will illustrate) is to create a pipeline via
with an empty solids list. Then, within the scheduler code, call the function above, and try to replace the empty list with the new list -- thus changing the pipeline solids list right before it's executed. There don't seem to be any functions for editing a
within the scheduler function definition, but that's kind of what I want to do. One solution I thought of just now was to have the solid definitions exist outside the function above, and simply change the run config for the pipeline, but then there's a mismatch between the solids array and the solids config. E.g. the pipeline needs to run solid X 3 times for round one, then scheduler ticks, and now it needs to run solid X 4 times. I can add another entry to the solid config, but the pipeline's original creation with 3 solids would be unchanged, and thus I'd probably get an error. Not sure if this is making sense -- happy to explain more if it's unclear.
I think the idea of pipeline "re-generation" best describes what I'm trying to do -- I want to define a scheduler once, but each time it ticks, it regenerates the pipeline it's supposed to start.
ideally the scheduler function calls my helper function, which returns the required values for a pipeline (solids, mode, dependencies, etc.)
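To make the mismatch concern above concrete, here is a minimal plain-Python sketch (no Dagster calls) of a helper that derives the solid names, solid config, and dependencies from a single snapshot of the external source, so the three can never disagree. Everything in it is hypothetical: `build_pipeline_spec`, `fetch_items`, the `solid_x_N` naming, and the linear dependency chain are illustrative assumptions, not the actual helper or Dagster's own dependency format.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple

# (solid names, per-solid config, upstream solids per solid) -- hypothetical shape
PipelineSpec = Tuple[List[str], Dict[str, Dict[str, Any]], Dict[str, List[str]]]


def build_pipeline_spec(fetch_items: Callable[[], Iterable[Any]]) -> PipelineSpec:
    """Build all three pieces from ONE query of the external source."""
    # Query once and reuse the snapshot, so names, config and
    # dependencies all describe the same set of solids.
    items = list(fetch_items())

    # One copy of "solid X" per item (assumed naming scheme).
    solid_names = [f"solid_x_{i}" for i in range(len(items))]

    # Config entry for every solid, keyed by the same names.
    solids_config = {
        name: {"config": {"item": item}}
        for name, item in zip(solid_names, items)
    }

    # Assumed structure: each solid depends on the one before it.
    dependencies = {
        name: ([solid_names[i - 1]] if i > 0 else [])
        for i, name in enumerate(solid_names)
    }
    return solid_names, solids_config, dependencies


def is_consistent(spec: PipelineSpec) -> bool:
    """True when every solid has exactly one config and one dependency entry."""
    names, config, deps = spec
    return set(names) == set(config) == set(deps)
```

Calling the helper again on the next tick with four items instead of three yields four solids and four matching config entries, which is the "re-generation" behaviour described above. Whether the result can be swapped into an already-defined pipeline is a separate question that this sketch does not address.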
Ah, I think I might understand more about what you're asking. Are you hoping to change the dependency/solid structure of the pipeline within a single run of that pipeline?
(If so, that's not something we really support currently unfortunately. The closest is this dynamic orchestration feature: https://docs.dagster.io/_apidocs/dynamic#dynamic-mapping-collect-experimental, but that's more specific and constrained than what you're describing here)
Got it. Final question that perhaps you’ve seen: I’m running the docker deployment you guys have with pipeline, dagit and the daemon. Any way to get the whole thing to reload on a cadence? Essentially, restart it every X minutes via dagster settings. I will look into doing this at the docker level as well.
Regardless I appreciate the support!
For now that'd have to be done at the docker level with that setup, yeah (although we'd like to add options for more direct management in the future)
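As a rough illustration of what "at the docker level" could look like, here is a small Python sketch that restarts a set of Compose services every N seconds by shelling out to `docker compose restart`. The helper names, the service names in the usage note, and the choice of a Python loop (rather than cron or a sidecar container) are all assumptions, and older installs may need the `docker-compose` binary instead. Whether a restart alone is enough to pick up a regenerated pipeline depends on how the deployment loads its code.

```python
import subprocess
import time
from typing import Callable, List, Optional, Sequence


def restart_command(services: Sequence[str]) -> List[str]:
    """Build the CLI call that restarts the given Compose services."""
    return ["docker", "compose", "restart", *services]


def restart_on_cadence(
    services: Sequence[str],
    interval_seconds: float,
    max_restarts: Optional[int] = None,
    run: Callable[..., object] = subprocess.run,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Sleep, restart, repeat. Returns how many restarts were issued.

    max_restarts=None loops forever; run and sleep are injectable so the
    loop can be exercised without Docker installed.
    """
    count = 0
    while max_restarts is None or count < max_restarts:
        sleep(interval_seconds)
        # check=True raises if the docker command exits non-zero.
        run(restart_command(services), check=True)
        count += 1
    return count
```

For example, `restart_on_cadence(["user_code", "dagit", "daemon"], 600)` would restart three (hypothetically named) services every ten minutes until the script is stopped.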
sorry -- one final question if you have the time: I had a version of things that leveraged
within other pipelines / solids, but there was no way to visualize the nested pipelines -- the top level pipeline just ended with the execute command. Is there any way around this?
One pattern that we recommend is using sensors to trigger pipelines based on assets produced in other pipelines: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#asset-sensors - that lets you visualize each of the pipelines, and we're contemplating ways to let you visualize the whole chain of pipelines when you use this feature
Hey @daniel quick follow up: if I create an asset with metadata, is there any way to access this metadata when I grab events with
? I'm hoping to use the asset materialization as a way to pass a value.
Can't find an example in the docs, I only see:
record_id, event = events[0]
in the docs and I can't find a definition of what properties that
has or how to set its metadata.
Hi Thomas - could you actually make a new post here for this? I’m not actually sure but I’m confident somebody else on the core team will be
no problem!