# ask-community
j
curious if anyone knows how to get around this:
```
UserWarning: Error loading repository location proj_test:dagster._core.errors.DagsterInvalidDefinitionError: Attempted to pass typing.Callable to a Field that expects a valid dagster type usable in config (e.g. Dict, Int, String et al).
```
i'm trying to pass a function into an op as part of a configuration:
```python
@op(required_resource_keys={"bgq"}, config_schema={'qry_str': str, 'filter_func': Field(Callable, default_value=None, is_required=False)})
```
dagster bot responded by community 1
z
I don't think passing functions is supported like this (someone may correct me if I'm wrong, but I think this is due to how config values are serialized?); there are type checks that get run and raise this error. A workaround here may be to abstract out the need to pass a function, e.g. a dictionary that represents the parameters of a filter function, a string that names it, etc. You may also be able to get a bit fancy with Dagster's enum stuff here. If you look at the docs for config schema they list the possible types. Dagster's Enum supports a config value (a string) as well as a "python value" that could be any python object, i.e. a function.
🙏 1
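The dictionary-of-parameters idea above can be sketched in plain Python, without Dagster in the picture (the column name, operator names, and threshold are made-up examples):

```python
# Instead of passing a function through config, pass a dict describing the
# filter, and rebuild the actual function from it inside the op.
import operator

# Whitelist of comparison operators the config is allowed to name.
OPERATORS = {"gt": operator.gt, "lt": operator.lt, "eq": operator.eq}

def build_filter(spec):
    """Turn a config dict like {'column': ..., 'op': 'gt', 'value': 300}
    into a predicate over a row-dict."""
    compare = OPERATORS[spec["op"]]
    return lambda row: compare(row[spec["column"]], spec["value"])

rows = [
    {"client_bill_amount": 250},
    {"client_bill_amount": 400},
]
keep = build_filter({"column": "client_bill_amount", "op": "gt", "value": 300})
filtered = [r for r in rows if keep(r)]
```

In Dagster the `spec` dict would just be a `dict`-shaped entry in `config_schema`; the point is that only serializable values cross the config boundary.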
j
yeah i was trying something like this:
```python
config_schema={'qry_str': str,
               'filter_func': Field(DagsterType(
                                        type_check_fn=lambda _, x: callable(x),
                                        name="filter_func"),
                                    default_value=None,
                                    is_required=False)})
```
the DagsterType stuff is a bit over my head though...
z
AFAIK these are mainly used for input/output of assets/ops, not as a type for config
D 1
j
ahh ok thank you
z
See here for the possible types
j
so yeah, maybe i could just pass the type as an Enum that's a good idea thank you
z
It's possible, I'm not sure. Could be helpful in creating a robust solution
What sort of function are you passing? I can see the desire for more functional programming here, but in most cases passing what is essentially a dict of parameters for a function has been sufficient for me.
j
im trying to learn how to structure projects - so in this case i have an op that pulls from bigquery and turns it into a dataframe.. what i want is for that op to be reusable, so i can run configured() on it and pass through a function that filters/does whatever to the dataframe
i dont want to have to create another op for that
z
I’d heavily suggest you check out the software defined assets & io_managers in that case!
j
i will check out the io_managers for sure - i didnt want to use this one as an asset because it's an op meant for a one-time big batch job that i want deleted after
meaning after the first big batch job, im gonna make the actual final table an asset because it takes incremental updates
z
I see
I also checked: you can embed functions into enums, but it's essentially the same as mapping a string config value to a function via a dictionary, perhaps a bit more reusable. I'd still heavily suggest looking into the io_managers for your use case here. They work quite well with ops in my experience.
j
ahh any chance you could show me an example using enum?? i tried doing that, ended up using this (which works but seems meh):
```python
@op(required_resource_keys={"rsrc_bigquery_runner"},
    config_schema={'qry_str': str,
                   'filter_func': Field(config=str, is_required=False)
                   })
def bigq_adjust_df_results(context) -> pl.DataFrame:
    # qry_str = context.op_config['qry_str']
    qry_str = QRY_STR
    # .get() so a missing optional key doesn't raise KeyError
    filter_func = context.op_config.get('filter_func')
    df: pl.DataFrame = context.resources.rsrc_bigquery_runner(qry_str).bigquery_to_df()
    if filter_func is None:
        return df
    filtered_df = globals()[filter_func](df)
    get_dagster_logger().info(
        f'ADJUSTED DATAFRAME SHAPE: columns: {filtered_df.shape[1]}, rows: {filtered_df.shape[0]}')
    return filtered_df
```
was also trying to figure out something like Field(config=str|None), but i see you need to use Noneable
z
I didn't do a full example, just ran it locally to see if passing a function as the python_value to EnumValue would work or not.
j
ahh cool cool well thank you!
z
Field also has an is_required param that can be set to False, or you can give it a default_value.
j
not sure if globals() is really best practice u know
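An explicit registry is the usual alternative to globals() here: the op can only resolve function names it was explicitly told about, and unknown names fail loudly. A minimal sketch (the registry contents and function names are invented for illustration):

```python
# Explicit whitelist of filter functions addressable from config,
# instead of looking arbitrary names up in globals().
FILTER_REGISTRY = {}

def register_filter(fn):
    """Decorator that makes a function addressable by name from config."""
    FILTER_REGISTRY[fn.__name__] = fn
    return fn

@register_filter
def drop_small_bills(rows):
    return [r for r in rows if r["client_bill_amount"] > 300]

def resolve_filter(name):
    if name is None:
        return lambda rows: rows  # no-op when config omits the filter
    try:
        return FILTER_REGISTRY[name]
    except KeyError:
        raise ValueError(f"unknown filter {name!r}; known: {sorted(FILTER_REGISTRY)}")

result = resolve_filter("drop_small_bills")([{"client_bill_amount": 250},
                                             {"client_bill_amount": 400}])
```

Inside the op, `resolve_filter(context.op_config.get('filter_func'))` would replace the `globals()[...]` lookup.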
z
Ahhh okay seems like the issue is that there isn’t a big query io manager by default yet 😞
j
yeah i just made a resource that has it query and then one of the methods makes it a dataframe
z
You may want to write a lightweight one; that would probably make it so that your op can be "business logic only".
I see you're using pl.DataFrame, is this polars?
j
yea
so ur saying the io manager would lead to not having to call that resource?
z
It would make it a little bit more “clean”, so for example 90% of our ops and assets get passed in a dataframe and return another dataframe.
Makes them quite easy to unit test 😉
j
right right im hoping to implement something like that
z
If you guys are a big-query shop I think it'd be worth writing one. But as far as your original question goes, I think a way to pass the parameters to pl.DataFrame.filter would work well.
j
meaning if i have something like: bigquery resource --> op that adjusts dataframe if needed --> op to do other stuff
im trying to figure out if this is an ok structure
we are a....talend shop 😭 and im trying to make some pipelines that will convince everyone to switch to dagster
but actually this is going from bigq to redshift
yea the function that gets passed into that op actually has a pl.DataFrame.filter line, but i just figured id split it up and have it in an outside function instead of another op
z
Ahh I see, so if you do something like accept a dictionary of values you can pass this to pl.DataFrame.filter
j
the thing is id like that op to sit in some kind of
common
folder, and then any job that uses it can have its own function sitting in the file if theres a need
z
eg: filter_opts: Field(Permissive(), description="options to pass to polars filter"), then pl.DataFrame.filter(df, **context.op_config["filter_opts"])
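The Permissive idea above boils down to forwarding an untyped dict of config values as keyword arguments. A plain-Python sketch of that shape (the filter function and its parameters are invented for illustration):

```python
# A filter whose behavior is driven by keyword parameters,
# instead of the filter itself being passed through config.
def filter_rows(rows, column="client_bill_amount", min_value=0):
    return [r for r in rows if r[column] > min_value]

# In Dagster this dict would come from context.op_config["filter_opts"]
# (declared as Field(Permissive())); here it is just a literal.
filter_opts = {"min_value": 300}

rows = [{"client_bill_amount": 250}, {"client_bill_amount": 400}]
filtered = filter_rows(rows, **filter_opts)
```

Permissive means any extra keys pass config validation, which is why the later suggestion to tighten the schema (so bad kwargs fail early) makes sense.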
j
ahh i see what ur saying just boil it down to one function
i was thinking like this '
```python
def filter_func(df):
    filter_df = df.filter(pl.col("client_bill_amount") > 300)
    # other stuff idk what
    return filter_df
```
z
You can then later improve this by changing the Field(Permissive()) to be more strongly typed. That way if you pass a kwarg that filter doesn't accept it will raise an understandable error.
Stuff like that can also work. It really depends on your goal. If you have a function that always runs the same thing: eg business logic that works
We have several spark functions for example that are like that
j
right so the op would be commonly used but that function would be different in each file if there was a need for the function
the issue is it becomes a headache to make another op
z
So think about it this way: Ops and Assets are things that need to be orchestrated in some way.
A filter operation doesn't need to be orchestrated in most cases, so you can just use it as a python library.
🚀 1
j
ive straight up copied an entire folder just to create a test folder, and started getting errors about ops being named the same
z
for example we have stuff like this:
```python
def my_business_logic_1(df) -> pl.DataFrame:
    return df.filter(...).other_transformations()

@asset
def my_dataset(input_data):
    return my_business_logic_1(input_data).group_by("something that is given to polars")
```
This way we can write plain unit tests for my_business_logic_1, and don't need to worry about whether redshift/gcp/spark/whatever exists or not.
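The payoff of that split is that the business logic can be tested against plain in-memory data, with no warehouse or orchestrator in sight. A sketch of such a test (function and data invented for illustration):

```python
# Pure business logic: plain data in, plain data out.
def my_business_logic(rows):
    return [r for r in rows if r["amount"] > 100]

def test_my_business_logic():
    rows = [{"amount": 50}, {"amount": 150}]
    assert my_business_logic(rows) == [{"amount": 150}]
    assert my_business_logic([]) == []

test_my_business_logic()  # would normally be collected and run by pytest
```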
j
ok cool cool im glad thats ok to do
yeah thats exactly what im hoping to setup
thank you for this
z
(It also may be worth checking out airbyte, if your company has lots of simple etl just for importing/exporting from a few different systems. it has great integration with dagster, and is coming up in a community call soon 🙂 )
j
too many tools 😭
another one to look into lol
z
😆 Just worth knowing about, IMO orchestrator first.
j
🙏 yes i need to get out of the windows talend server and then idc. but actually all i heard about was airflow, and just saw a comment about dagster which led me here. so im glad to hear about airbyte now also
👍 1