Amanda Crawford
08/21/2019, 9:12 PM
`Selector`. Because this solid is internal to the DAG (not a start node), I am not able to (or just don't know how to) pass in the solid config, because the value is output by the previous solid. I have downloaded a file from S3, and the output is a file path string pointing to a CSV file. Once I have this, I would like to use the example `sum_solid` solid from the airline_demo code, which takes in this file path using the `dagster_pandas.DataFrame` type. I was looking at the source code and see that the `DataFrame` type uses a `Selector` to determine how to handle the data. Is there a way to properly do this?
Hanwei
08/22/2019, 9:22 AM
`@lambda_solid` vs a `@solid`
Kevin
08/22/2019, 10:45 AM
This shortcut allows the creation of simple solids that do not require configuration and whose implementations do not require a context.
So tl;dr: if you don't need a context or config fields, `lambda_solid` it is.
Hanwei
08/22/2019, 11:35 AM
Hanwei
08/22/2019, 11:47 AM
`retry` semantics in Dagster? I can't find anything in the docs referencing that. Is it down to the author of the solid to code that?
user
08/22/2019, 6:45 PM
Matt Coleman
08/22/2019, 9:34 PM
Noah Trueblood
08/22/2019, 9:39 PM
@solid(output_defs=[OutputDefinition(name='df', dagster_type=DataFrame)])
def get_df(context):
    yield OutputDefinition(pd.DataFrame([1, 2, 3]), 'df')
The following error occurs:
```
ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
File "/Users/noahtrueblood/.local/share/virtualenvs/dappr-Fw8AAVno/lib/python3.7/site-packages/dagster/core/errors.py", line 104, in user_code_error_boundary
yield
File "/Users/noahtrueblood/.local/share/virtualenvs/dappr-Fw8AAVno/lib/python3.7/site-packages/dagster/core/engine/engine_inprocess.py", line 568, in _user_event_sequence_for_step_compute_fn
for event in gen:
File "/Users/noahtrueblood/.local/share/virtualenvs/dappr-Fw8AAVno/lib/python3.7/site-packages/dagster/core/execution/plan/compute.py", line 75, in _execute_core_compute
for step_output in _yield_compute_results(compute_context, inputs, compute_fn):
File "/Users/noahtrueblood/.local/share/virtualenvs/dappr-Fw8AAVno/lib/python3.7/site-packages/dagster/core/execution/plan/compute.py", line 52, in _yield_compute_results
for event in user_event_sequence:
File "/Users/noahtrueblood/.local/share/virtualenvs/dappr-Fw8AAVno/lib/python3.7/site-packages/dagster/core/definitions/decorators.py", line 343, in compute
for item in result:
File "/Users/noahtrueblood/dappr/data/transform/derive.py", line 57, in get_df
yield OutputDefinition(pd.DataFrame([1, 2, 3]), 'df')
File "/Users/noahtrueblood/.local/share/virtualenvs/dappr-Fw8AAVno/lib/python3.7/site-packages/dagster/core/definitions/output.py", line 31, in __init__
self._runtime_type = check.inst(resolve_to_runtime_type(dagster_type), RuntimeType)
File "/Users/noahtrueblood/.local/share/virtualenvs/dappr-Fw8AAVno/lib/python3.7/site-packages/dagster/core/types/runtime.py", line 538, in resolve_to_runtime_type
dagster_type = remap_python_type(dagster_type)
File "/Users/noahtrueblood/.local/share/virtualenvs/dappr-Fw8AAVno/lib/python3.7/site-packages/dagster/core/types/mapping.py", line 13, in remap_python_type
if type_annotation == int:
File "/Users/noahtrueblood/.local/share/virtualenvs/dappr-Fw8AAVno/lib/python3.7/site-packages/pandas/core/generic.py", line 1478, in __nonzero__
.format(self.__class__.__name__))
I see in `dagster/python_modules/dagster/dagster/core/types/mapping.py` that an equality check is performed, for example `type_annotation == int`, which makes me think that `type_annotation` is of type `pd.DataFrame` instead of type `DataFrame`. Any ideas?
Interestingly, the error does not occur when I use a return instead of a yield:
@solid(output_defs=[OutputDefinition(name='df', dagster_type=DataFrame)])
def get_df(context):
    return pd.DataFrame([1, 2, 3])
```
dwall
08/23/2019, 8:55 PM
Matt Coleman
08/27/2019, 1:08 PM
`solid_a` outputs a list of `n` files, and we want to create `n` aliases of solid_b, each with a file as input. Can we do this?
Amanda Crawford
08/28/2019, 4:42 PM
achintamiri
09/02/2019, 4:33 PM
achintamiri
09/02/2019, 4:34 PM
achintamiri
09/02/2019, 4:57 PM
achintamiri
09/03/2019, 3:39 PM
DataFrame = as_dagster_type(
    pd.DataFrame,
    name='PandasDataFrame',
    description='''Two-dimensional size-mutable, potentially heterogeneous
tabular data structure with labeled axes (rows and columns).
See <http://pandas.pydata.org/>''',
)
@lambda_solid
def Input1(pdd: DataFrame) -> DataFrame:
    r = pdd.read_csv('file1.csv')
    return r
@lambda_solid
def Merge(r: DataFrame, r2: DataFrame, pdd: DataFrame) -> DataFrame:
    r3 = pdd.concat([r, r2], axis=1)
    return
@lambda_solid
def Input2(pdd: DataFrame) -> DataFrame:
    r2 = pdd.read_csv('file2.csv')
    return r2
@lambda_solid
def Result_output(y: DataFrame) -> DataFrame:
    y3 = y
    return
@pipeline
def actual_dag_pipeline():
    y = Merge(Input1(), Input2())
    Result_output(y)
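Read on its own, the snippet above has a few likely problems: `read_csv` and `concat` are pandas module-level functions rather than `DataFrame` methods, `Input1` and `Input2` declare a `pdd` input that the pipeline never supplies, `Merge` declares three inputs but is called with two, and `Merge` and `Result_output` return nothing. Below is a minimal sketch of the data flow the pipeline seems to be aiming for, written in plain Python with the standard `csv` module so it runs without dagster or pandas. The in-memory CSV text and the lowercase helper names are illustrative assumptions, not part of the original; in a dagster version each function would presumably carry `@lambda_solid`, with `pd.read_csv` and `pd.concat(..., axis=1)` doing the reading and merging.

```python
import csv
import io

# Hypothetical stand-ins for the contents of file1.csv and file2.csv.
FILE1_TEXT = "id,name\n1,a\n2,b\n"
FILE2_TEXT = "score\n10\n20\n"


def read_rows(text):
    """Parse CSV text into a list of rows, header row first."""
    return list(csv.reader(io.StringIO(text)))


def input1():
    # Source step: takes no inputs and returns the data it loaded.
    return read_rows(FILE1_TEXT)


def input2():
    # Second source step, also without inputs.
    return read_rows(FILE2_TEXT)


def merge(r, r2):
    # Column-wise merge, similar in spirit to pd.concat([r, r2], axis=1).
    # zip() assumes both inputs have the same number of rows.
    # The explicit return matters: a bare `return` hands None downstream.
    return [left + right for left, right in zip(r, r2)]


def result_output(y):
    # Pass the merged rows through so the caller can inspect them.
    return y


def actual_dag_pipeline():
    # Only two values flow into merge, matching its two parameters.
    return result_output(merge(input1(), input2()))


merged = actual_dag_pipeline()
print(merged)
```

The point of the sketch is the wiring: source steps take no inputs, every step returns its value, and each step's parameter list matches what the pipeline actually passes in.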
achintamiri
09/05/2019, 12:08 PM
achintamiri
09/05/2019, 12:08 PM
achintamiri
09/05/2019, 12:10 PM
achintamiri
09/05/2019, 12:11 PM
import collections
import dagstermill as dm
from dagster import (
    Any, Field, InputDefinition, Int, OutputDefinition, PipelineDefinition,
    as_dagster_type, execute_pipeline, lambda_solid, pipeline, solid,
)
my_notebook_solid = dm.define_dagstermill_solid(
    name='DM1',
    notebook_path='DM1.ipynb',
    input_defs=[
        InputDefinition(name='a'),
        InputDefinition(name='b'),
    ],
    output_defs=[OutputDefinition()],
)
# def notebook_pipeline():
#     return PipelineDefinition(name='julia_pipeline', solid_defs=[my_notebook_solid])
def notebook_pipeline():
    return PipelineDefinition(name='pipeline', solid_defs=[my_notebook_solid])
achintamiri
09/05/2019, 12:16 PM
ipynb file
import dagstermill as dm
from dagstermill.examples.repository import define_example_repository
a = 1
b = 2
result= a+b
dm.yield_result(result)
achintamiri
09/06/2019, 10:56 AM
notebook_solid = dm.define_dagstermill_solid(
    name='hello_world1',
    notebook_path='hello_world1.ipynb')
def notebook_pipeline():
    return PipelineDefinition(name='pipeline', solid_defs=[notebook_solid])
achintamiri
09/06/2019, 10:57 AM
achintamiri
09/06/2019, 10:58 AM
Tobias Macey
09/06/2019, 2:09 PM
Marwan
09/10/2019, 2:30 PM
`config_cls` param in the `output_materialization_config` decorator?
Marwan
09/10/2019, 3:06 PM
user
09/10/2019, 5:40 PM
achintamiri
09/11/2019, 2:17 PM
achintamiri
09/16/2019, 10:45 AM
# file.py
import julia
j = julia.Julia()
x = j.include("test.jl")
achintamiri
09/16/2019, 3:00 PM