# ask-community

Simon Szalai

04/06/2022, 7:51 PM
Hi Dagster People, I’d like to ask for advice about a solution that I am trying to implement using dagstermill. The goal is the following:
1. Load some rows from a Postgres database.
2. Group them based on a column (resulting in one pandas dataframe for each unique value, containing all the rows that have that particular value).
3. Have an op that loads a Jupyter notebook (whose filename corresponds to the value used for grouping in the previous step) and uses the code in it to process the dataframe.
4. Save the result in another database.
I got most of this working, I just have one problem. Here is the code I have so far:
```python
import dagstermill as dm
import pandas as pd

from dagster import (
    DynamicOutput,
    DynamicOutputDefinition,
    InputDefinition,
    OutputDefinition,
    job,
    op,
)
from dagster.utils import script_relative_path


# An op to load and group rows from the postgres db. It yields a dataframe
# for each unique value in the grouping column.
@op(
    name="get_postgres_rows",
    output_defs=[DynamicOutputDefinition(pd.DataFrame, name="dataframes")],
)
def get_postgres_rows(context):
    ...  # load rows from postgres and group them into `dfs`
    for i, df in enumerate(dfs):
        yield DynamicOutput(df, mapping_key=str(i), output_name="dataframes")


# Op factory to create an op based on the passed name. It loads the
# corresponding .ipynb file.
def run_notebook_factory(name):
    run_notebook_dm_op = dm.define_dagstermill_op(
        name,
        script_relative_path(f"../notebooks/{name}.ipynb"),
        output_notebook_name=f"{name}_output",
        input_defs=[InputDefinition(dagster_type=pd.DataFrame, name="input_df", description="")],
        output_defs=[OutputDefinition(dagster_type=pd.DataFrame, name="output_df", description="")],
    )
    return run_notebook_dm_op


# Op for processing a single dataframe
@op
def process_df(df):
    activity_short_name = df["activity_short_name"][0]
    run_notebook = run_notebook_factory(activity_short_name)
    processed_df, _ = run_notebook(df)  # ERROR COMES FROM HERE
    return processed_df


@op
def join_dataframes(dfs):
    return pd.concat(dfs, axis=0)


@job(
    resource_defs={
        "output_notebook_io_manager": dm.local_output_notebook_io_manager,
    }
)
def process_data_with_notebook_job():
    # Get dataframes (one for each activity) from the responses database as DynamicOutputs
    dfs = get_postgres_rows()

    # Process each dataframe using the notebook corresponding to its short_name
    processed_dfs = dfs.map(process_df)

    # Concatenate processed dataframes
    df_merged = join_dataframes(processed_dfs.collect())

    # Save results to the results database (save_results op defined elsewhere)
    save_results(df_merged)
```
I am using a factory pattern to create an op based on a value passed to it (this corresponds to the notebook’s filename), then call it directly in the `process_df` op. When doing this, I am getting the following error:
```
dagster.core.errors.DagsterInvalidInvocationError: Attemped to invoke op that was not constructed using the `@op` decorator. Only ops constructed using the `@op` decorator can be directly invoked.
```
I assume the reason for this error is that I directly call an op from another op, which is probably not allowed. I could probably replace `process_df` directly with `run_notebook` (the dagstermill op), but then I wouldn’t be able to use a different notebook based on different row values. Is there any way this problem can be solved? Or is there an alternative solution or workaround that would achieve similar results? Thanks a lot for your help!
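(Editor’s sketch, for context: the rule Simon is hitting is that only ops built with the `@op` decorator can be invoked directly inside another function body; ops returned by `dm.define_dagstermill_op` are constructed as op definitions directly, so they can only be composed inside a `@job`/`@graph` body. The op names below are illustrative.)
```python
import pandas as pd
from dagster import job, op

@op
def load_df():
    return pd.DataFrame({"activity_short_name": ["a"]})

@op
def process(df):
    return df

@job
def ok_job():
    # Supported: calls inside a @job/@graph body only build the dependency
    # graph; nothing executes until the job is run.
    process(load_df())

@op
def broken(df):
    # Direct invocation inside an op body works only for @op-decorated ops
    # (and runs the bare function). An op built by dm.define_dagstermill_op
    # raises DagsterInvalidInvocationError when called like this.
    return process(df)
```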

claire

04/07/2022, 1:32 AM
Hi Simon, this is a current limitation with the dagstermill op. It seems like what would be ideal here is to be able to provide config to the dagstermill op that supplies the notebook path, but this is currently not possible. I can file a feature request so we can take this on in the future.
@Dagster Bot issue define_dagstermill_op provide notebook path in config

Dagster Bot

04/07/2022, 1:34 AM

claire

04/07/2022, 1:36 AM
In the meantime, one possible workaround would be to create a job factory that outputs a `JobDefinition`. `get_postgres_rows` would contain an `Out` that would be passed as input to every possible dagstermill op. Then, `join_dataframes` would read from every dagstermill op.
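(Editor’s sketch of that suggestion, reusing the ops from the thread; it assumes `get_postgres_rows` is reworked to emit a single plain `Out` instead of dynamic outputs, and that `activity_names` is the list of notebook names:)
```python
import dagstermill as dm
import pandas as pd
from dagster import InputDefinition, JobDefinition, OutputDefinition, job
from dagster.utils import script_relative_path

def build_notebook_job(activity_names) -> JobDefinition:
    # One dagstermill op per notebook, built up front
    notebook_ops = [
        dm.define_dagstermill_op(
            name,
            script_relative_path(f"../notebooks/{name}.ipynb"),
            output_notebook_name=f"{name}_output",
            input_defs=[InputDefinition(dagster_type=pd.DataFrame, name="input_df")],
            output_defs=[OutputDefinition(dagster_type=pd.DataFrame, name="output_df")],
        )
        for name in activity_names
    ]

    @job(resource_defs={"output_notebook_io_manager": dm.local_output_notebook_io_manager})
    def notebook_job():
        df = get_postgres_rows()  # single Out feeding every notebook op
        processed = []
        for nb_op in notebook_ops:
            output_df, _ = nb_op(df)  # second output is the executed notebook
            processed.append(output_df)
        join_dataframes(processed)  # fan-in over all notebook outputs

    return notebook_job
```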

Simon Szalai

04/07/2022, 1:37 AM
Hey, thanks, that would be great. Actually, in the meantime I found a workaround. Basically, I pre-create dagstermill ops for each notebook (I got the list of files with `os.walk`), like this:
```python
# `activity_names` holds the notebook names discovered on disk (via os.walk)
activity_processors = {}
for activity_name in activity_names:
    activity_processor = dm.define_dagstermill_op(
        activity_name,
        script_relative_path(f"../notebooks/{activity_name}.ipynb"),
        output_notebook_name=f"{activity_name}_output",
        input_defs=[InputDefinition(dagster_type=pd.DataFrame, name="input_df", description="")],
        output_defs=[OutputDefinition(dagster_type=pd.DataFrame, name="output_df", description="")],
    )
    activity_processors[activity_name] = activity_processor
```
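(For reference, a minimal sketch of how that file list might be gathered; the `../notebooks` path is an assumption:)
```python
import os

# Collect notebook names (without the .ipynb extension) from ../notebooks
activity_names = [
    os.path.splitext(fname)[0]
    for _, _, files in os.walk("../notebooks")
    for fname in files
    if fname.endswith(".ipynb")
]
```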
Then chain all of them together, like this:
```python
# (inside the @job body)
dfs = get_postgres_rows()

processed_dfs = []
for i, activity_processor_key in enumerate(activity_processors):
    activity_processor = activity_processors[activity_processor_key]
    processed_df, _ = activity_processor(dfs[i])
    processed_dfs.append(processed_df)
```
I used branching to make sure that only the ops that are actually needed execute. Thanks a lot for creating the ticket, it will be useful for sure!
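(A sketch of the branching pattern Simon likely means, using conditional, non-required outputs; the names are illustrative, and it assumes each activity short name matches a declared output:)
```python
import pandas as pd
from dagster import Output, OutputDefinition, op

activity_names = ["foo", "bar"]  # e.g., discovered from ../notebooks

@op(
    output_defs=[
        OutputDefinition(pd.DataFrame, name=name, is_required=False)
        for name in activity_names
    ]
)
def route_df(df):
    # Yield only the output matching this dataframe's activity, so only the
    # dagstermill op wired to that output executes; other branches are skipped.
    yield Output(df, output_name=df["activity_short_name"][0])
```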

claire

04/07/2022, 1:38 AM
Got it, yep, hopefully we can add this feature soon!