aqm
02/09/2020, 9:11 AM

schrockn
02/09/2020, 2:45 PM
Call build_sub_pipeline on the original pipeline and then pass that to execute_pipeline, e.g.:

execute_pipeline(original_pipeline.build_sub_pipeline(['solid_a', 'solid_b']))
aqm
02/09/2020, 9:25 PM
@pipeline
def pipeline1():
    df1 = solid1()
    df2 = solid2(df1)
    df3 = solid3(df2)

I've executed this pipeline and stored the PipelineExecutionResult in result:

result = execute_pipeline(
    pipeline1,
    environment_dict={
        'solids': {
            'solid1': {'inputs': {'arg1': ??}},
            'solid2': {'inputs': {'arg1': ??}},
            'solid3': {'inputs': {'arg1': ??}},
        },
    },
)
I want to execute just solid2 and solid3 (ideally returning a PipelineExecutionResult) and re-use the results of solid1 from the previous run.
I tried the below, but it yields a 'Truth value of a dataframe is ambiguous' error:

subresult = execute_pipeline(
    pipeline1.build_sub_pipeline(['solid2', 'solid3']),
    environment_dict={
        'solids': {
            'solid2': {'inputs': {'df1': result.result_for_solid('solid1').output_values['result']}},
        },
    },
    run_config=RunConfig(run_id=result.run_id),
)

thoughts?
A little more about the use case: my team is exploring using dagster as a way to iteratively build up a DAG of computations in an ML training pipeline. The pipeline function captures the whole pipeline, and a developer wants to iterate on the contents of solid2 and solid3 without re-running solid1 (which is usually an expensive operation that grabs a large dataset from a data warehouse).
From the docs it looks like this use case is easily supported through dagit, but it's less clear how to do it through the Python API.
Thanks for the guidance.

Vincent Goffin
02/10/2020, 10:50 AM
df.loc[df == True & df['other_column'] == False]
?

import pandas as pd
df = pd.DataFrame({'column': [True, False, False, True], 'other_column': [False, False, True, True]})
df.loc[df == True & df['other_column'] == False]

throws the same error.

alex
02/10/2020, 3:51 PM
"Truth value of a dataframe is ambiguous"
Ya, it's not obvious to me from what you've provided what could be going wrong in dagster to cause this pandas error.
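(Editor's aside on the pandas error itself: in Python, & binds more tightly than ==, so the unparenthesized filter above parses as a chained comparison that ends up calling bool() on a DataFrame. A minimal sketch, assuming the intent was to filter on the 'column' column:)

```python
import pandas as pd

df = pd.DataFrame({'column': [True, False, False, True],
                   'other_column': [False, False, True, True]})

# Unparenthesized: & binds tighter than ==, so this parses as the chained
# comparison  df == (True & df['other_column']) == False,  which calls
# bool() on a DataFrame and raises the "truth value ... is ambiguous" error.
try:
    df.loc[df == True & df['other_column'] == False]
except ValueError:
    pass  # "The truth value of a DataFrame is ambiguous"

# Parenthesizing each comparison keeps everything element-wise:
filtered = df.loc[(df['column'] == True) & (df['other_column'] == False)]
```

With the sample data, only the first row has column=True and other_column=False.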
aqm
02/10/2020, 6:14 PM

alex
02/10/2020, 6:16 PM
res.result_for_solid('solid1').output_values['result'] makes sense as a string, since environment_dict is a serialized thing.

aqm
02/10/2020, 6:23 PM

alex
02/10/2020, 6:26 PM
"by passing solid1's dataframe output to the environment_dict of the sub_pipeline's execution"
The environment_dict is effectively a big string input - typically driven from a yaml file or editor. So it would probably make more sense to pass the location of a file containing the dataframe, as opposed to a big serialized version of it.

aqm
02/10/2020, 6:29 PM

alex
02/10/2020, 6:30 PM
The input_hydration_config of a DagsterType defines how it can load that type given config. Our DataFrame type in dagster-pandas defines loading from different file formats (csv, parquet, table):

'solids': {
    'solid2': {'inputs': {'df1': {'csv': 'path/to/df.csv'}}}
}
aqm
02/10/2020, 8:58 PM

alex
02/10/2020, 9:00 PM
@lru_cache or similar

RunConfig:

subresult = execute_pipeline(
    pipeline1,
    run_config=RunConfig(
        previous_run_id=result.run_id,  # copy over intermediates from previous run
        step_keys_to_execute=['solid2.compute', 'solid3.compute'],  # select what to re-execute
    )
)
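(Editor's note: the @lru_cache idea above, for iterating within a single Python process, can be sketched as follows; load_big_dataset is a hypothetical stand-in for solid1's expensive warehouse pull, not part of the thread:)

```python
from functools import lru_cache

calls = []  # track how many times the real work actually runs

@lru_cache(maxsize=None)
def load_big_dataset(table):
    """Stand-in for an expensive data-warehouse query."""
    calls.append(table)
    return (table,) * 3  # pretend this is a large dataframe

first = load_big_dataset('events')
second = load_big_dataset('events')  # served from the cache, no re-load
```

The second call returns the memoized result, so the expensive body runs only once per distinct argument while the developer iterates on downstream code.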
aqm
02/11/2020, 4:53 AM
DagsterInvariantViolationError: Cannot perform reexecution with non persistent intermediates manager `InMemoryIntermediatesManager`.

If I instantiate a DagsterInstance.Ephemeral() and execute a pipeline with `storage: in_memory` in the environment dict, is there a way to retrieve the intermediate outputs of the pipeline from the instance?

alex
02/11/2020, 3:45 PMstorage: filesystem
- we didn’t design support for sequential pipeline executions in the same process.
DagsterInstance.Ephemeral()
will use a temp directory that gets cleaned up when the process closes. You can set DAGSTER_HOME
environment variable to a directory and use DagsterInstance.get()
to keep all your metadata around in said directory. https://dagster.readthedocs.io/en/0.6.9/sections/deploying/reference.html#the-dagsterinstanceaqm
02/11/2020, 6:07 PM
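(Editor's note: to make the DAGSTER_HOME suggestion above concrete, a minimal setup sketch; the directory path is an arbitrary choice, not from the thread:)

```shell
# Point DAGSTER_HOME at a persistent directory so DagsterInstance.get()
# keeps run metadata and intermediates around across processes.
export DAGSTER_HOME="$HOME/dagster_home"
mkdir -p "$DAGSTER_HOME"
```

With this in place, runs executed in one process can be re-executed from another, since the instance directory outlives the process.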