Jake Kagan
01/22/2023, 7:21 PM@op(required_resource_keys={"rsrc_bigquery_runner"},
config_schema={'qry_str': str,
'filter_func': Field(Any, default_value=None)} # this function is meant to allow further play on the dataframe
)
def op_bigq_adjust_df_results(context: OpExecutionContext) -> pl.DataFrame:
qry_str = context.op_config['qry_str']
filter_func = context.op_config['filter_func']
df: pl.DataFrame = context.resources.rsrc_bigquery_runner(qry_str).bigquery_to_df()
if filter_func is None:
return df
else:
filtered_df = filter_func(df)
get_dagster_logger().info(
f'ADJUSTED DATAFRAME SHAPE: {str(filtered_df.shape)} columns: {str(filtered_df.shape[1])}, rows: {str(filtered_df.shape[0])}')
return filtered_df
Then I want to be able to use this op in its configured form, either at the top of a job (no dependency) or somewhere else (with a dependency on other ops).
So here would be the initial configuration of the op, which leaves it open for further configuration:
@configured(configurable=op_bigq_adjust_df_results, config_schema={"query": str, "placeholder": str})
def checking_configured(context, SOME_UPSTREAM_OP):
    """Build config for ``op_bigq_adjust_df_results`` from a templated query.

    Substitutes the configured ``placeholder`` token inside the configured
    ``query`` with the value supplied by ``SOME_UPSTREAM_OP`` (intended to be
    a replacement string for dynamic SQL).

    Args:
        context: Object exposing ``op_config`` with ``query`` and
            ``placeholder`` entries.
        SOME_UPSTREAM_OP: Replacement string for the placeholder.

    Returns:
        A config dict, meant to be passed on to the base op
        ``op_bigq_adjust_df_results``.
    """
    # NOTE(review): Dagster documents the function given to `configured` as
    # receiving only the validated config value -- a `context` / upstream
    # input may not be available here. Confirm against the `configured` docs.
    replacement = SOME_UPSTREAM_OP
    # str.replace() accepts its (old, new) arguments positionally only;
    # passing them as keywords raises TypeError.
    query = context.op_config['query'].replace(
        context.op_config['placeholder'],
        replacement,
    )
    # NOTE(review): "qry_type" is not among the keys declared in the base
    # op's config_schema as posted -- confirm the base op accepts it.
    return {"qry_str": query, "qry_type": "DQL"}
And then I would have a third op where I pass the actual values to the above configured op:
# Supply the concrete query and placeholder token to the partially
# configured definition, registering the result under the name 'x'.
x = configured(checking_configured, name='x')(
    dict(query=QRY_BIGQ, placeholder='$placeholder$')
)
I already have a BigQuery resource, which has all sorts of methods, but I'd like to see stuff on the Dagit chart, and I'd like to turn some configured ops into assets.
THANK YOU!
chris
01/27/2023, 5:39 PM