
Anaqi Afendi

10/28/2021, 3:41 PM
How can I get the output of a notebook in dagster through dagstermill? I've followed the guide and API docs on the site but I keep running into this error:
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing solid "dbt_job":
...

The above exception was caused by the following exception:
KeyError: ('55fafb57-29ab-4d0e-a7b2-c6a30bcdf1db', 'dbt_job', 'dbt_job_status')
  File "C:\Users\aafendi\Anaconda3\envs\dagster-environment\lib\site-packages\dagster\core\execution\plan\utils.py", line 44, in solid_execution_error_boundary
    yield
  File "C:\Users\aafendi\Anaconda3\envs\dagster-environment\lib\site-packages\dagster\utils\__init__.py", line 383, in iterate_with_context
    next_output = next(iterator)
  File "C:\Users\aafendi\Anaconda3\envs\dagster-environment\lib\site-packages\dagstermill\solids.py", line 268, in _t_fn
    build_input_context(upstream_output=output_context)
  File "C:\Users\aafendi\Anaconda3\envs\dagster-environment\lib\site-packages\dagster\core\storage\mem_io_manager.py", line 14, in load_input
    return self.values[keys]

prha

10/28/2021, 4:35 PM
Hi Anaqi. Can you share your job definition? You may need to specify an IO manager that is not the in-memory IO manager (we have to pass inputs across a process boundary to a dagstermill op). We have the fs_io_manager in the dagster module that could probably work here.
from dagster import fs_io_manager, job

@job(resource_defs={'io_manager': fs_io_manager})
def my_job():
    ...

Anaqi Afendi

10/28/2021, 5:14 PM
dbt_job = dm.define_dagstermill_solid(
    name="dbt_job",
    notebook_path=script_relative_path("modules/dbt_job.ipynb"),
    input_defs=[InputDefinition("network_status_complete", 
                                bool, 
                                description="Toy variable"
                                )
                ],
    output_defs=[OutputDefinition(str, 
                                  name="dbt_job_status",
                                  description="The status of the dbt job run"
                                  )
                ],
    required_resource_keys={"fs"},
    description="A notebook that triggers a dbt_cloud job, then polls the run to check for its completion"
)

prha

10/28/2021, 6:05 PM
do you have an io manager specified in your pipeline’s resources? do you mind sharing your pipeline definition?

Anaqi Afendi

10/28/2021, 6:05 PM
@pipeline(mode_defs=[ModeDefinition(resource_defs={
    "fs": fs_io_manager, 
    "file_manager": local_file_manager, 
    "io": mem_io_manager})])
def test_pipeline():
    update_network_status = update_network_files()
    dbt_job_status = dbt_job(update_network_status)
    take_dagstermill_output(dbt_job_status)
I tried running the dagstermill solid with the "file_manager" resource key instead and got the same error

prha

10/28/2021, 6:20 PM
I see… can you add "io_manager": fs_io_manager to your resource defs?

Anaqi Afendi

10/28/2021, 6:21 PM
And should I use "io_manager" as the required resource for the dagstermill solid?
Okay, this worked. That's kind of weird that it had to be called "io_manager" in the key, since that name shouldn't matter unless dagstermill has no way to configure which resource key it uses for the fs_io_manager.

prha

10/28/2021, 7:02 PM
yeah, “io_manager” is a special-cased resource… there’s some doc on it here: https://docs.dagster.io/concepts/io-management/io-managers#io-managers

Anaqi Afendi

10/28/2021, 7:03 PM
Would it be better to migrate my code, if possible, from ipynb into function calls if that means it can be more flexible? I just really like being able to see the actual code that runs in each block within the DAG in dagster when it's in notebook format

prha

10/28/2021, 7:10 PM
It could be beneficial, yes, to split them out as individual functions. you could then make use of solid-level retries and other solid-based features
❤️ 1

Anaqi Afendi

10/28/2021, 7:11 PM
Awesome thanks!

prha

10/28/2021, 7:11 PM
I think one case where you would want to keep things in notebooks is if the notebook itself contains artifacts that are important to keep around (e.g. contains some plots, visualizations)
in that case, you probably want to use the file_manager to persist the evaluated notebook to some stored location

Anaqi Afendi

10/28/2021, 7:12 PM
unrelated, or kind of: Is there a way to specify dependencies for solids without using I/O? I have a bunch of solids dependent on one another to run in sequence, but they are all just triggers for dbt/databricks jobs and rn I'm passing dummy variables between them

prha

10/28/2021, 8:18 PM
no, the main way to specify dependencies is using I/O. But we do have this concept of a Nothing output which allows you to specify the dependency without actually passing any data between the solids: https://docs.dagster.io/concepts/solids-pipelines/pipelines#order-based-dependencies-nothing-dependencies
@Dagster Bot issue detect when an in-memory IO manager is used for jobs using dagstermill ops and raise a better error message

Dagster Bot

10/28/2021, 8:20 PM