Can someone please help in understanding `composite_solid` abstraction?
# announcements
danil
Can someone please help in understanding the `composite_solid` abstraction? I am getting `UserWarning: Error loading repository location local_repo.py:dagster.core.errors.DagsterUserCodeProcessError: AttributeError: 'InputMappingNode' object has no attribute 'startswith'` when passing a value to a `composite_solid` that will then, based on that value, determine which `solid` to execute. Code is attached in the thread to spare the channel.
```python
@composite_solid
def read_csv_as_df(path: str) -> constants.PandasDataFrame:
    # This is the line that fails: inside a composite, `path` is an
    # InputMappingNode placeholder, not an actual string.
    if(path.startswith("s3://")):
        return read_csv_as_df_s3(s3_path=path)
    else:
        return read_csv_as_df_locally(local_path=path)

@solid
def read_csv_as_df_locally(context, local_path: str) -> constants.PandasDataFrame:
    context.log.info(f"Reading csv data locally from {local_path}")
    return pd.read_csv(local_path)

@composite_solid
def read_csv_as_df_s3(s3_path: str) -> constants.PandasDataFrame:
    bucket, key = utils.parse_s3_path(s3_path)
    csv_file = get_from_s3(bucket=bucket, key=key)
    return pd.read_csv(csv_file)

@solid(required_resource_keys={'s3'})
def get_from_s3(context, bucket: str, key: str):
    """Get any file from S3 given a proper bucket and key."""
    context.log.info(f"Attempting to get file from S3 from {bucket}/{key}")
    return context.resources.s3.get_object(Bucket=bucket, Key=key)
```
```python
@pipeline(
    name="BabyPipeline",
    preset_defs=[
        PresetDefinition(
            name="local",
            run_config=configs.local_config_raw,
            tags={"dataset": "/Users/danil/Desktop/dagster-pipelines/dagster_baby_pipeline/data/forestfires.csv"}
        ),
        PresetDefinition(
            name="prod",
            run_config=configs.prod_config_raw,
            tags={"dataset": "s3://fable.data/baby_pipeline/forestfires.csv"}
        )
    ]
)
def pipeline():
    df_input = io_solids.read_csv_as_df()
    df_processed = logic.transform_data(df_input)
    io_solids.save_df(df_processed)

###
local_config_raw = {
    "solids": {
        "read_csv_as_df": {
            "inputs": {"path": {"value": "/Users/danil/Desktop/dagster-pipelines/dagster_baby_pipeline/data/forestfires.csv"}}
        },
        "save_df": {
            "inputs": {"path": {"value": "/Users/danil/Desktop/dagster-pipelines/dagster_baby_pipeline/data/processed_df.csv"}}
        }
    }
}

prod_config_raw = {
    "solids": {
        "read_csv_as_df": {
            "inputs": {"path": {"value": "s3://fable.data/baby_pipeline/forestfires.csv"}}
        },
        "save_df": {
            "inputs": {"path": {"value": "s3://fable.data/baby_pipeline/processed_df.csv"}}
        }
    }
}
```
Pretty much based on the preset (`local`, `prod`) the dataset path is different: if the preset is `local` the path is local, and if it is `prod` the path leads to S3. Based on this input, the `read_csv_as_df` composite solid will decide which solid to use, either `read_csv_as_df_locally` or `read_csv_as_df_s3`. When trying to load this via Dagit I am getting the following error:
```
dagster/cli/workspace/workspace.py:50: UserWarning: Error loading repository location local_repo.py:dagster.core.errors.DagsterUserCodeProcessError: AttributeError: 'InputMappingNode' object has no attribute 'startswith'
```
At `if(path.startswith("s3://")):` over here:
```python
@composite_solid
def read_csv_as_df(path: str) -> constants.PandasDataFrame:
    if(path.startswith("s3://")):
```
It looks like the value defined in the config is not being passed into the composite solid. Am I using this abstraction incorrectly? I've considered using tags to drive this logic, but after reading the docs I see that it is not the recommended way (makes sense). Thanks in advance for the help!
Noah K
@danil Could you post your code in a Snippet or Gist or something else?
It's very hard to read when you use simple backtick blocks
schrockn
The composite_solid is more like a pipeline. The type annotation is what is confusing: it is not actually a string, it is just shorthand for `input_defs=[InputDefinition("path", str)]`. (We are likely going to discourage the practice of using type annotations that violate mypy typing, fwiw.)
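In other words, the annotated signature is roughly equivalent to this explicit form (a sketch; `constants.PandasDataFrame` is borrowed from the code above):

```python
from dagster import InputDefinition, OutputDefinition, composite_solid

# The annotation only declares dagster input/output definitions;
# inside the body, `path` is a graph placeholder, not a real str.
@composite_solid(
    input_defs=[InputDefinition("path", str)],
    output_defs=[OutputDefinition(constants.PandasDataFrame)],
)
def read_csv_as_df(path):
    ...
```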
danil
@Noah K Will do!
schrockn
So you can’t actually operate on the data within a composite
only construct an abstract DAG like you do in `pipeline`
Noah K
Ahh yeah, that would do it
Also unrelated, but there are no outer () on a Python `if` 🙂
You can simulate the conditional using a solid with two optional outputs
Only the part of the DAG corresponding to the output that gets returned will happen
So it looks more like this:
danil
@schrockn I see, that was one of my hunches. What is the recommended practice in Dagster to achieve what I am trying to do? Having 2+ presets, each with its own dataset path, that would drive the logic downstream in solids/composite_solids.
Noah K
```python
s3_url, local_path = is_it_s3_or_local(path)
download_s3(s3_url)
download_local(local_path)
```
schrockn
^--- what @Noah K said
there is also the resource system, which is often used to abstract away s3 vs local development
Noah K
Yahr, if that's not something dynamic from a prior solid, no need to make the DAG itself dynamic to match 🙂
danil
@Noah K Can you please elaborate more on your construct? For example, in the `local` preset, `is_it_s3_or_local()` would return `None` for `s3_url`, and `download_s3(s3_url)` would check for `None` and also return `None`. Am I reading your intention correctly? So `download_s3()` would technically execute for no reason.
Noah K
So outputs in Dagster don't quite work the same way as Python function returns
`return foo` at the end of a solid is just a shortcut for `yield Output(name="foo", value=foo)`, more or less
And None is a totally legit return value, works as expected
But if you don't yield one of the outputs at all
(and it's marked as optional, since failing to yield a required output would be a fatal error)
Then it just ignores all downstream solids that would have used that value
So here you would have two named outputs and yield only one of them
(since inside the Solid code you can use the actual value)
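A minimal sketch of what this looks like, assuming the solid and output names from Noah's snippet above (they are illustrative, not a prescribed API):

```python
from dagster import Output, OutputDefinition, solid

@solid(
    output_defs=[
        OutputDefinition(str, "s3_url", is_required=False),
        OutputDefinition(str, "local_path", is_required=False),
    ]
)
def is_it_s3_or_local(context, path: str):
    # Yield exactly one of the two optional outputs; downstream solids
    # wired to the un-yielded output are skipped entirely.
    if path.startswith("s3://"):
        yield Output(path, "s3_url")
    else:
        yield Output(path, "local_path")
```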
schrockn
`pipeline` and `composite_solid` are not invoking normal python functions, they are constructing a graph.
Noah K
A Python-based domain specific language (DSL)
schrockn
Noah's code constructs a graph like this
and only one of the outputs "fires"
danil
@Noah K Ahh, I see. Interesting... So technically, `download_local(local_path)` would not execute since it will be ignored, because the input to it is empty
schrockn
the unfortunate thing about that structure is that launching an entire process/pod/whatever is a lot of overhead for a conditional
but it will work
danil
Alright, @schrockn, @Noah K, I am totally digging this now.
A Python-based domain specific language (DSL)
Haha yeah it reminds me of Scala stuff now
schrockn
thanks for bearing with this! you managed to stumble on two of the more aggravating parts of the system simultaneously
namely that composite_solid is a separate abstraction and we commingle the dagster and python type systems a little too liberally
danil
That's super cool tho, I am excited to dig into the dagster codebase later to see how it works under the hood. Kudos for making Dagster!!
@Noah K I am almost there with getting it working; the only question remaining is how to structure the inside of the composite solid. I am getting
`dagster.core.errors.DagsterUserCodeProcessError: dagster.core.errors.DagsterInvalidDefinitionError: @composite_solid get_data returned problematic value of type <class 'generator'>. Expected return value from invoked solid or dict mapping output name to return values from invoked solids`
if using optional outputs, or
`dagster.core.errors.DagsterUserCodeProcessError: dagster.core.errors.DagsterInvalidDefinitionError: @composite_solid 'get_data' has unmapped output 'result'. Remove it or return a value from the appropriate solid invocation.`
if I strictly follow your code block. Attached is a nice gist: https://gist.github.com/dankolesnikov/861cd7f752ff8d10cbf7d7ecea4b3795 It works one way (local or prod) if I `return` one of the dataframes from the composite solid; that's an indication that it's working overall. I just need to figure out how to structure the DSL syntax inside that composite solid.
daniel
Hi danil - for your second example: unlike solids, composite solids can't be generators, since they're defining a DAG (like @pipeline) rather than wrapping an execution function (like @solid). What you want instead is
```python
return {
    "df_from_local": df_local,
    "df_from_s3": df_s3,
}
```
This was very confusing to me too when I was first learning the system - we have a project for our next release that should make @composite_solid less confusing by consolidating it with @pipeline into a single concept.
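Putting that together with the optional-outputs solid sketched earlier, the composite might look roughly like this (an illustration only; the solid names are borrowed from the thread and gist, not daniel's exact code):

```python
@composite_solid
def get_data(path: str):
    # Construct the graph: only one branch will fire at runtime,
    # but both dataframe outputs must be mapped to composite outputs.
    s3_url, local_path = is_it_s3_or_local(path)
    return {
        "df_from_local": read_csv_as_df_locally(local_path),
        "df_from_s3": read_csv_as_df_s3(s3_url),
    }
```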
danil
Hey @daniel, yeah I think from the composite_*solid* name I thought it had more to do with solids than with pipelines, but now it totally makes sense since it constructs the DAG only. The syntax you mentioned worked great, but now there is a bigger problem I think. At runtime we don't know whether `df_local` or `df_s3` will have data in it; that is determined by the `get_data` composite solid and the preset the pipeline uses.
```python
def pipeline():
    df_input = get_data()
    df_processed = logic.transform_data(df_input)
    io_solids.save_df(df_processed)
```
Here `df_input` will be a tuple of (df_local, df_s3), one of which would be, umm, None? But `transform_data` should accept only 1 df. What are your recommendations for the remedy? Is it possible to check whether one of the entries in the `df_input` tuple is null and give the other df to `transform_data` accordingly?
And really, on a high level, if we take a step back: I am trying to get data from either a local or S3 environment depending on the preset, in a way that would make solids reusable by, let's say, other teams. The reason I am breaking all of this logic into separate solids is the intention that solids like `get_from_local` can be reused later in the DAG, or by other teams altogether (like a library). The way I am approaching this with the design/architecture above: is that the Dagster way? Is there a simpler way of organizing this using some advanced Dagster tricks?
a
given the examples above, I would recommend looking into:
- `IOManagers`: control how data is passed solid to solid; they would give you a place to control how dataframes are saved
- `RootInputManagers`: control how inputs are loaded from config, allowing you to load a dataframe from an s3 or local path
https://docs.dagster.io/overview/io-managers/io-managers
This would allow you to move the save/load logic for dataframes into components that can be attached to the inputs/outputs of any solid. Then your pipeline just consists of the solids with meaningful transforms / business logic. (A sketch follows below.)
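A rough sketch of that root-input-manager idea (assuming a 0.10-era API; `csv_loader` is a hypothetical name, and `utils.parse_s3_path` is borrowed from danil's code):

```python
import pandas as pd
from dagster import Field, String, root_input_manager

@root_input_manager(input_config_schema=Field(String), required_resource_keys={"s3"})
def csv_loader(context):
    # All local-vs-s3 branching lives here instead of in the graph,
    # so solids only ever receive a ready-made dataframe.
    path = context.config
    if path.startswith("s3://"):
        bucket, key = utils.parse_s3_path(path)
        obj = context.resources.s3.get_object(Bucket=bucket, Key=key)
        return pd.read_csv(obj["Body"])
    return pd.read_csv(path)
```

An input would then opt in with something like `InputDefinition("df_input", root_manager_key="csv_loader")`, with the manager supplied as a resource on the mode (again, a sketch rather than exact code).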
The `Selector` config type would be good to use for the s3 vs local loading pattern: https://docs.dagster.io/_apidocs/config#dagster.Selector
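For example, the hypothetical `csv_loader` above could take a `Selector` as its input config schema, so each run has to pick exactly one source (the shapes here are illustrative, not prescribed):

```python
from dagster import Field, Selector, String

# Hypothetical input config schema for csv_loader: run config must
# supply exactly one of the two branches.
input_schema = Selector(
    {
        "local": {"path": Field(String)},
        "s3": {"path": Field(String)},
    }
)

# A matching run config for the prod case might then look like:
run_config = {
    "solids": {
        "transform_data": {
            "inputs": {
                "df_input": {"s3": {"path": "s3://fable.data/baby_pipeline/forestfires.csv"}}
            }
        }
    }
}
```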