# ask-community
t
Hello! Has anyone set up a working version of a scheduler that can build/edit the pipeline before executing it? I have written the functionality required to produce the entire pipeline (solids, solid configs, dependencies), but I'd like to be able to adjust it when the scheduler runs. Let me know 👍
By adjust I mean edit the solid_defs and dependencies.
d
Hi Thomas - the scheduler loads your pipeline code every time that it runs, so I think it already works the way that you're hoping. (Edit: misunderstood what you meant by scheduler)
Hi Thomas (take two 🙂 ) - Dagster will recreate your pipeline from code every time it launches a run, which should pick up changes to the solid definitions and dependencies (we don't really have a single component that maps to a 'scheduler' in the Airflow sense - each pipeline run happens in its own process)
t
Maybe here's a better description: I have a function that generates three things: a list of solids, a solids config, and a dependency config. This function queries an external data source to figure out exactly which solids are built. This external data source needs to be re-examined every time the scheduler goes off, so that the pipeline can be "re-generated". An example of what I tried to do (which doesn't work, but will illustrate) is to create a pipeline via `PipelineDefinition` with an empty solids list. Then, within the scheduler code, call the function above and try to replace the empty list with the new list -- thus changing the pipeline's solids list right before it's executed. There don't seem to be any functions for editing a `PipelineDefinition` within the scheduler function definition, but that's kind of what I want to do. One solution I thought of just now was to have the solid definitions exist outside the function above and simply change the run config for the pipeline, but then there's a mismatch between the solids array and the solids config. E.g. the pipeline needs to run solid X 3 times for round one, then the scheduler ticks, and now it needs to run solid X 4 times. I can add another entry to the solid config, but the pipeline's original creation with 3 solids would be unchanged, and thus I'd probably get an error. Not sure if this is making sense -- happy to explain more if it's unclear.
I think the idea of pipeline "re-generation" best describes what I'm trying to do -- I want to define a scheduler once, but each time it ticks, it regenerates the pipeline it's supposed to start.
ideally the scheduler function calls my helper function, which returns the required values for a pipeline (solids, mode, dependencies, etc.)
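A minimal plain-Python sketch of that "regenerate on every tick" idea. All names here (`fetch_step_names`, `build_pipeline`, `on_schedule_tick`) are hypothetical stand-ins for illustration, not Dagster APIs: a factory function re-queries the external source and rebuilds the full step list and dependency map from scratch each time the scheduler fires.

```python
# Hypothetical sketch: rebuild the pipeline description on every tick.
# fetch_step_names() stands in for the external data source query;
# none of these names are real Dagster APIs.

def fetch_step_names():
    # Pretend this queries an external system; hard-coded here.
    return ["extract", "transform", "load"]

def build_pipeline():
    """Re-generate the full pipeline spec (steps + dependencies)."""
    steps = fetch_step_names()
    # Chain each step after the previous one.
    dependencies = {
        step: [steps[i - 1]] for i, step in enumerate(steps) if i > 0
    }
    return {"solids": steps, "dependencies": dependencies}

def on_schedule_tick():
    # Called by the scheduler: always rebuilds before running.
    return build_pipeline()
```

The key design point is that nothing is cached between ticks: the spec is derived from the external source inside the tick, so a fourth copy of solid X would simply appear in the rebuilt lists.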
d
Ah, I think I might understand more about what you're asking. Are you hoping to change the dependency/solid structure of the pipeline within a single run of that pipeline?
👍 1
(If so, that's not something we really support currently unfortunately. The closest is this dynamic orchestration feature: https://docs.dagster.io/_apidocs/dynamic#dynamic-mapping-collect-experimental, but that's more specific and constrained than what you're describing here)
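For intuition, the dynamic mapping/collect feature linked above has roughly this control-flow shape. This is NOT Dagster's API, just a plain-Python illustration of fanning out over a list whose length is only known at run time and then fanning back in:

```python
# Plain-Python illustration of the map/collect shape of dynamic
# orchestration; illustrative only, not Dagster's API.

def discover_items():
    # Number of items is only known at run time.
    return ["a", "b", "c", "d"]

def process(item):
    # One "step" runs per dynamically discovered item.
    return item.upper()

def collect(results):
    # Fan back in: downstream step sees all mapped results.
    return "".join(results)

def run():
    items = discover_items()
    mapped = [process(i) for i in items]
    return collect(mapped)
```

Note the constraint this implies: the *shape* of the fan-out can vary per run, but the solid being mapped is fixed, which is narrower than arbitrarily editing the solid/dependency structure.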
t
Got it — final question that perhaps you've seen: I'm running the docker deployment you guys have with the pipeline, dagit, and the daemon. Any way to get the whole thing to reload on a cadence? Almost like restarting it every X minutes via Dagster settings. I will look into doing this at the docker level as well.
Regardless I appreciate the support!
:condagster: 1
d
For now that'd have to be done at the docker level with that setup, yeah (although we'd like to add options for more direct management in the future)
t
sorry -- one final question if you have the time: I had a version of things that leveraged `execute_pipeline` within other pipelines / solids, but there was no way to visualize the nested pipelines -- the top-level pipeline just ended with the execute command. Is there any way around this?
d
One pattern that we recommend is using sensors to trigger pipelines based on assets produced in other pipelines: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#asset-sensors - that lets you visualize each of the pipelines, and we're contemplating ways to let you visualize the whole chain of pipelines when you use this feature
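The asset-sensor pattern described above boils down to polling for new materializations past a cursor and launching a downstream run for each. A hypothetical plain-Python sketch (again not Dagster's API; `poll_asset_sensor`, the event-log tuples, and the asset key `"upstream_table"` are all illustrative):

```python
# Hypothetical sketch of the asset-sensor pattern (not Dagster's API):
# scan an event log for new materializations of one asset, launch a
# downstream run for each, and advance a cursor so each event fires once.

def poll_asset_sensor(event_log, cursor, launch_run):
    """event_log: list of (event_id, asset_key) pairs in order;
    cursor: highest event_id already handled."""
    launched = []
    for event_id, asset_key in event_log:
        if event_id > cursor and asset_key == "upstream_table":
            launch_run(asset_key)   # trigger the downstream pipeline
            launched.append(event_id)
            cursor = event_id       # don't re-fire on the next poll
    return cursor, launched
```

The cursor is what makes this safe to run on a schedule: each poll only reacts to materializations it hasn't seen, so the downstream pipeline runs exactly once per upstream asset event.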
t
Thanks!
Hey @daniel, quick follow-up: if I create an asset with metadata, is there any way to access this metadata when I grab events with `events_for_asset_key`? I'm hoping to use the asset materialization as a way to pass a value. I can't find an example in the docs -- I only see `record_id, event = events[0]`, and I can't find a definition of what properties that `event` has or how to set its metadata.
d
Hi Thomas - could you actually make a new post here for this? I’m not actually sure but I’m confident somebody else on the core team will be
t
no problem!