I was wondering how you can have DAGIT display the...
# announcements
j
I was wondering how you can have Dagit display the execution history of a pipeline that "executes" an array of other pipelines. If I go to the playground and "launch execute" on my parent "planning_pipeline", the array of subsequent pipelines won't show in Dagit's run menu, only the planning pipeline.

```python
def do_planning_pipeline():
    # this for loop runs an array of pipelines for about 100 items that are dynamically created
    pipelines = create_item_pipelines()
    for pipe in pipelines:
        execute_pipeline(pipe)
```
a
you need to set the `instance` arg on the `execute_pipeline` call, otherwise it will use an “ephemeral” instance. If this is in a solid you have it on `context.instance`
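A minimal sketch of that suggestion, assuming the loop runs inside a solid body (`create_item_pipelines` is the helper from the question above, not part of Dagster):

```python
from dagster import execute_pipeline, solid

@solid
def run_item_pipelines(context):
    # create_item_pipelines() is the user's helper from the question above
    pipelines = create_item_pipelines()
    for pipe in pipelines:
        # pass the solid's instance so the child runs are recorded on the
        # same persistent instance that Dagit reads from
        execute_pipeline(pipe, instance=context.instance)
```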
j
OK, thanks
@alex I tried passing a context from a solid in the parent pipeline to the child pipelines. It didn't seem to have an effect. The child pipelines run; they are just not seen by Dagster.
@alex Here's what it looks like:

```python
def do_planning_pipeline():
    context = get_context_from_solid()
    # this for loop runs an array of pipelines for about 100 items that are dynamically created
    pipelines = create_item_pipelines()
    for pipe in pipelines:
        execute_pipeline(pipe, context)
```
@alex I used a local dagster.yaml:

```yaml
run_storage:
  module: dagster.core.storage.runs
  class: SqliteRunStorage
  config:
    base_dir: /opt/dagster/db

local_artifact_storage:
  module: dagster.core.storage.root
  class: LocalArtifactStorage
  config:
    base_dir: /opt/dagster/db

event_log_storage:
  module: dagster.core.storage.event_log
  class: SqliteEventLogStorage
  config:
    base_dir: /opt/dagster/db
```
a
`execute_pipeline(pipe, context)`
how approximate is this code? `instance` is the 7th arg, so I would expect it to be passed by keyword. Also, what is the impl of `get_context_from_solid()`?
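A brief illustration of the keyword-argument point, assuming a signature where `instance` is a later keyword parameter as noted above (`pipe` and `context` are the names from the snippet in question):

```python
from dagster import execute_pipeline

# Positional: `context` lands in the second parameter (the run/environment
# config in this signature), not `instance`, so `instance` stays None and
# an ephemeral instance is used.
execute_pipeline(pipe, context)

# Keyword: the value reaches the intended `instance` parameter.
execute_pipeline(pipe, instance=context.instance)
```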
j
Too generalized I guess:

```python
context = get_context()
for pipe in pipelines:
    execute_pipeline(pipe, instance=context.instance)
```

the solid:

```python
@solid
def get_context(context):
    return context
```
a
I'm quite confused by `get_context` being its own solid. How is `do_planning_pipeline` invoked?

maybe just try `instance=DagsterInstance.get()`

my hypothesis is that you are still getting the value `None` for `instance`, since the pipelines are not showing up in dagit

is `do_planning_pipeline` a `@pipeline` decorated function? `@pipeline` functions are meant to build up a DAG: they are evaluated at init time and the invocations of `@solid` functions are tracked. The bodies of the `@solid` functions are then the only thing executed at “run” time when the pipeline is launched
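A minimal sketch of that distinction combined with the `DagsterInstance.get()` suggestion; the names here are illustrative, and `create_item_pipelines` is again the helper from the question:

```python
from dagster import DagsterInstance, execute_pipeline, pipeline, solid

@solid
def run_item_pipelines(context):
    # This body runs at "run" time, each time the pipeline is launched.
    pipelines = create_item_pipelines()  # assumed helper from the question
    for pipe in pipelines:
        # Use the configured, persistent instance so Dagit records the runs.
        execute_pipeline(pipe, instance=DagsterInstance.get())

@pipeline
def planning_pipeline_example():
    # This body runs once at init time, only to build the DAG;
    # a plain function call here would not re-run on every launch.
    run_item_pipelines()
```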
j
`do_planning_pipeline` is just a function wrapped by this factory:

```python
def planning_pipeline(
    arg,
    name="default_name",
    input_defs=None,
    **kwargs,
):
    @pipeline(name=name, tags={'owner': 'plan1', 'source': 'plan1'})
    def _x_planning_pipeline():
        # Pipeline logic here
        do_planning_pipeline()
        pass

    return _x_planning_pipeline
```
I figured if I execute that dynamically created pipeline, it would then execute an array of other pipelines.