# announcements
a
Hi all. I'm trying to understand how, through the python api, I can execute a subset of pipeline solids. I can use execute_solids_within_pipeline(), but this returns a list of SolidExecutionResults. Ideally, what I want is a PipelineExecutionResult, which has a run_id and a context I can inspect. Is there a way to do this?
s
Hey aqm, this is definitely way more awkward and less documented than it should be right now. For now, call `build_sub_pipeline` on the original pipeline and then pass that to `execute_pipeline`, e.g.:
Copy code
execute_pipeline(original_pipeline.build_sub_pipeline(['solid_a', 'solid_b']))
a
got it - thanks for the response. I didn't know about `build_sub_pipeline`. I'm still having issues achieving the desired result. Here's an extended example to illustrate the issues I'm running into: say I have a simple pipeline like this
Copy code
@pipeline
def pipeline1():
    df1 = solid1()
    df2 = solid2(df1)
    df3 = solid3(df2)
I've executed this pipeline and stored the PipelineExecutionResult in `result`:
Copy code
result = execute_pipeline(
    pipeline1,
    environment_dict={
    'solids': {
        'solid1': {'inputs': {'arg1': ??}},
        'solid2': {'inputs': {'arg1': ??}},
        'solid3': {'inputs': {'arg1': ??}}
    }},
)
I want to just execute solid2 and solid3 -- ideally returning a PipelineExecutionResult -- and re-use the results of solid1 from the previous run. I tried the below, but this yields a 'Truth value of a dataframe is ambiguous' error
Copy code
subresult = execute_pipeline(
    pipeline1.build_sub_pipeline(['solid2', 'solid3']),
    environment_dict={
    'solids': {
        'solid2': {'inputs': {'df1': result.result_for_solid('solid1').output_values['result']}}
    }},
    run_config=RunConfig(run_id=result.run_id)
)
thoughts? a little more about the use case: my team is exploring using dagster as a way to iteratively build up a DAG of computations in an ML training pipeline. The pipeline function captures the whole pipeline, and a developer wants to iterate on the contents of solid2 and solid3 without re-running solid1 (which is usually an expensive operation that grabs a large dataset from a data warehouse). From the docs it looks like this use case is easily supported through dagit, but perhaps less clear through the python API. Thanks for the guidance
v
Hi aqm, this might be a pandas error, any chance you're trying to filter a dataframe in one of those solids? Something like `df.loc[df == True & df['other_column'] == False]`?
import pandas as pd
df = pd.DataFrame({'column': [True, False, False, True], 'other_column': [False, False, True, True]})
df.loc[df == True & df['other_column'] == False]
throws the same error.
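For what it's worth, the usual fix for that pandas error is to wrap each comparison in parentheses before combining them with `&` - a minimal sketch, assuming the intent was to filter on those two columns:
Copy code
import pandas as pd

df = pd.DataFrame({'column': [True, False, False, True], 'other_column': [False, False, True, True]})

# each parenthesized comparison is a boolean Series, so & combines them
# element-wise instead of forcing a whole frame through bool()
df.loc[(df['column'] == True) & (df['other_column'] == False)]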
👍 1
a
"Truth value of a dataframe is ambiguous"
ya, it's not obvious to me from what you've provided what could be going wrong in dagster to cause this pandas error.
a
that would be embarrassing, but I'll check!
a
it may still be an unexpected dagster behavior - but you’ll have to discern what exactly is going awry for us to figure that out
one thing is to make sure that the output of `result.result_for_solid('solid1').output_values['result']` makes sense as a `string`, since `environment_dict` is a serialized thing
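A quick sanity check, assuming the earlier run result is still in scope, is just to look at what that expression actually is:
Copy code
# here it is a pandas DataFrame, not a string/config-friendly value,
# which is why stuffing it directly into environment_dict misbehaves
val = result.result_for_solid('solid1').output_values['result']
print(type(val))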
a
got it - thanks for the rapid feedback here. I'll do some debugging. Generally speaking, am I approaching this correctly by passing solid1's dataframe output to the environment_dict of the sub_pipeline's execution?
a
are you using a custom dataframe type or ours from dagster-pandas?
"by passing solid1's dataframe output to the environment_dict of the sub_pipeline's execution"
the `environment_dict` is effectively a big string input - typically driven from a `yaml` file or editor. So it would probably make more sense to pass the location of a file containing the dataframe as opposed to a big serialized version of it
a
i was using plain vanilla pandas. haven't checked dagster-pandas but can use that (will look into it).
a
the `input_hydration_config` of a `DagsterType` defines how it can load that type given config. Our `DataFrame` type in `dagster-pandas` defines loading from different file formats (csv, parquet, table)
so you would have something like
Copy code
'solids': {
    'solid2': {'inputs': {'df1': {'csv': 'path/to/df.csv'}}}
}
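Putting that together with the file-location suggestion above, a hedged sketch (the csv filename is illustrative, and this assumes solid2's `df1` input is typed with the dagster-pandas `DataFrame` so the `csv` config is available):
Copy code
# persist solid1's output from the earlier full run once
df1 = result.result_for_solid('solid1').output_values['result']
df1.to_csv('df1.csv', index=False)

# re-run only the downstream solids, hydrating df1 from the csv
subresult = execute_pipeline(
    pipeline1.build_sub_pipeline(['solid2', 'solid3']),
    environment_dict={
        'solids': {
            'solid2': {'inputs': {'df1': {'csv': 'df1.csv'}}}
        }
    },
)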
a
got it - is it possible to have 'df1' point to an in-memory object instead of something saved to disk?
a
no - not using the environment / config system anyway
you could have a solid return an object in memory
if you are executing these pipelines one after the other in the same process - you could just use memoization in the solid - using `@lru_cache` or similar
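A minimal sketch of that memoization idea (names are illustrative, and it only helps while the executions share a process):
Copy code
from functools import lru_cache

import pandas as pd
from dagster import solid


@lru_cache(maxsize=1)
def _fetch_warehouse_data():
    # stand-in for the expensive warehouse query
    return pd.read_csv('warehouse_extract.csv')


@solid
def solid1(context):
    # repeated pipeline executions in the same process reuse the cached frame
    return _fetch_warehouse_data()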
hm - another thing you could try is using the re-execution options on `RunConfig`
Copy code
subresult = execute_pipeline(
    pipeline1,
    run_config=RunConfig(
      previous_run_id=result.run_id,  # copy over intermediates from previous run
      step_keys_to_execute=['solid2.compute', 'solid3.compute'],  # select what to re-execute
    )
)
a
this is super helpful. it seems to do exactly what I want when writing intermediates to the filesystem. switching the storage to `in_memory` in line 638 makes the subresult raise an error
Copy code
DagsterInvariantViolationError: Cannot perform reexecution with non persistent intermediates manager `InMemoryIntermediatesManager`.
If I instantiate a `DagsterInstance.Ephemeral()` and execute a pipeline with `storage: in_memory` in the environment dict, is there a way to retrieve the intermediate outputs of the pipeline from the instance?
a
you need to use `storage: filesystem` - we didn't design in-memory storage to carry over between sequential pipeline executions in the same process. `DagsterInstance.Ephemeral()` will use a temp directory that gets cleaned up when the process closes. You can set the `DAGSTER_HOME` environment variable to a directory and use `DagsterInstance.get()` to keep all your metadata around in said directory. https://dagster.readthedocs.io/en/0.6.9/sections/deploying/reference.html#the-dagsterinstance
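A rough sketch of that setup, with the caveat that the exact config shapes (in particular the `storage: filesystem` dict form) are assumptions to check against your dagster version:
Copy code
import os

from dagster import RunConfig, execute_pipeline

# point DAGSTER_HOME at a persistent directory so run metadata survives the process
# (DagsterInstance.get() then resolves to this directory, per the linked docs)
os.environ['DAGSTER_HOME'] = '/path/to/dagster_home'

# first run: persist intermediates to the filesystem instead of in memory
result = execute_pipeline(
    pipeline1,
    environment_dict={
        'storage': {'filesystem': {}},  # assumed shape of the filesystem storage config
        'solids': {...},  # same solid config as before
    },
)

# later run: reuse solid1's intermediate, recompute solid2 and solid3
subresult = execute_pipeline(
    pipeline1,
    environment_dict={'storage': {'filesystem': {}}, 'solids': {...}},
    run_config=RunConfig(
        previous_run_id=result.run_id,
        step_keys_to_execute=['solid2.compute', 'solid3.compute'],
    ),
)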
a
👍 thanks