Andrey Alekseev
08/28/2020, 6:20 PM
@pipeline
def my_pipeline():
    """Compose the pipeline: load from the DWH, prepare the data, run the ML step."""
    # Aliased so this invocation shows up as the solid "load_target_data".
    # Not needed in tests, which leave it out of their solid_selection.
    df = load_data_from_dwh.alias("load_target_data")()
    df = data_preparation(df)
    # Invoking the solid is what wires it into the pipeline; its output is
    # not used further, so it is not bound to a name.
    ml_stuff(df)
df is a plain pandas DataFrame.
I have defined a simple dagster-pandas type, and data_preparation accepts it:
# dagster-pandas DataFrame type used to annotate the `df` input of data_preparation.
PandasDataFrame = create_dagster_pandas_dataframe_type(name="PandasDataFrame")
load_data_from_dwh uses custom SQLAlchemy code, so I don't have any dagster resources.
I tried to do this:
def test_pipeline_run(dataframe):
    # Run only the downstream solids of my_pipeline, skipping the DWH load.
    #
    # NOTE(review): this fails while the run config is being validated
    # (create_execution_plan -> EnvironmentConfig.build -> process_config).
    # `dataframe` is a pandas DataFrame object, not a config value. The
    # selector validator evaluates `config_value == {}` on it. Pandas turns
    # that into an element-wise comparison and raises
    # "ValueError: Unable to coerce to Series, length must be 3: given 0".
    # To feed a DataFrame here, give the input's dagster type a loader and
    # select it from run config, rather than embedding the object itself.
    run_config = {
        "solids": {
            # NOTE(review): solid_selection below names "data_preparation" and
            # "ml_stuff"; confirm this key is the solid's name in the pipeline.
            "divide_data_over_column": {
                'inputs': {
                    "df": dataframe
                },
                "config": {...},
            },
        }
    }
    # Excludes the "load_target_data" solid so the DWH is never queried.
    solid_selection = ["data_preparation", "ml_stuff"]
    # NOTE(review): the returned result is not asserted on.
    execute_pipeline(my_pipeline,
                     run_config=run_config,
                     solid_selection=solid_selection)
An it results in_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/core/execution/api.py:298: in execute_pipeline
return _logged_execute_pipeline(
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/core/telemetry.py:89: in wrap
result = f(*args, **kwargs)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/core/execution/api.py:342: in _logged_execute_pipeline
pipeline_run = instance.create_run_for_pipeline(
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/core/instance/__init__.py:518: in create_run_for_pipeline
execution_plan = create_execution_plan(
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/core/execution/api.py:565: in create_execution_plan
environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/core/system_config/objects.py:96: in build
config_evr = process_config(environment_type, run_config)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:333: in process_config
validate_evr = validate_config(config_type, config_dict)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:58: in validate_config
return validate_config_from_snap(
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:68: in validate_config_from_snap
return _validate_config(
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:103: in _validate_config
return validate_shape_config(context, config_value)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:261: in validate_shape_config
return _validate_shape_config(context, config_value, check_for_extra_incoming_fields=True)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:234: in _validate_shape_config
field_evr = _validate_config(context.for_field_snap(field_snap), config_value[name])
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:103: in _validate_config
return validate_shape_config(context, config_value)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:261: in validate_shape_config
return _validate_shape_config(context, config_value, check_for_extra_incoming_fields=True)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:234: in _validate_shape_config
field_evr = _validate_config(context.for_field_snap(field_snap), config_value[name])
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:103: in _validate_config
return validate_shape_config(context, config_value)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:261: in validate_shape_config
return _validate_shape_config(context, config_value, check_for_extra_incoming_fields=True)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:234: in _validate_shape_config
field_evr = _validate_config(context.for_field_snap(field_snap), config_value[name])
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:103: in _validate_config
return validate_shape_config(context, config_value)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:261: in validate_shape_config
return _validate_shape_config(context, config_value, check_for_extra_incoming_fields=True)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:234: in _validate_shape_config
field_evr = _validate_config(context.for_field_snap(field_snap), config_value[name])
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:101: in _validate_config
return validate_selector_config(context, config_value)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:155: in validate_selector_config
if config_value == {}:
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/pandas/core/ops/__init__.py:833: in f
other = _align_method_FRAME(self, other, axis=None)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/pandas/core/ops/__init__.py:676: in _align_method_FRAME
right = to_series(right)
right = {}
def to_series(right):
msg = "Unable to coerce to Series, length must be {req_len}: given {given_len}"
if axis is not None and left._get_axis_name(axis) == "index":
if len(left.index) != len(right):
raise ValueError(
msg.format(req_len=len(left.index), given_len=len(right))
)
right = left._constructor_sliced(right, index=left.index)
else:
if len(left.columns) != len(right):
> raise ValueError(
msg.format(req_len=len(left.columns), given_len=len(right))
)
E ValueError: Unable to coerce to Series, length must be 3: given 0
def data_preparation(context, df: PandasDataFrame)
to
def data_preparation(context, df: Any)
Leor
08/28/2020, 6:29 PM
Andrey Alekseev
08/28/2020, 6:33 PM
import pandas as pd
import pytest
from dagster import execute_solid
from src.solids.features.data_preparation import data_preparation
@pytest.fixture
def create_dataframe():
    """Return a small daily DataFrame for tests.

    One row per day from 2020-01-01 through 2020-02-01 inclusive (32 rows),
    with columns ``date``, ``value`` (always 42) and ``segment``
    (always "Moscow").
    """
    # Scalars broadcast to the length of the date range, so the whole frame
    # is built in one constructor call instead of column-by-column mutation.
    return pd.DataFrame(
        {
            "date": pd.date_range(start="2020-01-01", end="2020-02-01"),
            "value": 42,
            "segment": "Moscow",
        }
    )
@pytest.mark.smoke
def test_solid_run(create_dataframe):
    """Execute data_preparation on its own with the fixture DataFrame and check it succeeds."""
    # input_values hands the DataFrame to the solid's `df` input as a Python
    # object; it is not placed in run config.
    input_values = {"df": create_dataframe}
    # The column names here refer to the columns built by the fixture.
    run_config = {
        "solids": {
            "data_preparation": {
                "config": {
                    "target_column": "value",
                    "divide_by": "segment",
                    "date_column": "date",
                }
            }
        }
    }
    res = execute_solid(
        data_preparation,
        input_values=input_values,
        run_config=run_config,
    )
    assert res.success
Also, it works as a full pipeline (when it loads data from the DB and sends it to the data_preparation solid).
Leor
08/28/2020, 6:36 PM
Andrey Alekseev
08/28/2020, 6:55 PM
Leor
08/28/2020, 8:32 PM
sandy
08/28/2020, 11:28 PM
'inputs': {
"df": dataframe
},
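This doesn't work because everything in run config is validated as config data, and a DataFrame object is not a config value. We would like to make this cleaner to express, but I believe the best way to make this happen right now is with a custom dagster type loader: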
from dagster_pandas.data_frame import dataframe_loader
@dagster_type_loader(config_schema=Selector({"mock": {}}))
def mock_df_loader(_context, config):
    """Build an in-memory test DataFrame instead of reading real data.

    ``config`` is the selector value from run config: a one-key dict such as
    ``{"mock": {}}`` whose key names the chosen option.

    Raises:
        ValueError: if the selected option is not "mock".
    """
    selected = next(iter(config))
    if selected != "mock":
        # "mock" is the only option in the schema. Fail loudly instead of
        # silently returning None if the schema is ever extended.
        raise ValueError(f"Unsupported loader option: {selected!r}")
    return pd.DataFrame(
        {
            "date": pd.date_range(start="2020-01-01", end="2020-02-01"),
            "value": 42,
            "segment": "Moscow",
        }
    )
# Attach the mock loader so this type's inputs can be supplied from run
# config through the loader's "mock" selector option.
PandasDataFrame = create_dagster_pandas_dataframe_type(
    name="PandasDataFrame",
    loader=mock_df_loader,
)
Then you would use the following run config in your test:
run_config = {
    "solids": {
        "divide_data_over_column": {
            # Selector config is a dict naming the chosen option. The loader
            # reads it with config.items(), and "mock" takes no further config.
            "inputs": {"df": {"mock": {}}},
            "config": {...},  # placeholder for the solid's own config
        },
    }
}
Andrey Alekseev
08/29/2020, 1:24 PM