Eric
02/25/2020, 6:42 PM
solid_from_func decorator in this pull request https://github.com/dagster-io/dagster/pull/2049 and couple it with an automatic signature generation in hopes to use 3rd party library functions out of the box.
Our dev team here is very small and having to duplicate the signatures of all the functions from all the libraries we plan to use has become a serious blocker.
import makefun
import dagster
import inspect
def solid_from_func(func):
    # capture the signature of func
    sig = inspect.signature(func)
    params = list(sig.parameters.values())
    params.insert(0, inspect.Parameter('context', kind=inspect.Parameter.POSITIONAL_OR_KEYWORD))
    new_sig = sig.replace(parameters=params)
    # modify the signature to add 'context'
    params = list(sig.parameters.values())
    type_map = {
        bool: dagster.Bool,
        float: dagster.Float,
        int: dagster.Int,
        str: dagster.String,
        list: dagster.Array,
        dict: dagster.Dict
    }
    config_from_params = {}
    for k, v in sig.parameters.items():
        try:
            if v.default is not inspect.Parameter.empty:
                config_from_params.update({k: dagster.Field(type_map[type(v.default)], default_value=v.default, is_optional=True)})
            else:
                config_from_params.update({k: dagster.Field(type_map[type(v.default)], is_optional=False)})
        except KeyError:
            config_from_params.update({k: dagster.Field(dagster.Any, is_optional=False)})
    # create our solid
    @dagster.solid(config=config_from_params)
    @makefun.wraps(func, new_sig=new_sig)
    def new_solid(context, *args, **kwargs):
        # call the original function
        return func(context, *args, **kwargs)
    return new_solid
which is receiving an error:
decorator_name = '@solid', fn_name = 'read_excel', compute_fn = <function read_excel at 0x000001BED642FD38>
input_defs = [<dagster.core.definitions.input.InputDefinition object at 0x000001BED643DA08>, <dagster.core.definitions.input.InputD...tion object at 0x000001BED643DAC8>, <dagster.core.definitions.input.InputDefinition object at 0x000001BED644C848>, ...]
expected_positionals = ['context'], exclude_nothing = True
def validate_solid_fn(
    decorator_name, fn_name, compute_fn, input_defs, expected_positionals=None, exclude_nothing=True
):
    check.str_param(decorator_name, 'decorator_name')
    check.str_param(fn_name, 'fn_name')
    check.callable_param(compute_fn, 'compute_fn')
    check.list_param(input_defs, 'input_defs', of_type=InputDefinition)
    expected_positionals = check.opt_list_param(
        expected_positionals, 'expected_positionals', of_type=str
    )
    if exclude_nothing:
        names = set(inp.name for inp in input_defs if not inp.runtime_type.is_nothing)
        nothing_names = set(inp.name for inp in input_defs if inp.runtime_type.is_nothing)
    else:
        names = set(inp.name for inp in input_defs)
        nothing_names = set()
    # Currently being super strict about naming. Might be a good idea to relax. Starting strict.
    fn_positionals, input_args = split_function_parameters(compute_fn, expected_positionals)
    # Validate Positional Parameters
    missing_positional = validate_decorated_fn_positionals(fn_positionals, expected_positionals)
    if missing_positional:
        raise DagsterInvalidDefinitionError(
            "{decorator_name} '{solid_name}' decorated function does not have required positional "
            "parameter '{missing_param}'. Solid functions should only have keyword arguments "
            "that match input names and a first positional parameter named 'context'.".format(
>               decorator_name=decorator_name, solid_name=fn_name, missing_param=missing_positional
            )
        )
E   dagster.core.errors.DagsterInvalidDefinitionError: @solid 'read_excel' decorated function does not have required positional parameter 'context'. Solid functions should only have keyword arguments that match input names and a first positional parameter named 'context'.
venv\lib\site-packages\dagster\core\definitions\decorators.py:494: DagsterInvalidDefinitionError
max
02/25/2020, 7:04 PM
Eric
02/25/2020, 7:05 PM
solid_from_func is doing.
alex
02/25/2020, 7:06 PM
params.insert mutative or do you need to reassign to params
Eric
02/25/2020, 7:08 PM
alex
02/25/2020, 7:09 PM
@makefun.wraps invocation / decorator resolution order
Eric
02/25/2020, 7:11 PM
makefun was my next step. I'm not familiar with it and is added here since it was part of the pull request.
alex
02/25/2020, 7:13 PM
@lambda_solid as well and not dealing with mutating the function signature
Eric
02/25/2020, 7:16 PM
alex
02/25/2020, 7:18 PM
@solid to pull the config values out of context.solid_config
makefun.wraps stuff
# create our solid
@dagster.solid(
    name=func.__name__,
    config=config_from_params,
)
def new_solid(context):
    # call the original function with the config values
    return func(**context.solid_config)
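The shape of that suggestion can be tried out without running Dagster at all. What follows is only a rough sketch: `wrap_with_config`, `scale` and the `SimpleNamespace` stand-in for the context are hypothetical names for illustration, and the one assumption carried over from the snippet above is that the context exposes the config as a dict on `solid_config`.

```python
import inspect
from types import SimpleNamespace


def wrap_with_config(func):
    """Return a callable that takes only a context and forwards its config as kwargs."""
    def new_solid(context):
        # Unpack the config dict into keyword arguments for the wrapped function.
        return func(**context.solid_config)
    # Keep the original name so the wrapper is easy to identify.
    new_solid.__name__ = func.__name__
    return new_solid


def scale(value, factor=2):
    """Hypothetical library-style function used only for this demo."""
    return value * factor


# Stand-in for the real context object (not a Dagster class).
fake_context = SimpleNamespace(solid_config={"value": 3, "factor": 5})
wrapped = wrap_with_config(scale)
result = wrapped(fake_context)
# The wrapper's own signature is just (context), so no signature rewriting is needed.
signature = list(inspect.signature(wrapped).parameters)
```

Because the wrapper only ever declares `context`, nothing has to be done to the wrapped function's signature, which is presumably why the makefun step becomes unnecessary with this approach.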
Eric
02/25/2020, 8:23 PM
alex
02/25/2020, 8:34 PM
Nothing type InputDefinition if you want to be able to sequence these solids without data flowing between them
Eric
02/25/2020, 8:36 PM
read_excel and to_sql etc. Those would still require outputs being passed to one another. That should still work with the new_solid example you posted above, yes?
alex
02/25/2020, 8:39 PM
Any). The Nothing input is for having one of those solids run after something else
Eric
02/25/2020, 8:42 PM
Nothing need to be defined if it were to run after other solids?
alex
02/25/2020, 8:45 PM
Nothing is the way to have a data-free dependency
Eric
02/25/2020, 8:46 PM
alex
02/25/2020, 8:50 PM
Eric
02/25/2020, 9:46 PM
solid_from_func wrapper that looks like
def test_solid_from_func_wrapper():
    excel_solid = solid_from_func(read_excel)
    res = execute_solid(
        excel_solid,
        input_values={
            "io": "cereals.xlsx",
            "header": True,
            "skiprows": 1
        })
    assert res.success
I'm getting an error saying Invalid dependencies: solid "read_excel" does not have input "io"
which is true, there is no input defined, but I should still be able to specify config?
input_values. Should I be using the environment_dict instead?
max
02/25/2020, 9:48 PM
Eric
02/25/2020, 9:56 PM
environment_dict structure looks like. https://dagster.readthedocs.io/en/stable/sections/api/apidocs/execution.html#pipeline-configuration
Amazing what you can glean when you rtfm 😛
max
02/25/2020, 9:59 PM
Eric
02/25/2020, 10:46 PM
Any type. There are also some other subtleties here, like this line that potentially makes required parameters of the underlying function not required 🤐 config_from_params.update({k: dagster.Field(dagster.Any, is_required=False)})
import dagster
import inspect
def solid_from_func(func, alias=None):
    # capture the signature of func
    sig = inspect.signature(func)
    params = list(sig.parameters.values())
    params.insert(0, inspect.Parameter('context', kind=inspect.Parameter.POSITIONAL_OR_KEYWORD))
    new_sig = sig.replace(parameters=params)
    type_map = {
        bool: dagster.Bool,
        float: dagster.Float,
        int: dagster.Int,
        str: dagster.String,
        list: dagster.Array,
        dict: dagster.Dict
    }
    config_from_params = {}
    for k, v in sig.parameters.items():
        try:
            if v.default is not inspect.Parameter.empty:
                config_from_params.update({k: dagster.Field(type_map[type(v.default)], default_value=v.default, is_required=False)})
            else:
                config_from_params.update({k: dagster.Field(type_map[type(v.default)], is_required=True)})
        except KeyError:
            config_from_params.update({k: dagster.Field(dagster.Any, is_required=False)})
    # create our solid
    @dagster.solid(
        name=alias or func.__name__,
        config=config_from_params
    )
    def new_solid(context):
        # call the original function with the config values
        return func(**context.solid_config)
    return new_solid
and the test, given a proper path to the excel file. Note the config is defined in the environment_dict, not in the input_values!
from dagster import execute_solid
from pandas import read_excel
from os import getcwd
from os.path import join, dirname, abspath
def test_solid_from_func_wrapper():
    excel_solid = solid_from_func(read_excel, "excel_solid")
    res = execute_solid(
        excel_solid,
        environment_dict={
            "solids": {
                "excel_solid": {
                    "config": {
                        "io": join(dirname(abspath(__file__)), "cereals.xlsx"),
                        "header": 1,
                        "skiprows": 1
                    }
                }
            }
        }
    )
    assert res.success
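On the subtlety about required parameters: for a parameter with no default, `v.default` is `inspect.Parameter.empty`, and `type(inspect.Parameter.empty)` is `type`, which is never a key in `type_map`, so the lookup always lands in the `except KeyError` branch. One possible way to keep the two cases apart is sketched below without Dagster. `describe_params`, `TYPE_NAMES`, `load_table` and the `(type_name, required, default)` tuples are hypothetical stand-ins for building the `dagster.Field` objects, not part of any library.

```python
import inspect

# Plain strings stand in for the dagster config types in this sketch.
TYPE_NAMES = {bool: "Bool", float: "Float", int: "Int", str: "String", list: "Array", dict: "Dict"}


def describe_params(func):
    """Map each parameter name to a (type_name, required, default) tuple."""
    spec = {}
    for name, param in inspect.signature(func).parameters.items():
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue  # *args / **kwargs cannot be expressed as named config fields
        if param.default is inspect.Parameter.empty:
            # No default: the field is required and its type cannot be inferred.
            spec[name] = ("Any", True, None)
        else:
            # Infer the type from the default, falling back to "Any" (e.g. for None).
            type_name = TYPE_NAMES.get(type(param.default), "Any")
            spec[name] = (type_name, False, param.default)
    return spec


def load_table(path, header=0, sep=",", strict=False, extra=None):
    """Hypothetical third-party style function used only for this demo."""
    return path


spec = describe_params(load_table)
```

Checking for the missing default first, before any type lookup, means a parameter like `path` stays required, while a default such as `None` only loosens the type and not the requirement.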
max
02/25/2020, 10:49 PM
Eric
02/25/2020, 10:53 PM
SolidDefinition having to pass the alias to the wrapper so I can reuse the solid returned:
excel_solid = solid_from_func(read_excel, "excel_solid")
res = execute_solid(
    excel_solid,
    ...)
But I'm not able to do this because it's a CallableSolidNode and not strictly a SolidDefinition after giving it an alias like this:
excel_solid = solid_from_func(read_excel).alias("excel_solid")
res = execute_solid(
    excel_solid,
    ...)
alex
02/25/2020, 10:56 PM
.alias() stuff is only designed to work in the context of composition functions like @pipeline and @composite_solid
Eric
02/25/2020, 10:57 PM
alex
02/25/2020, 10:58 PM
@pipeline
def test():
    excel_solid.alias('cool_name')()
result = execute_pipeline(test)
execute_solid helper does)
Eric
02/25/2020, 10:59 PM