# announcements
a
Hi! Could you please help me figure out how to test a pipeline that uses external resources? Is there a proper way to do so, or is the only way to set up an environment (DB, etc.) and write a special test mode for the pipeline? I tried using execute_pipeline with solid_selection, but I cannot figure out how to pass inputs to solids in the middle of the pipeline. Code in the thread.
from dagster import pipeline

@pipeline
def my_pipeline():
    # this DWH load step doesn't need to run in tests
    df = load_data_from_dwh.alias("load_target_data")()
    df = data_preparation(df)
    result = ml_stuff(df)
df is a simple pandas DataFrame. I have specified a simple dagster_pandas type, and data_preparation accepts it:
from dagster_pandas import create_dagster_pandas_dataframe_type

PandasDataFrame = create_dagster_pandas_dataframe_type(
    name="PandasDataFrame")
load_data_from_dwh uses custom SQLAlchemy code, so I don't have any resources. I tried this:
from dagster import execute_pipeline


def test_pipeline_run(dataframe):
    run_config = {
        "solids": {
            "divide_data_over_column": {
                'inputs': {
                    "df": dataframe
                },
                "config": {...},
            },
        }
    }
    solid_selection = ["data_preparation", "ml_stuff"]
    execute_pipeline(my_pipeline,
                     run_config=run_config,
                     solid_selection=solid_selection)
And it results in:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/core/execution/api.py:298: in execute_pipeline
    return _logged_execute_pipeline(
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/core/telemetry.py:89: in wrap
    result = f(*args, **kwargs)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/core/execution/api.py:342: in _logged_execute_pipeline
    pipeline_run = instance.create_run_for_pipeline(
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/core/instance/__init__.py:518: in create_run_for_pipeline
    execution_plan = create_execution_plan(
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/core/execution/api.py:565: in create_execution_plan
    environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/core/system_config/objects.py:96: in build
    config_evr = process_config(environment_type, run_config)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:333: in process_config
    validate_evr = validate_config(config_type, config_dict)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:58: in validate_config
    return validate_config_from_snap(
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:68: in validate_config_from_snap
    return _validate_config(
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:103: in _validate_config
    return validate_shape_config(context, config_value)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:261: in validate_shape_config
    return _validate_shape_config(context, config_value, check_for_extra_incoming_fields=True)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:234: in _validate_shape_config
    field_evr = _validate_config(context.for_field_snap(field_snap), config_value[name])
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:103: in _validate_config
    return validate_shape_config(context, config_value)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:261: in validate_shape_config
    return _validate_shape_config(context, config_value, check_for_extra_incoming_fields=True)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:234: in _validate_shape_config
    field_evr = _validate_config(context.for_field_snap(field_snap), config_value[name])
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:103: in _validate_config
    return validate_shape_config(context, config_value)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:261: in validate_shape_config
    return _validate_shape_config(context, config_value, check_for_extra_incoming_fields=True)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:234: in _validate_shape_config
    field_evr = _validate_config(context.for_field_snap(field_snap), config_value[name])
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:103: in _validate_config
    return validate_shape_config(context, config_value)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:261: in validate_shape_config
    return _validate_shape_config(context, config_value, check_for_extra_incoming_fields=True)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:234: in _validate_shape_config
    field_evr = _validate_config(context.for_field_snap(field_snap), config_value[name])
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:101: in _validate_config
    return validate_selector_config(context, config_value)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/dagster/config/validate.py:155: in validate_selector_config
    if config_value == {}:
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/pandas/core/ops/__init__.py:833: in f
    other = _align_method_FRAME(self, other, axis=None)
../../../../miniconda3/envs/etna/lib/python3.8/site-packages/pandas/core/ops/__init__.py:676: in _align_method_FRAME
    right = to_series(right)

right = {}

    def to_series(right):
        msg = "Unable to coerce to Series, length must be {req_len}: given {given_len}"
        if axis is not None and left._get_axis_name(axis) == "index":
            if len(left.index) != len(right):
                raise ValueError(
                    msg.format(req_len=len(left.index), given_len=len(right))
                )
            right = left._constructor_sliced(right, index=left.index)
        else:
            if len(left.columns) != len(right):
>               raise ValueError(
                    msg.format(req_len=len(left.columns), given_len=len(right))
                )
E               ValueError: Unable to coerce to Series, length must be 3: given 0
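The last frames of the traceback show where this goes wrong: while validating the run config, Dagster's selector validation evaluates `config_value == {}`, and when `config_value` is a DataFrame, pandas attempts an element-wise comparison against the empty dict and raises. A minimal pandas-only reproduction (the helper `compare_with_empty_dict` is hypothetical and only mimics that one comparison, not Dagster's validator):

```python
import pandas as pd

# A frame shaped like the one in the question (three columns)
df = pd.DataFrame({
    "date": pd.date_range(start="2020-01-01", periods=3),
    "value": 42,
    "segment": "Moscow",
})


def compare_with_empty_dict(value):
    """Mimic the `config_value == {}` check seen in the traceback."""
    try:
        return value == {}
    except ValueError as exc:
        # pandas tries to align the dict against the frame's columns and fails
        return f"ValueError: {exc}"


print(compare_with_empty_dict({}))      # True
print(compare_with_empty_dict("mock"))  # False
print(compare_with_empty_dict(df))
```

So the error is not about the data itself: any DataFrame placed directly into run config will hit this comparison.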
I get the same error when I change
def data_preparation(context, df: PandasDataFrame)
to
def data_preparation(context, df: Any)
l
Hmm, that seems like an issue with your DataFrame, not Dagster itself?
Can you confirm that the code works outside the Dagster abstraction?
a
Yep. I have a test for a single solid, and it works. Here is the full code:
import pandas as pd
import pytest
from dagster import execute_solid

from src.solids.features.data_preparation import data_preparation


@pytest.fixture
def create_dataframe():
    df = pd.DataFrame.from_dict(
        {"date": pd.date_range(start="2020-01-01", end="2020-02-01")}
    )
    df["value"] = 42
    df["segment"] = "Moscow"
    return df


@pytest.mark.smoke
def test_solid_run(create_dataframe):
    input_values = {"df": create_dataframe}
    run_config = {
        "solids": {
            "data_preparation": {
                "config": {
                    "target_column": "value",
                    "divide_by": "segment",
                    "date_column": "date",
                }
            }
        }
    }
    res = execute_solid(
        data_preparation,
        input_values=input_values,
        run_config=run_config,
    )
    assert res.success
It also works as a full pipeline (when it loads data from the DB and passes it to the data_preparation solid).
l
aight
a
Maybe it is better to just use modes and resources and test the whole pipeline?
l
Yes, probably
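In Dagster terms, that approach means reading from the DWH through a resource and picking the implementation (real or stub) with a mode. The underlying pattern is plain dependency injection. A framework-agnostic sketch, assuming the SQLAlchemy code can be wrapped in an object with a `fetch()` method; `DataSource`, `StubSource` and `run_steps` are hypothetical names, and the aggregation merely stands in for `data_preparation`:

```python
from typing import Protocol

import pandas as pd


class DataSource(Protocol):
    """Anything that can hand back the target data as a DataFrame."""

    def fetch(self) -> pd.DataFrame: ...


class StubSource:
    """Test double standing in for the SQLAlchemy-backed DWH loader."""

    def __init__(self, df: pd.DataFrame) -> None:
        self._df = df

    def fetch(self) -> pd.DataFrame:
        # Return a copy so downstream steps cannot mutate the test data
        return self._df.copy()


def run_steps(source: DataSource) -> pd.DataFrame:
    """Hypothetical stand-in for load -> data_preparation; only `source` varies."""
    df = source.fetch()
    return df.groupby("segment", as_index=False)["value"].sum()


test_df = pd.DataFrame({"segment": ["Moscow", "Moscow"], "value": [42, 42]})
print(run_steps(StubSource(test_df)))
```

The production path would pass an object whose `fetch()` runs the real query; the steps after loading stay identical in both cases.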
s
Hey @Andrey Alekseev, this is a very sensible thing to want to do. The issue you're running into is that it's not possible to pass pandas DataFrames through Dagster config, i.e. Dagster doesn't know what to do with
'inputs': {
    "df": dataframe
},
We would like to make this cleaner to express, but I believe the best way to do this right now is with a custom Dagster type loader:
import pandas as pd
from dagster import Selector, dagster_type_loader
from dagster_pandas import create_dagster_pandas_dataframe_type


@dagster_type_loader(config_schema=Selector({'mock': {}}))
def mock_df_loader(_context, config):
    # Selector config arrives as a one-key dict, e.g. {"mock": {}}
    if "mock" in config:
        df = pd.DataFrame.from_dict(
            {"date": pd.date_range(start="2020-01-01", end="2020-02-01")}
        )
        df["value"] = 42
        df["segment"] = "Moscow"
        return df


PandasDataFrame = create_dagster_pandas_dataframe_type(
    name="PandasDataFrame", loader=mock_df_loader)
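The body of such a loader is an ordinary function from the validated config to a DataFrame, so it can be checked without Dagster. A plain-pandas sketch, assuming you also want the `create_dataframe` fixture from earlier in the thread to share the same data builder; `make_test_df` and `load_df_from_config` are hypothetical names, and the "exactly one option" rule is a simplified model of selector config:

```python
import pandas as pd


def make_test_df() -> pd.DataFrame:
    """Build the small frame used in the thread (fixture and mock loader alike)."""
    df = pd.DataFrame.from_dict(
        {"date": pd.date_range(start="2020-01-01", end="2020-02-01")}
    )
    df["value"] = 42
    df["segment"] = "Moscow"
    return df


def load_df_from_config(config: dict) -> pd.DataFrame:
    """Dispatch on the single option chosen in selector-style config, e.g. {"mock": {}}."""
    if len(config) != 1:
        raise ValueError(f"expected exactly one option, got {sorted(config)}")
    option, _option_config = next(iter(config.items()))
    if option == "mock":
        return make_test_df()
    raise ValueError(f"unknown option: {option!r}")


df = load_df_from_config({"mock": {}})
print(df.shape)  # (32, 3): 2020-01-01 through 2020-02-01 inclusive
```

Inside the Dagster loader the body could then reduce to `return load_df_from_config(config)`, and the fixture to `return make_test_df()`, so the two cannot drift apart.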
Then you would use the following as run config in your test:
run_config = {
    "solids": {
        "divide_data_over_column": {
            "inputs": {
                "df": {"mock": {}}
            },
            "config": {...},
        },
    }
}
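The general rule behind this fix is that run config holds plain data (the kind you could also write in YAML), which a DataFrame is not. A rough heuristic for catching such values before calling execute_pipeline, assuming JSON-serializability as a stand-in for "plain data"; this is not Dagster's own validation, and `is_plain_config` is a hypothetical helper:

```python
import json

import pandas as pd


def is_plain_config(run_config) -> bool:
    """Rough check: does the run config consist only of JSON-serializable data?"""
    try:
        json.dumps(run_config)
    except TypeError:
        # e.g. "Object of type DataFrame is not JSON serializable"
        return False
    return True


df = pd.DataFrame({"value": [42]})
with_dataframe = {"solids": {"divide_data_over_column": {"inputs": {"df": df}}}}
with_selector = {"solids": {"divide_data_over_column": {"inputs": {"df": {"mock": {}}}}}}

print(is_plain_config(with_dataframe))  # False
print(is_plain_config(with_selector))   # True
```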
a
Thanks. Will try that)