Simon Szalai
04/06/2022, 7:51 PM
```python
import pandas as pd
import dagstermill as dm
from dagster import (
    DynamicOutput,
    DynamicOutputDefinition,
    InputDefinition,
    OutputDefinition,
    job,
    op,
)
from dagster.utils import script_relative_path


# An op to load and group rows from the Postgres db. It yields a dataframe
# for each unique value in the grouping column.
@op(
    name="get_postgres_rows",
    output_defs=[DynamicOutputDefinition(pd.DataFrame, name="dataframes")],
)
def get_postgres_rows(context):
    ...  # query and grouping code elided in the original post; produces `dfs`
    for i, df in enumerate(dfs):
        yield DynamicOutput(df, mapping_key=str(i), output_name="dataframes")


# Op factory: creates an op for the given name and loads the matching .ipynb
def run_notebook_factory(name):
    run_notebook_dm_op = dm.define_dagstermill_op(
        name,
        script_relative_path(f"../notebooks/{name}.ipynb"),
        output_notebook_name=f"{name}_output",
        input_defs=[InputDefinition(dagster_type=pd.DataFrame, name="input_df", description="")],
        output_defs=[OutputDefinition(dagster_type=pd.DataFrame, name="output_df", description="")],
    )
    return run_notebook_dm_op


# Op for processing a single dataframe
@op
def process_df(df):
    # .iloc[0] is positional, so it works even if the group's index does not start at 0
    activity_short_name = df["activity_short_name"].iloc[0]
    run_notebook = run_notebook_factory(activity_short_name)
    processed_df, _ = run_notebook(df)  # ERROR COMES FROM HERE
    return processed_df


@op
def join_dataframes(dfs):
    return pd.concat(dfs, axis=0)


@job(
    resource_defs={
        "output_notebook_io_manager": dm.local_output_notebook_io_manager,
    }
)
def process_data_with_notebook_job():
    # Get dataframes (one per activity) from the responses database as DynamicOutputs
    dfs = get_postgres_rows()
    # Process each dataframe using the notebook that matches its short_name
    processed_dfs = dfs.map(process_df)
    # Concatenate the processed dataframes
    df_merged = join_dataframes(processed_dfs.collect())
    # Save results to the results database (save_results is not shown in the post)
    save_results(df_merged)
```
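Leaving Dagster aside, this job's data flow is a plain fan-out / map / fan-in. It splits the rows into one group per key (one `DynamicOutput` each), processes every group independently (`.map`), then concatenates the results (`.collect()` plus `join_dataframes`). The minimal sketch below models that flow with lists of dicts standing in for DataFrames. `group_rows` and `run_pipeline` are hypothetical names, and grouping by the column value rather than by an `enumerate` index is an assumption.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Row = Dict[str, object]


def group_rows(rows: List[Row], key: str) -> Dict[str, List[Row]]:
    """Fan-out: one group per unique value of `key` (one DynamicOutput each in the job)."""
    groups: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        groups[str(row[key])].append(row)
    return dict(groups)


def run_pipeline(
    rows: List[Row], key: str, process: Callable[[List[Row]], List[Row]]
) -> List[Row]:
    groups = group_rows(rows, key)
    # "map": each group is processed independently of the others
    processed = [process(group) for group in groups.values()]
    # "collect" + join: concatenate the per-group results into one list
    return [row for group in processed for row in group]


rows = [
    {"activity_short_name": "quiz", "score": 1},
    {"activity_short_name": "video", "score": 2},
    {"activity_short_name": "quiz", "score": 3},
]
merged = run_pipeline(
    rows, "activity_short_name", lambda g: [{**r, "group_size": len(g)} for r in g]
)
```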
I am using a factory pattern to create an op based on a value passed to it (this corresponds to the notebook's filename), then calling it directly inside the `process_df` op. When I do this, I get the following error:
```
dagster.core.errors.DagsterInvalidInvocationError: Attemped to invoke op that was not constructed using the `@op` decorator. Only ops constructed using the `@op` decorator can be directly invoked.
```
I assume the error occurs because I call an op directly from inside another op, which is probably not allowed. I could probably replace `process_df` with `run_notebook` (the dagstermill op) directly, but then I wouldn't be able to use a different notebook depending on the row values.
Is there a way to solve this problem, or an alternative solution or workaround that would achieve similar results?
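For context on the error: an op returned by `define_dagstermill_op` is a definition meant to be wired together inside a `@job` body. The error message says that only `@op`-decorated ops support direct invocation. One possible workaround keeps the per-row choice by having the factory return an ordinary Python function, which `process_df` then calls. The sketch below rests on assumptions. `make_processor` is a hypothetical name, and the processing is a stand-in. In a real job, the returned function would have to execute the notebook itself (for example with a notebook-execution library such as papermill, which the thread does not discuss), so you would give up dagstermill's output-notebook handling.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]
Processor = Callable[[List[Row]], List[Row]]


def make_processor(name: str) -> Processor:
    """Hypothetical factory: returns a plain function, not an op definition."""

    def process(rows: List[Row]) -> List[Row]:
        # Stand-in for "run notebook <name>": tag each row with the processor used.
        return [{**row, "processed_by": name} for row in rows]

    return process


def process_df(rows: List[Row]) -> List[Row]:
    # The processor is chosen from the data at run time, as in the original process_df;
    # calling a plain function here is ordinary Python, unlike invoking an op definition.
    name = str(rows[0]["activity_short_name"])
    return make_processor(name)(rows)


result = process_df([{"activity_short_name": "quiz", "score": 7}])
```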
Thanks a lot for your help!

claire
04/07/2022, 1:32 AM

Dagster Bot
04/07/2022, 1:34 AM

claire
04/07/2022, 1:36 AM
`get_postgres_rows` would contain an `Out` that is passed as input to each possible dagstermill op. Then, `join_dataframes` would read from every dagstermill op.

Simon Szalai
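04/07/2022, 1:37 AM
```python
import pandas as pd
import dagstermill as dm
from dagster import InputDefinition, OutputDefinition
from dagster.utils import script_relative_path

# Build one dagstermill op per activity up front (activity_names is not shown in the post)
activity_processors = {}
for activity_name in activity_names:
    activity_processor = dm.define_dagstermill_op(
        activity_name,
        script_relative_path(f"../notebooks/{activity_name}.ipynb"),
        output_notebook_name=f"{activity_name}_output",
        input_defs=[InputDefinition(dagster_type=pd.DataFrame, name="input_df", description="")],
        output_defs=[OutputDefinition(dagster_type=pd.DataFrame, name="output_df", description="")],
    )
    activity_processors[activity_name] = activity_processor
```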
Then chain all of them together, like this:
```python
dfs = get_postgres_rows()
processed_dfs = []
# Assumes get_postgres_rows has one output per activity, in the same order as activity_processors
for i, activity_processor in enumerate(activity_processors.values()):
    processed_df, _ = activity_processor(dfs[i])
    processed_dfs.append(processed_df)
```
I used branching so that only the processors that are actually needed get executed.
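The approach reached here can be modelled in plain Python. All processors are defined up front, there is one output per activity, and branching ensures only the needed processors run before a single join. This is a sketch under assumptions. The activity names, `make_processor`, `split_by_activity` and `run_job` are hypothetical, and lists of dicts stand in for DataFrames. Raising on an activity that has no predefined processor is a design choice, not something taken from the thread.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]
Processor = Callable[[List[Row]], List[Row]]

ACTIVITY_NAMES = ["reading", "quiz", "video"]  # hypothetical activity names


def make_processor(name: str) -> Processor:
    def process(rows: List[Row]) -> List[Row]:
        # Stand-in for running notebook <name> on this activity's rows.
        return [{**row, "processed_by": name} for row in rows]

    return process


# Built once, before anything runs, like activity_processors in the thread.
ACTIVITY_PROCESSORS: Dict[str, Processor] = {
    name: make_processor(name) for name in ACTIVITY_NAMES
}


def split_by_activity(rows: List[Row]) -> Dict[str, List[Row]]:
    """Branching: produce an output only for activities that actually have rows."""
    outputs: Dict[str, List[Row]] = {}
    for row in rows:
        outputs.setdefault(str(row["activity_short_name"]), []).append(row)
    return outputs


def run_job(rows: List[Row]) -> List[Row]:
    outputs = split_by_activity(rows)
    unknown = set(outputs) - set(ACTIVITY_PROCESSORS)
    if unknown:
        # With a fixed set of outputs, an unexpected activity has nowhere to go.
        raise ValueError(f"no processor defined for: {sorted(unknown)}")
    processed = [
        processor(outputs[name])
        for name, processor in ACTIVITY_PROCESSORS.items()
        if name in outputs  # skipped branch: this processor never runs
    ]
    # join: concatenate whatever the executed branches produced
    return [row for part in processed for row in part]


merged = run_job([
    {"activity_short_name": "quiz", "score": 1},
    {"activity_short_name": "reading", "score": 2},
])
```

Building the registry through a factory function gives each processor its own `name`. A bare `lambda` defined inside a loop would late-bind to the loop variable and see only the last name.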
Thanks a lot for creating the ticket; it will certainly be useful!

claire
04/07/2022, 1:38 AM