Hi all. I'm trying to understand how, through the python api, I can execute a subset of pipeline solids. I can use execute_solids_within_pipeline(), but this returns a list of SolidExecutionResults. Ideally, what I want is a PipelineExecutionResult, which has a run_id and a context I can inspect. Is there a way to do this?
Hey aqm this is definitely way more awkward and less documented than it should be right now. For now call build_sub_pipeline on the original pipeline and then pass that to execute_pipeline. e.g.:
execute_pipeline(original_pipeline.build_sub_pipeline(['solid_a', 'solid_b']))
got it - thanks for the response. I didn't know about build_sub_pipeline. I'm still having issues achieving the desired result. Here's an extended example to illustrate the issues I'm running into: Say I have a simple pipeline like this
@pipeline
def pipeline1():
    df1 = solid1()
    df2 = solid2(df1)
    df3 = solid3(df2)
I've executed this pipeline and stored the PipelineExecutionResult in result:
result = execute_pipeline(
    pipeline1,
    environment_dict={'solids': {
        'solid1': {'inputs': {'arg1': ??}},
        'solid2': {'inputs': {'arg1': ??}},
        'solid3': {'inputs': {'arg1': ??}},
    }},
)
I want to just execute solid2 and solid3 -- ideally returning a PipelineExecutionResult -- and re-use the results of solid1 from the previous run. I tried the below, but this yields a 'Truth value of a dataframe is ambiguous' error:
subresult = execute_pipeline(
    pipeline1.build_sub_pipeline(['solid2', 'solid3']),
    environment_dict={'solids': {
        'solid2': {'inputs': {'df1': result.result_for_solid('solid1').output_values['result']}}
    }},
)
thoughts? a little more about the use case: my team is exploring using dagster as a way to iteratively build up a DAG of computations in an ML training pipeline. The pipeline function captures the whole pipeline, and a developer wants to iterate on the contents of solid2 and solid3 without re-running solid1 (which is usually an expensive operation that grabs a large dataset from a data warehouse). From the docs it looks like this use case is easily supported through dagit, but it's less clear how to do it through the python API. Thanks for the guidance
Hi aqm, this might be a pandas error, any chance you're trying to filter a dataframe in one of those solids? Something like
df.loc[df == True & df['other_column'] == False]
import pandas as pd
df = pd.DataFrame({'column': [True, False, False, True], 'other_column': [False, False, True, True]})
df.loc[df == True & df['other_column'] == False]
throws the same error.
Truth value of a dataframe is ambiguous
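One possible explanation for why that expression raises, sketched with only the standard library (no pandas needed): `&` binds more tightly than `==`, so Python parses the filter as a chained comparison, and a chained comparison joins its parts with an implicit `and`, which asks for the truth value of the intermediate result. The names `df` and `col` below are placeholder identifiers inside the parsed strings, not real objects.

```python
import ast

# Parse the unparenthesized filter; `df` and `col` are placeholder names only.
unparenthesized = ast.parse("df == True & col == False", mode="eval").body

# The top-level node is a chained comparison: df == (True & col) == False
is_chained = isinstance(unparenthesized, ast.Compare) and len(unparenthesized.ops) == 2
middle_is_bitand = (
    isinstance(unparenthesized.comparators[0], ast.BinOp)
    and isinstance(unparenthesized.comparators[0].op, ast.BitAnd)
)

# With explicit parentheses, the top-level node is the element-wise `&` instead.
parenthesized = ast.parse("(df == True) & (col == False)", mode="eval").body
top_is_bitand = isinstance(parenthesized, ast.BinOp) and isinstance(parenthesized.op, ast.BitAnd)

print(is_chained, middle_is_bitand, top_is_bitand)
```

If a filter in one of the solids has this shape, wrapping each comparison in parentheses (and comparing a column rather than the whole frame) is the usual way to avoid the implicit `and`.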
ya its not obvious to me from what you’ve provided as to what could be going wrong in dagster to cause this pandas error.
that would be embarrassing, but i'll check!
it may still be an unexpected dagster behavior - but you’ll have to discern what exactly is going awry for us to figure that out
one thing is to make sure that the output of
makes sense as a
is a serialized thing
got it - thanks for the rapid feedback here. I'll do some debugging. Generally speaking, am I approaching this correctly by passing solid1's dataframe output to the environment_dict of the sub_pipeline's execution?
are you using a custom dataframe type or ours from dagster-pandas?
by passing solid1's dataframe output to the environment_dict of the sub_pipeline’s execution
the environment_dict is effectively a big string input, typically driven from a file or editor. So it would probably make more sense to pass the location of a file containing the dataframe as opposed to a big serialized version of it
i was using plain vanilla pandas. haven't checked dagster-pandas but can use that (will look into it).
a type defines how it can be loaded given config. Our dataframe type in dagster-pandas defines loading from different file formats (csv, parquet, table)
so you would have something like
'solids': {
        'solid2': {'inputs': {'df1': {'csv': 'path/to/df.csv'}}}
}
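A minimal sketch of the "pass a file location, not the data" idea, using only the standard library so it runs without dagster or pandas. The helpers `write_rows_to_csv` and `build_environment_dict` are hypothetical names made up for illustration, and the config shape simply mirrors the fragment above; check it against the dagster-pandas version you have installed.

```python
import csv
import os
import tempfile


def write_rows_to_csv(rows, header, directory):
    """Write rows to a CSV file in `directory` and return its path (hypothetical helper)."""
    path = os.path.join(directory, "df1.csv")
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)
    return path


def build_environment_dict(csv_path):
    """Build a config dict that points solid2's df1 input at a file (shape assumed from the thread)."""
    return {"solids": {"solid2": {"inputs": {"df1": {"csv": csv_path}}}}}


# Persist the upstream output once, then reference only its location in config.
tmp_dir = tempfile.mkdtemp()
csv_path = write_rows_to_csv([[1, "a"], [2, "b"]], ["id", "label"], tmp_dir)
environment_dict = build_environment_dict(csv_path)
```

With a real pandas dataframe, the write step would typically be `df.to_csv(csv_path, index=False)`; the config then carries only the path string.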
got it - is it possible to have 'df1' point to an in-memory object instead of something saved to disk?
no - not using the environment / config system anyway
you could have a solid return an object in memory
if you are executing these pipelines one after the other in the same process - you could just use memoization in the solid - using
or similar
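A rough sketch of what in-process memoization could look like. The original message doesn't say which tool was meant; `functools.lru_cache` is one option assumed here, and `load_expensive_dataset` is a hypothetical stand-in for the expensive work inside solid1.

```python
import functools

call_count = 0  # tracks how often the expensive load actually runs


@functools.lru_cache(maxsize=None)
def load_expensive_dataset(query):
    """Hypothetical stand-in for the warehouse query that solid1 performs."""
    global call_count
    call_count += 1
    return tuple(range(5))  # placeholder for the real dataset


# The second call with the same argument is served from the cache.
first = load_expensive_dataset("select * from big_table")
second = load_expensive_dataset("select * from big_table")
```

This only helps while the same Python process stays alive, and the cache hands back the same object each time, so a mutable result such as a dataframe should be copied before it is modified.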
hm - another thing you could try is using the re-execution options on
subresult = execute_pipeline(
      previous_run_id=result.run_id,  # copy over intermediates from previous run
      step_keys_to_execute=['solid2.compute', 'solid3.compute'],  # select what to re-execute
this is super helpful. it seems to do exactly what I want when writing intermediates to the filesystem. switching the storage to `in_memory` in line 638 makes the subresult raise an error
DagsterInvariantViolationError: Cannot perform reexecution with non persistent intermediates manager `InMemoryIntermediatesManager`.
If I instantiate a DagsterInstance.ephemeral() and execute a pipeline with in-memory `storage` in the environment dict, is there a way to retrieve the intermediate outputs of the pipeline from the instance?
you need to use storage: filesystem - we didn’t design support for sequential pipeline executions in the same process.
the ephemeral instance will use a temp directory that gets cleaned up when the process closes. You can set
environment variable to a directory and use
to keep all your metadata around in said directory. https://dagster.readthedocs.io/en/0.6.9/sections/deploying/reference.html#the-dagsterinstance
👍 thanks