danil
02/07/2021, 12:28 AM
`composite_solid` abstraction?
I am getting `UserWarning: Error loading repository location local_repo.py:dagster.core.errors.DagsterUserCodeProcessError: AttributeError: 'InputMappingNode' object has no attribute 'startswith'` when passing a value to a `composite_solid` that should use that value to decide which solid to execute. Code is attached in the thread to spare the channel.
danil
02/07/2021, 12:29 AM
```python
import pandas as pd
from dagster import composite_solid, solid

# `constants` and `utils` are project modules not shown in the thread.

@composite_solid
def read_csv_as_df(path: str) -> constants.PandasDataFrame:
    # This line raises the AttributeError: inside a composite_solid body,
    # `path` is a graph node (InputMappingNode), not a str.
    if path.startswith("s3://"):
        return read_csv_as_df_s3(s3_path=path)
    else:
        return read_csv_as_df_locally(local_path=path)

@solid
def read_csv_as_df_locally(context, local_path: str) -> constants.PandasDataFrame:
    context.log.info(f"Reading csv data locally from {local_path}")
    return pd.read_csv(local_path)

@composite_solid
def read_csv_as_df_s3(s3_path: str) -> constants.PandasDataFrame:
    # Same problem here: s3_path and csv_file are graph nodes, so
    # parse_s3_path and pd.read_csv receive nodes, not values.
    bucket, key = utils.parse_s3_path(s3_path)
    csv_file = get_from_s3(bucket=bucket, key=key)
    return pd.read_csv(csv_file)

@solid(required_resource_keys={"s3"})
def get_from_s3(context, bucket: str, key: str):
    """Get any file from S3 given a proper bucket and key."""
    context.log.info(f"Attempting to get file from S3 from {bucket}/{key}")
    return context.resources.s3.get_object(Bucket=bucket, Key=key)
```
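Separately from the `composite_solid` issue, the parsing in `read_csv_as_df_s3` would also have to happen inside a solid body. In addition, assuming the `s3` resource is a boto3 client, `get_object` returns a dict whose file contents sit under `"Body"`, so the response needs unwrapping before it can be parsed as CSV. Below is a stdlib-only sketch of that step. `parse_s3_path` is a hypothetical stand-in for the unshown `utils.parse_s3_path`, and `FakeS3Client` only imitates the shape of the boto3 response so the example runs without AWS:

```python
import csv
import io
from typing import Any, Dict, List, Tuple


def parse_s3_path(s3_path: str) -> Tuple[str, str]:
    """Hypothetical helper: 's3://bucket/some/key.csv' -> ('bucket', 'some/key.csv')."""
    if not s3_path.startswith("s3://"):
        raise ValueError(f"not an S3 path: {s3_path!r}")
    bucket, _, key = s3_path[len("s3://"):].partition("/")
    if not bucket or not key:
        raise ValueError(f"malformed S3 path: {s3_path!r}")
    return bucket, key


def read_s3_csv_rows(s3_client: Any, s3_path: str) -> List[Dict[str, str]]:
    """Fetch an object and parse it as CSV; the bytes live under response["Body"]."""
    bucket, key = parse_s3_path(s3_path)
    response = s3_client.get_object(Bucket=bucket, Key=key)
    text = response["Body"].read().decode("utf-8")
    return list(csv.DictReader(io.StringIO(text)))


class FakeS3Client:
    """Test double that mimics the shape of boto3's get_object response."""

    def __init__(self, objects: Dict[Tuple[str, str], bytes]):
        self._objects = objects

    def get_object(self, Bucket: str, Key: str) -> Dict[str, Any]:
        return {"Body": io.BytesIO(self._objects[(Bucket, Key)])}


# Made-up sample contents, only to exercise the code path.
client = FakeS3Client({("fable.data", "baby_pipeline/forestfires.csv"): b"X,Y\n7,5\n"})
rows = read_s3_csv_rows(client, "s3://fable.data/baby_pipeline/forestfires.csv")
# rows == [{"X": "7", "Y": "5"}]
```

With pandas, passing `response["Body"]` to `pd.read_csv` should work the same way, since `read_csv` accepts file-like objects.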
danil
02/07/2021, 12:30 AM
```python
from dagster import PresetDefinition, pipeline

# `configs`, `io_solids` and `logic` are project modules not shown in the thread.

@pipeline(
    name="BabyPipeline",
    preset_defs=[
        PresetDefinition(
            name="local",
            run_config=configs.local_config_raw,
            tags={"dataset": "/Users/danil/Desktop/dagster-pipelines/dagster_baby_pipeline/data/forestfires.csv"},
        ),
        PresetDefinition(
            name="prod",
            run_config=configs.prod_config_raw,
            tags={"dataset": "s3://fable.data/baby_pipeline/forestfires.csv"},
        ),
    ],
)
def pipeline():
    df_input = io_solids.read_csv_as_df()
    df_processed = logic.transform_data(df_input)
    io_solids.save_df(df_processed)
```
```python
local_config_raw = {
    "solids": {
        "read_csv_as_df": {
            "inputs": {"path": {"value": "/Users/danil/Desktop/dagster-pipelines/dagster_baby_pipeline/data/forestfires.csv"}}
        },
        "save_df": {
            "inputs": {"path": {"value": "/Users/danil/Desktop/dagster-pipelines/dagster_baby_pipeline/data/processed_df.csv"}}
        },
    }
}
prod_config_raw = {
    "solids": {
        "read_csv_as_df": {
            "inputs": {"path": {"value": "s3://fable.data/baby_pipeline/forestfires.csv"}}
        },
        "save_df": {
            "inputs": {"path": {"value": "s3://fable.data/baby_pipeline/processed_df.csv"}}
        },
    }
}
```
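Because a composite body only wires solids together, the `startswith` check has to run inside a single solid, where the configured `path` is an actual string at execution time. Below is a framework-free sketch of that dispatch. `read_csv_as_rows` and `configured_path` are hypothetical names, `read_s3`/`read_local` are injected stand-ins for the two readers, and the configs mirror the shape of `local_config_raw`/`prod_config_raw` above (local path shortened):

```python
from typing import Any, Callable, Dict, List

Rows = List[Dict[str, str]]


def read_csv_as_rows(
    path: str,
    read_s3: Callable[[str], Rows],
    read_local: Callable[[str], Rows],
) -> Rows:
    # Runs at execution time, so `path` is a real str and .startswith works.
    if path.startswith("s3://"):
        return read_s3(path)
    return read_local(path)


def configured_path(run_config: Dict[str, Any]) -> str:
    """Pull the `path` input value out of a run config shaped like the presets."""
    return run_config["solids"]["read_csv_as_df"]["inputs"]["path"]["value"]


prod_config = {"solids": {"read_csv_as_df": {"inputs": {"path": {"value": "s3://fable.data/baby_pipeline/forestfires.csv"}}}}}
local_config = {"solids": {"read_csv_as_df": {"inputs": {"path": {"value": "data/forestfires.csv"}}}}}

# Stand-in readers that just report which branch was taken.
readers = dict(
    read_s3=lambda p: [{"source": "s3"}],
    read_local=lambda p: [{"source": "local"}],
)
prod_rows = read_csv_as_rows(configured_path(prod_config), **readers)    # [{"source": "s3"}]
local_rows = read_csv_as_rows(configured_path(local_config), **readers)  # [{"source": "local"}]
```

In Dagster, the body of `read_csv_as_rows` would become the body of one `@solid` that takes `path` as its input; that wiring is an assumption, not code from this thread.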
danil
02/07/2021, 12:36 AM
Per preset (`local`, `prod`), the dataset path is different: if the preset is `local`, the path is local; if the preset is `prod`, the path leads to S3. Based on this input, the `read_csv_as_df` composite solid will decide which solid to use, either `read_csv_as_df_locally` or `read_csv_as_df_s3`. When trying to load this via Dagit, I am getting the following error:
```
dagster/cli/workspace/workspace.py:50: UserWarning: Error loading repository location local_repo.py:dagster.core.errors.DagsterUserCodeProcessError: AttributeError: 'InputMappingNode' object has no attribute 'startswith'
```
It fails at `if path.startswith("s3://"):`, over here:
```python
@composite_solid
def read_csv_as_df(path: str) -> constants.PandasDataFrame:
    if path.startswith("s3://"):
```
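Why the failure happens while Dagit loads the repository rather than while the pipeline runs: the `@composite_solid` body is executed at definition time, and its `path` argument is a placeholder for a future value (the `InputMappingNode` in the error). A placeholder supports wiring but has no `str` methods. Below is a toy model of that idea; `Placeholder` and `GraphBuilder` are illustrative stand-ins, not Dagster internals:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class Placeholder:
    """Stands for a value that will only exist once the graph runs."""
    name: str


@dataclass
class GraphBuilder:
    """Records calls instead of running them, like a composition function does."""
    steps: List[tuple] = field(default_factory=list)

    def invoke(self, fn: Callable[..., Any], *inputs: Placeholder) -> Placeholder:
        output = Placeholder(f"{fn.__name__}.result")
        self.steps.append((fn, inputs, output))  # record an edge; nothing runs yet
        return output

    def run(self, **values: Any) -> Dict[str, Any]:
        env: Dict[str, Any] = dict(values)
        for fn, inputs, output in self.steps:
            env[output.name] = fn(*(env[p.name] for p in inputs))
        return env


def shout(text: str) -> str:
    return text.upper()


builder = GraphBuilder()
path = Placeholder("path")            # what a composite body receives instead of a str
result = builder.invoke(shout, path)  # fine: wiring placeholders together
try:
    path.startswith("s3://")          # what read_csv_as_df tried to do
except AttributeError as err:
    print("definition-time error:", err)
values = builder.run(path="s3://fable.data/baby_pipeline/forestfires.csv")
print(values[result.name])  # S3://FABLE.DATA/BABY_PIPELINE/FORESTFIRES.CSV
```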
danil
02/07/2021, 12:37 AM
Noah K
02/07/2021, 12:45 AM
Noah K
02/07/2021, 12:45 AM
schrockn
02/07/2021, 12:45 AM
`input_defs=[InputDefinition("path", str)]`
(We are likely going to discourage the practice of using type annotations that violate mypy typing, fwiw.)
danil
02/07/2021, 12:45 AM
schrockn
02/07/2021, 12:45 AM
schrockn
02/07/2021, 12:46 AM
`pipeline`
Noah K
02/07/2021, 12:47 AM
Noah K
02/07/2021, 12:47 AM
`if` 🙂
Noah K
02/07/2021, 12:48 AM
Noah K
02/07/2021, 12:48 AM
Noah K
02/07/2021, 12:49 AM
danil
02/07/2021, 12:49 AM
Noah K
02/07/2021, 12:50 AM
```python
s3_url, local_path = is_it_s3_or_local(path)
download_s3(s3_url)
download_local(local_path)
```
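This outline works if `is_it_s3_or_local` emits only one of its two outputs: a downstream step whose input was never emitted is skipped instead of being called with `None`. Below is a framework-free model of that behavior; the dict-of-emitted-outputs convention and `run_branches` are illustrative assumptions, not Dagster API:

```python
from typing import Callable, Dict


def is_it_s3_or_local(path: str) -> Dict[str, str]:
    """Emit only the output that applies; the other one is simply absent."""
    if path.startswith("s3://"):
        return {"s3_url": path}
    return {"local_path": path}


def run_branches(
    path: str,
    download_s3: Callable[[str], str],
    download_local: Callable[[str], str],
) -> Dict[str, str]:
    """Run each downstream step only if the output it consumes was emitted."""
    emitted = is_it_s3_or_local(path)
    results: Dict[str, str] = {}
    if "s3_url" in emitted:
        results["download_s3"] = download_s3(emitted["s3_url"])
    if "local_path" in emitted:
        results["download_local"] = download_local(emitted["local_path"])
    # A step whose input was never emitted is skipped, not called with None.
    return results


result = run_branches(
    "data/forestfires.csv",
    download_s3=lambda url: f"downloaded {url}",
    download_local=lambda p: f"read {p}",
)
# result == {"download_local": "read data/forestfires.csv"}; download_s3 never ran
```

Assuming the 0.10-era API, the Dagster version would declare both outputs on a regular `@solid` with `OutputDefinition(..., is_required=False)` and `yield Output(...)` only for the branch taken.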
schrockn
02/07/2021, 12:51 AM
schrockn
02/07/2021, 12:51 AM
Noah K
02/07/2021, 12:56 AM
danil
02/07/2021, 12:58 AM
For the `local` preset, `is_it_s3_or_local()` would return `None` for `s3_url`, and `download_s3(s3_url)` would check for `None` and also return `None`. Am I reading your intention correctly? So technically, `download_s3()` would execute for no reason.
Noah K
02/07/2021, 12:58 AM
Noah K
02/07/2021, 12:59 AM
`return foo` at the end of a solid is, more or less, just a shortcut for `yield Output(foo)`, i.e. `yield Output(value=foo, output_name="result")`.
Noah K
02/07/2021, 12:59 AM
Noah K
02/07/2021, 12:59 AM
Noah K
02/07/2021, 12:59 AM
Noah K
02/07/2021, 1:00 AM
Noah K
02/07/2021, 1:00 AM
Noah K
02/07/2021, 1:00 AM
schrockn
02/07/2021, 1:01 AM
The bodies of `pipeline` and `composite_solid` are not invoking normal Python functions; they are constructing a graph.
Noah K
02/07/2021, 1:02 AM
schrockn
02/07/2021, 1:04 AM
schrockn
02/07/2021, 1:04 AM
danil
02/07/2021, 1:04 AM
`download_local(local_path)` would not execute, since it will be skipped because its input is empty.
schrockn
02/07/2021, 1:04 AM
schrockn
02/07/2021, 1:05 AM
danil
02/07/2021, 1:05 AM
danil
02/07/2021, 1:05 AM
A Python-based domain-specific language (DSL)
Haha, yeah, it reminds me of Scala stuff now.
schrockn
02/07/2021, 1:08 AM
schrockn
02/07/2021, 1:09 AM
danil
02/07/2021, 1:11 AM
danil
02/07/2021, 4:00 AM
If I use optional outputs, I get:
```
dagster.core.errors.DagsterUserCodeProcessError: dagster.core.errors.DagsterInvalidDefinitionError: @composite_solid get_data returned problematic value of type <class 'generator'>. Expected return value from invoked solid or dict mapping output name to return values from invoked solids
```
If I strictly follow your code block, I get:
```
dagster.core.errors.DagsterUserCodeProcessError: dagster.core.errors.DagsterInvalidDefinitionError: @composite_solid 'get_data' has unmapped output 'result'. Remove it or return a value from the appropriate solid invocation.
```
Attached is a nice gist: https://gist.github.com/dankolesnikov/861cd7f752ff8d10cbf7d7ecea4b3795
It works one way (local or prod) if I return one of the dataframes from the composite solid, which indicates it's working overall; I just need to figure out how to structure the DSL syntax inside that composite solid.
daniel
02/07/2021, 6:57 PM
```python
return {
    # Map each output name of the composite to the solid output that feeds it.
    "df_from_local": df_local,
    "df_from_s3": df_s3,
}
```
This was very confusing to me too when I was first learning the system. We have a project for our next release that should make `@composite_solid` less confusing by consolidating it with `@pipeline` into a single concept.
danil
02/07/2021, 8:54 PM
Either `df_local` or `df_s3` will have data in it; which one is determined by the `get_data` composite solid and the preset that the pipeline uses.
```python
def pipeline():
    df_input = get_data()
    df_processed = logic.transform_data(df_input)
    io_solids.save_df(df_processed)
```
Here `df_input` will be a tuple of `(df_local, df_s3)`, one of which would be, umm, `None`? But `transform_data` should accept only one df.
What would you recommend as a remedy? Is it possible to check whether one of the entries in the `df_input` tuple is null and pass the other df to `transform_data` accordingly?
danil
02/07/2021, 8:59 PM
`get_from_local` can be reused later in the DAG, or by other teams altogether (like a library).
Is the way I am approaching this with the design/architecture above the Dagster way? Is there a simpler way of organizing this using some advanced Dagster tricks?
alex
02/08/2021, 4:39 PM
`IOManagers`: control how data is passed from solid to solid; they would give you a place to control how dataframes are saved.
`RootInputManagers`: control how inputs are loaded from config, allowing you to load a dataframe from S3 or a local path.
https://docs.dagster.io/overview/io-managers/io-managers
alex
02/08/2021, 4:41 PM
alex
02/08/2021, 4:42 PM
A `Selector` config type would be good to use for the S3 vs. local loading pattern: https://docs.dagster.io/_apidocs/config#dagster.Selector
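The `Selector` suggestion replaces prefix-sniffing with an explicit choice: a `Selector` config field requires exactly one of its named fields to be set, so the config itself says which loader to use. Below is a framework-free sketch of that dispatch. The option names `local`/`s3`, the `{"path": ...}` shape, and `load_dataset` are hypothetical; in Dagster, the one-of validation would come from `Selector` itself, and the loading would live in a root input manager as described above.

```python
from typing import Callable, Dict, Mapping


def load_dataset(
    config: Mapping[str, Mapping[str, str]],
    loaders: Dict[str, Callable[[str], str]],
) -> str:
    """Selector-style dispatch: the single key that is set picks the loader."""
    if len(config) != 1:
        raise ValueError(f"set exactly one of {sorted(loaders)}, got {sorted(config)}")
    option, option_config = next(iter(config.items()))
    if option not in loaders:
        raise ValueError(f"unknown option {option!r}; expected one of {sorted(loaders)}")
    return loaders[option](option_config["path"])


# Stand-in loaders; real ones would return dataframes.
loaders = {
    "local": lambda path: f"read local file {path}",
    "s3": lambda path: f"fetched {path}",
}
local_cfg = {"local": {"path": "data/forestfires.csv"}}
prod_cfg = {"s3": {"path": "s3://fable.data/baby_pipeline/forestfires.csv"}}

print(load_dataset(local_cfg, loaders))  # read local file data/forestfires.csv
print(load_dataset(prod_cfg, loaders))   # fetched s3://fable.data/baby_pipeline/forestfires.csv
```

Compared with checking `startswith`, the choice of reader is explicit in the run config, and each option can carry its own fields (for example, a separate bucket and key for `s3`).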