# announcements
e
Hey all, I recall from a previous post that I believe @alex (apologies alex if it wasn't you) mentioned Elementl has some internal tools to generate all of the parameters for the AWS and other functions used in dagster-aws and other libraries. Is that automatic signature generation something that could be shared? I'm essentially trying to duplicate the functionality of the
solid_from_func
decorator in this pull request https://github.com/dagster-io/dagster/pull/2049 and couple it with automatic signature generation, in hopes of using third-party library functions out of the box. Our dev team here is very small, and having to duplicate the signatures of all the functions from all the libraries we plan to use has become a serious blocker.
Here is an attempt thus far largely leveraging the pull request above:
import makefun
import dagster
import inspect

def solid_from_func(func):
    # capture the signature of func
    sig = inspect.signature(func)

    # modify the signature to add 'context' as the first parameter
    params = list(sig.parameters.values())
    params.insert(0, inspect.Parameter('context', kind=inspect.Parameter.POSITIONAL_OR_KEYWORD))
    new_sig = sig.replace(parameters=params)

    type_map = { 
        bool: dagster.Bool,
        float: dagster.Float,
        int: dagster.Int,
        str: dagster.String,
        list: dagster.Array,
        dict: dagster.Dict
    }

    config_from_params = {}

    for k, v in sig.parameters.items():
        try:
            if v.default is not inspect.Parameter.empty:
                config_from_params.update({k: dagster.Field(type_map[type(v.default)], default_value=v.default, is_optional=True)})
            else:
                config_from_params.update({k: dagster.Field(type_map[type(v.default)], is_optional=False)})
        except KeyError:
            config_from_params.update({k: dagster.Field(dagster.Any, is_optional=False)})
    
    # create our solid
    @dagster.solid(config=config_from_params)
    @makefun.wraps(func, new_sig=new_sig)
    def new_solid(context, *args, **kwargs):
        # call the original function
        return func(context, *args, **kwargs)

    return new_solid
which is receiving an error:
decorator_name = '@solid', fn_name = 'read_excel', compute_fn = <function read_excel at 0x000001BED642FD38>
input_defs = [<dagster.core.definitions.input.InputDefinition object at 0x000001BED643DA08>, <dagster.core.definitions.input.InputD...tion object at 0x000001BED643DAC8>, <dagster.core.definitions.input.InputDefinition object at 0x000001BED644C848>, ...]
expected_positionals = ['context'], exclude_nothing = True

    def validate_solid_fn(
        decorator_name, fn_name, compute_fn, input_defs, expected_positionals=None, exclude_nothing=True
    ):
        check.str_param(decorator_name, 'decorator_name')
        check.str_param(fn_name, 'fn_name')
        check.callable_param(compute_fn, 'compute_fn')
        check.list_param(input_defs, 'input_defs', of_type=InputDefinition)
        expected_positionals = check.opt_list_param(
            expected_positionals, 'expected_positionals', of_type=str
        )
        if exclude_nothing:
            names = set(inp.name for inp in input_defs if not inp.runtime_type.is_nothing)
            nothing_names = set(inp.name for inp in input_defs if inp.runtime_type.is_nothing)
        else:
            names = set(inp.name for inp in input_defs)
            nothing_names = set()

        # Currently being super strict about naming. Might be a good idea to relax. Starting strict.
        fn_positionals, input_args = split_function_parameters(compute_fn, expected_positionals)

        # Validate Positional Parameters
        missing_positional = validate_decorated_fn_positionals(fn_positionals, expected_positionals)
        if missing_positional:
            raise DagsterInvalidDefinitionError(
                "{decorator_name} '{solid_name}' decorated function does not have required positional "
                "parameter '{missing_param}'. Solid functions should only have keyword arguments "
                "that match input names and a first positional parameter named 'context'.".format(
>                   decorator_name=decorator_name, solid_name=fn_name, missing_param=missing_positional
                )
            )
E           dagster.core.errors.DagsterInvalidDefinitionError: @solid 'read_excel' decorated function does not have required positional parameter 'context'. Solid functions should only have keyword arguments that match input names and a first positional parameter named 'context'.

venv\lib\site-packages\dagster\core\definitions\decorators.py:494: DagsterInvalidDefinitionError
m
hmm
e
I'm confused as to why I'm getting the error since I think that's what the
solid_from_func
is doing.
a
is
params.insert
mutative or do you need to reassign to
params
e
it is mutative
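A quick standard-library check of the exchange above: `list.insert` changes the list in place and returns `None`, while `Signature.replace` returns a new signature and leaves the original untouched. The `load` function is a hypothetical stand-in for a third-party function, not something from the thread.

```python
import inspect


def load(path, header=0):
    """Hypothetical stand-in for a third-party function."""
    return path, header


sig = inspect.signature(load)
params = list(sig.parameters.values())

# list.insert mutates the list in place and returns None,
# so reassigning its result to `params` would lose the list.
result = params.insert(
    0, inspect.Parameter("context", kind=inspect.Parameter.POSITIONAL_OR_KEYWORD)
)

# Signature.replace does not mutate; it returns a new Signature object.
new_sig = sig.replace(parameters=params)

param_names = [p.name for p in params]
new_names = list(new_sig.parameters)
old_names = list(sig.parameters)
```

So the `params.insert(...)` call is not the cause of the error; `new_sig` does start with `context`.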
a
my next guess is the
@makefun.wraps
invocation / decorator resolution order
have you stepped through in a debugger yet?
e
not yet but
makefun
was my next step. I'm not familiar with it; it's only in here because it was part of the pull request.
a
I think you’ll be better off switching to
@lambda_solid
as well and not dealing with mutating the function signature
is it your goal to provide all the arguments to these auto-wrapped functions via config?
As opposed to inputs/outputs
e
yes exactly, without having to define them all manually
a
ah interesting - so you need
@solid
to pull the config values out of
context.solid_config
ya since your solid will have no inputs or outputs you should be able to drop the
makefun.wraps
stuff
    # create our solid
    @dagster.solid(
        name=func.__name__,
        config=config_from_params,
    )
    def new_solid(context):
        # call the original function with the config values
        return func(**context.solid_config)
e
yes ! exactly this ^ . there's one little caveat with strictly positional arguments but I'll mess with it more and should be able to get it from here. thanks Alex !
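If the caveat refers to parameters that can only be passed positionally, `func(**config)` will raise a `TypeError` for them. One possible workaround, sketched here with only the standard library, is to split the config by parameter kind before calling. `call_with_config` and `scale` are hypothetical names, the `/` syntax needs Python 3.8+, and `*args` / `**kwargs` parameters are not handled.

```python
import inspect


def call_with_config(func, config):
    """Call func with values from a config dict.

    Positional-only parameters are passed positionally, everything else
    by keyword. Assumes positional-only values are supplied without gaps.
    """
    args, kwargs = [], {}
    for name, param in inspect.signature(func).parameters.items():
        if name not in config:
            continue  # fall back to the function's own default
        if param.kind is inspect.Parameter.POSITIONAL_ONLY:
            args.append(config[name])
        else:
            kwargs[name] = config[name]
    return func(*args, **kwargs)


def scale(value, /, factor=2):
    """Hypothetical function with a positional-only parameter."""
    return value * factor


scaled = call_with_config(scale, {"value": 3, "factor": 5})
scaled_default = call_with_config(scale, {"value": 3})
```

Inside the wrapper, this would take the place of the `func(**context.solid_config)` call.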
a
you can also add a
Nothing
type
InputDefinition
if you want to be able to sequence these solids without data flowing between them
e
I'm hoping this could wrap a lot of the common pandas functions such as
read_excel
and
to_sql
etc. Those would still require outputs being passed to one another. That should still work with the
new_solid
example you posted above, yes?
a
ya - outputs should work fine ( though they will be typed
Any
). The
Nothing
input is for having one of those solids run after something else
but if they are always safe to run from the get go you should be fine
e
Why does the
Nothing
need to be defined if it were to run after other solids ?
a
well the only way to ensure one solid runs after another is to add a dependency edge to the dag - and
Nothing
is the way to have a data-free dependency
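The idea that ordering comes only from edges in the DAG can be illustrated without Dagster at all. The sketch below uses the standard-library `graphlib` module (Python 3.9+) as a stand-in, not Dagster's scheduler; `cleanup_tmp` and `notify` are made-up step names. No data flows along any edge here, yet each step still runs after the steps it depends on.

```python
from graphlib import TopologicalSorter

# Map each step to the set of steps it must wait for.
# An edge carries no data; it only fixes the ordering.
dependencies = {
    "read_excel": set(),
    "cleanup_tmp": set(),
    "to_sql": {"read_excel"},
    "notify": {"to_sql", "cleanup_tmp"},
}

# static_order() yields the steps so that every dependency comes first.
order = list(TopologicalSorter(dependencies).static_order())
```

Steps with no edge between them, such as `read_excel` and `cleanup_tmp`, have no guaranteed relative order.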
e
I see. and if there was a need for a data dependency it could be used "as is" but will have a return type of Any ?
I can see why this hasn't been implemented yet. Many of the compromise solutions (if I understand correctly) kind of go against the whole methodology of Dagster when it comes to testability and data assurance 😛
😉 1
a
uhh im not sure i follow that statement exactly - but I think you are on the right track
e
one last question, I have a test for the
solid_from_func
wrapper that looks like
def test_solid_from_func_wrapper():
    excel_solid = solid_from_func(read_excel)
    res = execute_solid(
        excel_solid,
        input_values={
            "io": "cereals.xlsx",
            "header": True,
            "skiprows": 1
        })
    assert res.success
I'm getting an error saying
Invalid dependencies: solid "read_excel" does not have input "io"
which is true: there is no input defined, but I should still be able to specify config?
feel like I'm just putting the config in the wrong spot - aka shouldn't be going in the
input_values
. Should I be using the
environment_dict
instead ?
m
yep exactly
👍 1
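For reference, the nesting that per-solid config needs inside `environment_dict` can be sketched as a plain dictionary: `solids`, then the solid's name, then `config`. `solid_config_env` is a hypothetical helper, not part of Dagster.

```python
def solid_config_env(solid_name, config):
    """Build the environment_dict fragment carrying config for one solid."""
    return {"solids": {solid_name: {"config": dict(config)}}}


# The solid name defaults to the wrapped function's name, here "read_excel".
env = solid_config_env(
    "read_excel",
    {"io": "cereals.xlsx", "header": 1, "skiprows": 1},
)
```

The resulting dictionary is what would go in `environment_dict=` rather than `input_values=`.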
e
just for posterity here is the documentation for what the
environment_dict
structure looks like. https://dagster.readthedocs.io/en/stable/sections/api/apidocs/execution.html#pipeline-configuration Amazing what you can glean when you rtfm 😛
m
it may be too buried in there 😞
e
I have a working test for everything in the conversation above (even though, at this point, I wouldn't recommend going down this road), but I'll leave it here for educational purposes. The wrapper is tweaked from the pull request above and automatically generates the config for the underlying function. The generated config is a "best guess" at each parameter's data type, while the rest is shamefully lumped into the
Any
type. There are also some other subtleties here, like this line that potentially makes required parameters of the underlying function not required 🤐
config_from_params.update({k: dagster.Field(dagster.Any, is_required=False)})
import dagster
import inspect

def solid_from_func(func, alias=None):
    # capture the signature of func
    sig = inspect.signature(func)

    type_map = { 
        bool: dagster.Bool,
        float: dagster.Float,
        int: dagster.Int,
        str: dagster.String,
        list: dagster.Array,
        dict: dagster.Dict
    }

    config_from_params = {}

    for k, v in sig.parameters.items():
        try:
            if v.default is not inspect.Parameter.empty:
                config_from_params.update({k: dagster.Field(type_map[type(v.default)], default_value=v.default, is_required=False)})
            else:
                config_from_params.update({k: dagster.Field(type_map[type(v.default)], is_required=True)})
        except KeyError:
            config_from_params.update({k: dagster.Field(dagster.Any, is_required=False)})
    
    # create our solid
    @dagster.solid(
        name=alias or func.__name__,
        config=config_from_params
    )
    def new_solid(context):
        # call the original function with the config values
        return func(**context.solid_config)

    return new_solid
and the test, given a proper path to the Excel file. Note that the config is defined in the
environment_dict
not in the
input_values
!
from dagster import execute_solid
from pandas import read_excel
from os.path import join, dirname, abspath

def test_solid_from_func_wrapper():
    excel_solid = solid_from_func(read_excel, "excel_solid")
    res = execute_solid(
        excel_solid,
        environment_dict={
            "solids": {
                "excel_solid": {
                    "config": {
                        "io": join(dirname(abspath(__file__)), "cereals.xlsx"),
                        "header": 1,
                        "skiprows": 1
                    }
                }
            }
        }
    )
    assert res.success
👍 1
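One way to avoid the subtlety mentioned above, where the `Any` fallback marks required parameters as optional, is to decide required-ness from the presence of a default and use the type map only for the type. The sketch below uses plain strings in place of the Dagster config types so it runs with the standard library alone; `infer_config_spec` and `load` are hypothetical names.

```python
import inspect

# Plain names standing in for the dagster config types used in the thread.
TYPE_NAMES = {
    bool: "Bool",
    float: "Float",
    int: "Int",
    str: "String",
    list: "Array",
    dict: "Dict",
}


def infer_config_spec(func):
    """Return {param_name: (type_name, is_required, default)} for func."""
    spec = {}
    for name, param in inspect.signature(func).parameters.items():
        if param.kind in (
            inspect.Parameter.VAR_POSITIONAL,
            inspect.Parameter.VAR_KEYWORD,
        ):
            continue  # *args / **kwargs have no single config value
        if param.default is inspect.Parameter.empty:
            # No default: type is unknown, but the value is required.
            spec[name] = ("Any", True, None)
        else:
            # Unmapped default types (e.g. None) fall back to "Any".
            type_name = TYPE_NAMES.get(type(param.default), "Any")
            spec[name] = (type_name, False, param.default)
    return spec


def load(path, header=0, sheet=None, strict=False):
    """Hypothetical stand-in for a third-party function."""
    return path


spec = infer_config_spec(load)
```

Each tuple could then be turned into a `dagster.Field` using whichever keyword (`is_optional` or `is_required`) the installed version expects.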
m
@Eric do you mind if i throw this example code on https://github.com/dagster-io/dagster/pull/2049 for posterity?
e
is it intended behavior that I am able to do this because it returns a
SolidDefinition
(that is, passing the alias to the wrapper so I can reuse the returned solid):
excel_solid = solid_from_func(read_excel, "excel_solid")
res = execute_solid(
    excel_solid,
    ...)
But I'm not able to do this because it's a
CallableSolidNode
and not strictly a
SolidDefinition
after giving it an alias like this:
excel_solid = solid_from_func(read_excel).alias("excel_solid")
res = execute_solid(
    excel_solid,
    ...)
@max absolutely! Perhaps there should be a note at the top warning (if you agree and see fit) that even though this works, there are some caveats and it probably isn't aligned with the methodology of Dagster in the first place. But hopefully it'll help others come up with better iterations for a pull request later 😜
👍 1
a
ya its a bit unfortunate but the
.alias()
stuff is only designed to work in the context of
composition functions
like
@pipeline
and
@composite_solid
e
good to know 👍 thanks alex
a
you can do
@pipeline
def test():
  excel_solid.alias('cool_name')()

result = execute_pipeline(test)
if you want to test the aliasing stuff (this is pretty much all the
execute_solid
helper does)
e
oo spicy. hadn't considered that.