Jake Kagan
01/17/2023, 6:12 PM
UserWarning: Error loading repository location proj_test: dagster._core.errors.DagsterInvalidDefinitionError: Attempted to pass typing.Callable to a Field that expects a valid dagster type usable in config (e.g. Dict, Int, String et al).
i'm trying to pass a function into an op as part of a configuration:
@op(required_resource_keys={"bgq"}, config_schema={'qry_str': str, 'filter_func': Field(Callable, default_value=None, is_required=False)})
Zach P
01/17/2023, 6:30 PM
Jake Kagan
01/17/2023, 7:31 PM
config_schema={'qry_str': str,
               'filter_func': Field(DagsterType(
                   type_check_fn=lambda x: type(x) == Callable,
                   name="filter_func"),
                   default_value=None,
                   is_required=False)})
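A side note on the type_check_fn above: type(x) == Callable compares the concrete class of x (for a plain function, the built-in function type) with the typing.Callable alias, so it comes out False even for real functions. The builtin callable(x) is the usual check. As far as I understand, run config is plain data of the kind you would write in YAML, so a function object cannot travel through it whichever check is used. A stdlib-only demonstration, where double is a hypothetical stand-in function:

```python
from typing import Callable


def double(x):
    # Hypothetical example function, used only to have something callable.
    return 2 * x


# type(double) is the concrete function class, not the typing.Callable alias,
# so this comparison is False even though double is clearly callable.
type_check = type(double) == Callable

# The builtin callable() asks the intended question: can this object be called?
builtin_check = callable(double)

print(type_check, builtin_check)  # False True
print(callable("filter_func"))    # False: a string naming a function is not itself callable
```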
DagsterType but it's over my head...
Zach P
01/17/2023, 7:33 PM
Jake Kagan
01/17/2023, 7:34 PM
Zach P
01/17/2023, 7:34 PM
Jake Kagan
01/17/2023, 7:35 PM
Zach P
01/17/2023, 7:36 PM
Jake Kagan
01/17/2023, 7:41 PM
configured(
on and pass through a function that filters/does whatever to the dataframe
Zach P
01/17/2023, 7:42 PM
Jake Kagan
01/17/2023, 7:42 PM
Zach P
01/17/2023, 7:53 PM
Jake Kagan
01/17/2023, 8:30 PM
import polars as pl
from dagster import Field, get_dagster_logger, op

@op(required_resource_keys={"rsrc_bigquery_runner"},
    config_schema={'qry_str': str,
                   'filter_func': Field(config=str, is_required=False)
                   })
def bigq_adjust_df_results(context) -> pl.DataFrame:
    qry_str = context.op_config['qry_str']
    # qry_str = QRY_STR  # hard-coded query used while testing
    # optional field with no default: the key is absent when not set, so use .get()
    filter_func = context.op_config.get('filter_func')
    df: pl.DataFrame = context.resources.rsrc_bigquery_runner(qry_str).bigquery_to_df()
    if filter_func is None:
        return df
    else:
        # look up the module-level function whose name was passed in config
        filtered_df = globals()[filter_func](df)
        get_dagster_logger().info(
            f'ADJUSTED DATAFRAME SHAPE: {filtered_df.shape} columns: {filtered_df.shape[1]}, rows: {filtered_df.shape[0]}')
        return filtered_df
Field(config=str|None) but i see you need to use Noneable
Zach P
01/17/2023, 8:32 PM
Jake Kagan
01/17/2023, 8:32 PM
Zach P
01/17/2023, 8:32 PM
Jake Kagan
01/17/2023, 8:33 PM
globals() is really best practices u know
Zach P
01/17/2023, 8:33 PM
Jake Kagan
01/17/2023, 8:34 PM
Zach P
01/17/2023, 8:35 PM
Jake Kagan
01/17/2023, 8:35 PM
Zach P
01/17/2023, 8:37 PM
Jake Kagan
01/17/2023, 8:38 PM
Zach P
01/17/2023, 8:38 PM
Jake Kagan
01/17/2023, 8:38 PM
bigquery resource
--> op that adjusts dataframe if needed
--> op to do other stuff
im trying to figure out if this is an ok structure
pl.DataFrame.filter
line, but i just figured i'd split it up and have it in an outside function instead of another op
Zach P
01/17/2023, 8:41 PM
Jake Kagan
01/17/2023, 8:42 PM
common folder, and then any job that uses it can have its own function sitting in the file if there's a need
Zach P
01/17/2023, 8:42 PM
Jake Kagan
01/17/2023, 8:43 PM
import polars as pl

def filter_func(df):
    filter_df = df.filter(pl.col("client_bill_amount") > 300)
    # other stuff idk what
    return filter_df
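The globals()[filter_func](df) lookup works, but it will call any module-level name that happens to match the config string. A common alternative is an explicit registry: config still carries only a string, and only functions that were deliberately registered can be selected. This is a sketch under assumptions; FILTERS, register_filter, apply_filter and big_bills are hypothetical names, and to keep it dependency-free the "dataframe" is a list of dicts. With polars, the body of big_bills would be the df.filter(pl.col("client_bill_amount") > 300) line above.

```python
from typing import Any, Callable, Dict, List, Optional

# Hypothetical registry mapping config-friendly names to filter functions.
FILTERS: Dict[str, Callable[[Any], Any]] = {}


def register_filter(name: str):
    """Decorator that records the decorated function in FILTERS under `name`."""
    def decorator(fn: Callable[[Any], Any]) -> Callable[[Any], Any]:
        FILTERS[name] = fn
        return fn
    return decorator


@register_filter("big_bills")
def big_bills(rows: List[dict]) -> List[dict]:
    # Stand-in for df.filter(pl.col("client_bill_amount") > 300) on a polars DataFrame.
    return [r for r in rows if r["client_bill_amount"] > 300]


def apply_filter(data: Any, filter_name: Optional[str]) -> Any:
    """Return `data` unchanged when no filter is configured, else apply the named filter."""
    if filter_name is None:
        return data
    try:
        fn = FILTERS[filter_name]
    except KeyError:
        raise ValueError(
            f"unknown filter {filter_name!r}; registered: {sorted(FILTERS)}"
        ) from None
    return fn(data)


rows = [{"client_bill_amount": 100}, {"client_bill_amount": 450}]
print(apply_filter(rows, "big_bills"))  # [{'client_bill_amount': 450}]
print(apply_filter(rows, None) is rows)  # True
```

Inside the op, the lookup could then become apply_filter(df, context.op_config.get('filter_func')), and a typo in the config fails with a clear error instead of a KeyError from globals().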
Zach P
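01/17/2023, 8:43 PM
Jake Kagan
01/17/2023, 8:45 PM
Zach P
01/17/2023, 8:46 PM
Jake Kagan
01/17/2023, 8:46 PM
Zach P
01/17/2023, 8:48 PM
import polars as pl
from dagster import asset

def my_business_logic_1(df: pl.DataFrame) -> pl.DataFrame:
    # plain function holding the transformation logic; "..." and other_transformations() are placeholders
    return df.filter(...).other_transformations()

@asset
def my_dataset(input_data):
    # the asset only calls the plain function, then finishes with polars operations
    return my_business_logic_1(input_data).group_by("something that is given to polars")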
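One upside of this pattern is that my_business_logic_1 is an ordinary function, so it can be unit-tested without building any Dagster context. A dependency-free sketch of that idea, assuming lists of dicts as a stand-in for polars DataFrames; the client / client_bill_amount columns and the total_by_client helper are hypothetical:

```python
from collections import defaultdict
from typing import Dict, List


def my_business_logic_1(rows: List[dict]) -> List[dict]:
    # Plain function: keep bills over 300 (stand-in for df.filter(...)).
    return [r for r in rows if r["client_bill_amount"] > 300]


def total_by_client(rows: List[dict]) -> Dict[str, int]:
    # Hypothetical stand-in for a group_by("client") followed by a sum.
    totals: Dict[str, int] = defaultdict(int)
    for r in rows:
        totals[r["client"]] += r["client_bill_amount"]
    return dict(totals)


def my_dataset(input_data: List[dict]) -> Dict[str, int]:
    # In Dagster this body would sit under @asset; here it is just the composition.
    return total_by_client(my_business_logic_1(input_data))


sample = [
    {"client": "a", "client_bill_amount": 500},
    {"client": "a", "client_bill_amount": 200},
    {"client": "b", "client_bill_amount": 350},
    {"client": "a", "client_bill_amount": 400},
]
print(my_dataset(sample))  # {'a': 900, 'b': 350}
```

Because the logic lives outside the asset, the same functions can sit in a shared module and be imported by any job that needs them.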
Jake Kagan
01/17/2023, 8:49 PM
Zach P
01/17/2023, 8:50 PM
Jake Kagan
01/17/2023, 8:52 PM
Zach P
01/17/2023, 8:53 PM
Jake Kagan
01/17/2023, 8:59 PM