Jenny Webster
06/25/2021, 5:28 PM
dagster.core.errors.DagsterInvalidDefinitionError: Input "df" in solid "generate_speed_solid" is not connected to the output of a previous solid and can not be loaded from configuration, creating an impossible to execute pipeline. Possible solutions are:
E * add a dagster_type_loader for the type "DataFrame"
E * connect "df" to the output of another solid
where “generate_speed_solid” is one of the standard solids inside the composite solid. I have tested the individual solids and they work in a pipeline outside of the composite solid.
We had been using type casting in our function definitions instead of explicitly calling InputDefinition and OutputDefinition in the solid wrappers. I can’t seem to find the documentation about how this needs to change. Can someone point me to the right resource? Thanks!
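For context, a minimal sketch of the two styles being contrasted, using dagster 0.11-era APIs (the solid name comes from the thread; DSparkDataFrame stands in for the team's custom DagsterType, and the bodies are placeholders):

from dagster import InputDefinition, OutputDefinition, solid
from pyspark.sql import DataFrame as SparkDataFrame

from my_project.types import DSparkDataFrame  # hypothetical import of the custom DagsterType

# Older style: explicit definitions carry a DagsterType, which can supply a loader.
@solid(
    input_defs=[InputDefinition("df", DSparkDataFrame)],
    output_defs=[OutputDefinition(DSparkDataFrame)],
)
def generate_speed_solid(context, df):
    ...

# Newer style: plain python type hints. dagster can only resolve these if the
# python type is registered as, or mapped to, a DagsterType.
# (Alternative definition of the same solid.)
@solid
def generate_speed_solid(context, df: SparkDataFrame) -> SparkDataFrame:
    ...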
alex
06/25/2021, 5:30 PM
Jenny Webster
06/25/2021, 5:32 PM
@composite_solid(
    config_fn=config_path_workflow,
    config_schema={
        "id_col": Columns.ENTITY_ID.field,
        "wkt_col": Columns.POINT_WKT.field,
        "time_col": Columns.TIME_LONG.field,
    },
)
def generate_basic_path_features_workflow(df: SparkDataFrame) -> SparkDataFrame:
    """
    Composite solid that takes raw dataframes and generates basic speed,
    bearing, and path segmentation features.

    Required columns:
        id_col
        wkt_col
        time_col

    Args:
        df (SparkDataFrame): Spark DataFrame with a wkt point column containing lon,lat information

    Returns:
        SparkDataFrame
    """
    return generate_path_segments_solid(
        df=generate_bearing_solid(df=generate_speed_solid(df=df))
    )
alex
06/25/2021, 5:39 PM
run_config for the pipeline?
Jenny Webster
06/25/2021, 6:31 PM
alex
06/25/2021, 6:43 PM
It would help to share a debug file (dagster debug export <run id>) or send some more details.
> We had been using type casting in our function definitions instead of explicitly calling InputDefinition and OutputDefinition in the solid wrappers.
are you saying that you made some change in addition to migrating from 11.14 -> 11.15?
Jenny Webster
06/25/2021, 6:45 PM
alex
06/25/2021, 6:53 PM
My guess is that you hadn’t updated dagster-pyspark until recently and are experiencing breaking changes that happened with a past major release. The DataFrame from your error message is no longer a DagsterType but just a regular python type that we don’t know how to load from config.
Old versions of the library would globally map the python type to the dagster type automatically. The current library needs an explicit make_python_type_usable_as_dagster_type call to register that mapping, or for the python type hint to be the DataFrame imported from the dagster-pyspark library.
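A minimal sketch of the global-registration option alex describes, assuming the DataFrame type exported by dagster-pyspark is the desired mapping target:

from dagster import make_python_type_usable_as_dagster_type
from dagster_pyspark import DataFrame as DagsterPySparkDataFrame
from pyspark.sql import DataFrame as SparkDataFrame

# After this call, a plain `df: SparkDataFrame` type hint on a solid resolves to
# the dagster-pyspark DagsterType, which carries a loader for config-based inputs.
make_python_type_usable_as_dagster_type(
    python_type=SparkDataFrame,
    dagster_type=DagsterPySparkDataFrame,
)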
Dan Corbiani
06/25/2021, 6:57 PM
Jenny Webster
06/25/2021, 6:58 PM
Dan Corbiani
06/25/2021, 6:58 PM
Jenny Webster
06/25/2021, 7:01 PM
We had been using a custom dagster type in the input definitions for some of our solids
alex
06/25/2021, 7:02 PM
> We had been using a custom dagster type in the input definitions for some of our solids
Ah ok, so the InputDefinition with the DagsterType was providing the loader, which is how the system knows how to make an instance of that object from config. When you removed those InputDefinitions you removed that piece, resulting in the observed error.
You can either add back the InputDefinitions with your DagsterTypes, or register your custom dagster type for that python type globally using make_python_type_usable_as_dagster_type
https://docs.dagster.io/concepts/types#patterns
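A sketch of the first option, assuming a custom DagsterType roughly like the one the thread implies (the loader body, config schema, and names are illustrative, not the team's actual code):

from dagster import DagsterType, InputDefinition, dagster_type_loader, solid
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import SparkSession

@dagster_type_loader(config_schema={"path": str})
def _spark_df_loader(_context, config):
    # Illustrative only: build a Spark DataFrame from a configured parquet path.
    return SparkSession.builder.getOrCreate().read.parquet(config["path"])

DSparkDataFrame = DagsterType(
    name="DSparkDataFrame",
    type_check_fn=lambda _, value: isinstance(value, SparkDataFrame),
    loader=_spark_df_loader,
)

# With the loader back on the input's DagsterType, "df" can be satisfied from
# run config when it is not wired to an upstream solid's output.
@solid(input_defs=[InputDefinition("df", DSparkDataFrame)])
def generate_speed_solid(context, df):
    ...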
Jenny Webster
06/25/2021, 11:00 PM
alex
06/28/2021, 2:11 PM
Jenny Webster
06/28/2021, 2:18 PM
alex
06/28/2021, 2:23 PM
* revert to 0.11.14
* add a dummy loader to DSparkDataFrame to work around the bug
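A sketch of what such a dummy loader could look like, assuming the input is always wired from an upstream solid so the loader exists only to satisfy definition-time validation and is never expected to run:

from dagster import dagster_type_loader

@dagster_type_loader(config_schema=str)
def _dummy_loader(_context, _config):
    # Never expected to run; exists only so the pipeline definition validates.
    raise NotImplementedError("DSparkDataFrame inputs must come from an upstream solid")

# Attached when constructing the type, e.g.:
# DSparkDataFrame = DagsterType(..., loader=_dummy_loader)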
Jenny Webster
06/28/2021, 2:24 PM
alex
06/28/2021, 2:25 PM
yuhan
06/28/2021, 11:08 PM
esztermarton
06/29/2021, 9:19 AM
alex
06/29/2021, 2:32 PM
yuhan
06/29/2021, 4:59 PM
Jenny Webster
06/29/2021, 5:03 PM