#dagster-support

Yan

05/03/2022, 8:40 PM
Hi team, I have an issue with a generator-based resource (try/finally) when I run it using a step launcher. My code is:
from dagster import Array, Field, Int, Noneable, String, Dict, Nothing, Permissive, op, In, Out, graph, resource
from dagster.core.definitions.no_step_launcher import no_step_launcher
from dagster.core.execution.plan.external_step import local_external_step_launcher
import tempfile
import os

@resource
def my_resource(context):
   fd, path = tempfile.mkstemp(text=True, suffix=".txt")
   with os.fdopen(fd, "w") as f:
      f.write("Hello, world!")
   try:
      yield path
   finally:
      context.log.info("Deleting file...")
      os.remove(path)
      

@op(required_resource_keys={"my_launcher", "my_resource"})
def my_op(context):
   assert os.path.exists(context.resources.my_resource)
   with open(context.resources.my_resource) as f:
      context.log.info(f.read())

@graph
def my_graph():
    my_op()

my_job_1 = my_graph.to_job(name="my_job_1", resource_defs={"my_launcher": no_step_launcher, "my_resource": my_resource})
my_job_2 = my_graph.to_job(name="my_job_2", resource_defs={"my_launcher": local_external_step_launcher, "my_resource": my_resource}, 
                           config={"resources": {"my_launcher": {"config": {"scratch_dir": "/tmp/my_launcher"}}}})
Running my_job_1 works fine, but my_job_2 fails on the assert (the code in the finally block was executed before the op code). Is it a bug, or am I doing something wrong? Dagster version: 0.14.13. Thanks!

Zach

05/03/2022, 10:07 PM
I think the code from the finally block is being executed before the op code because resources get initialized prior to the op code being executed, and I'm guessing they're not handled as generators so initialization executes the finally block as well as the yield. You could try returning a proper context manager as your resource and that might work? Something like
from dagster import Array, Field, Int, Noneable, String, Dict, Nothing, Permissive, op, In, Out, graph, resource
from dagster.core.definitions.no_step_launcher import no_step_launcher
from dagster.core.execution.plan.external_step import local_external_step_launcher
import tempfile
import os

from contextlib import contextmanager

@resource
def my_resource(context):
    @contextmanager
    def file_handler():
        fd, path = tempfile.mkstemp(text=True, suffix=".txt")
        with os.fdopen(fd, "w") as f:
            f.write("Hello, world!")
        try:
            yield path
        finally:
            context.log.info("Deleting file...")
            os.remove(path)
    return file_handler


@op(required_resource_keys={"my_launcher", "my_resource"})
def my_op(context):
    with context.resources.my_resource() as file_path:
        assert os.path.exists(file_path)
        with open(file_path) as f:
            context.log.info(f.read())


@graph
def my_graph():
    my_op()


my_job_1 = my_graph.to_job(name="my_job_1", resource_defs={"my_launcher": no_step_launcher, "my_resource": my_resource})
my_job_2 = my_graph.to_job(name="my_job_2",
                           resource_defs={"my_launcher": local_external_step_launcher, "my_resource": my_resource},
                           config={"resources": {"my_launcher": {"config": {"scratch_dir": "/tmp/my_launcher"}}}})
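The timing Zach is guessing at can be reproduced without Dagster. The following is a minimal stdlib-only sketch, not Dagster's internal code: `file_resource`, `init_then_teardown_early`, and `init_and_hold` are hypothetical names, and the two callers only model "a framework that finishes the generator before the value is used" versus "a framework that keeps it suspended until the work is done".

```python
import os
import tempfile


def file_resource(log):
    """Generator-style resource: set up, yield the value, clean up in finally."""
    fd, path = tempfile.mkstemp(text=True, suffix=".txt")
    with os.fdopen(fd, "w") as f:
        f.write("Hello, world!")
    try:
        yield path
    finally:
        log.append("Deleting file...")
        os.remove(path)


def init_then_teardown_early(log):
    """Hypothetical caller that finishes the generator before the value is used."""
    gen = file_resource(log)
    path = next(gen)  # runs the setup code up to the yield
    gen.close()       # raises GeneratorExit at the yield, so finally runs now
    return path


def init_and_hold(log):
    """Hypothetical caller that keeps the generator suspended while the value is used."""
    gen = file_resource(log)
    path = next(gen)
    return path, gen


log = []

early_path = init_then_teardown_early(log)
early_exists = os.path.exists(early_path)  # the file is already gone here

held_path, held_gen = init_and_hold(log)
held_exists = os.path.exists(held_path)    # the file is still present here
held_gen.close()                           # explicit teardown after use
```

In the first case an `assert os.path.exists(path)` placed after initialization would fail, which matches the symptom reported for my_job_2. Whether this is what actually happens inside the step launcher is an assumption, not something the thread confirms.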

prha

05/03/2022, 10:47 PM
Yep, this sounds right. Here’s a doc section that you might find helpful: https://docs.dagster.io/concepts/resources#context-manager-resources

Yan

05/03/2022, 11:12 PM
@prha @Zach the code works fine, but I want to have a global resource (like a Spark session) that is not created and destroyed for every step when I run it with the in-process executor, like here: https://dagster.slack.com/archives/CCCR6P2UR/p1615220026408400?thread_ts=1615203338.399800&cid=CCCR6P2UR. I also want this to work with the external launcher.

prha

05/03/2022, 11:33 PM
Hi Yan. I’m pretty sure that even when using a contextmanager, we will only instantiate one resource instance while using the inprocess_executor. It will just make sure the finally block executes when all the steps complete.
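The difference between one shared instance and one instance per step can be modeled with the standard library alone. This is a hedged sketch, not Dagster code: `session_resource`, `step_one`, `step_two`, and the two runner functions are hypothetical, and they only illustrate the lifecycle prha describes (single setup, teardown after all steps) next to a per-step lifecycle.

```python
from contextlib import ExitStack, contextmanager

events = []


@contextmanager
def session_resource():
    """Stand-in for an expensive resource such as a Spark session."""
    events.append("create")
    try:
        yield {"name": "session"}
    finally:
        events.append("destroy")


def step_one(session):
    events.append("step_one")


def step_two(session):
    events.append("step_two")


def run_all_steps_in_one_scope(steps):
    """One resource instance; the finally block runs after the last step."""
    with ExitStack() as stack:
        session = stack.enter_context(session_resource())
        for step in steps:
            step(session)


def run_each_step_in_own_scope(steps):
    """A fresh resource instance per step, as when every step initializes its own resources."""
    for step in steps:
        with session_resource() as session:
            step(session)


run_all_steps_in_one_scope([step_one, step_two])
shared = list(events)

events.clear()
run_each_step_in_own_scope([step_one, step_two])
per_step = list(events)
```

Here `shared` records one create/destroy pair around both steps, while `per_step` records a pair around each step. Which of the two a given run resembles depends on whether the steps actually share a process.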

Yan

05/04/2022, 12:03 AM
But for me, this code:
from dagster import Array, Field, Int, Noneable, String, Dict, Nothing, Permissive, op, In, Out, graph, resource
from dagster.core.definitions.no_step_launcher import no_step_launcher
from dagster.core.execution.plan.external_step import local_external_step_launcher
import tempfile
import os
from contextlib import contextmanager

@resource
@contextmanager
def my_resource(context):
   fd, path = tempfile.mkstemp(text=True, suffix=".txt")
   with os.fdopen(fd, "w") as f:
      f.write("Hello, world!")
   try:
      context.log.info(f"Yielding file {path}...")
      yield path
   finally:
      context.log.info(f"Deleting file {path}...")
      os.remove(path)
      

@op(required_resource_keys={"my_launcher", "my_resource"})
def my_op(context):
    path = context.resources.my_resource
    assert os.path.exists(path)

@op(ins={"parent": In(Nothing)}, required_resource_keys={"my_launcher", "my_resource"})
def my_op_2(context):
    path = context.resources.my_resource
    assert os.path.exists(path)

@graph
def my_graph():
    my_op_2(my_op())

my_job_1 = my_graph.to_job(name="my_job_1", resource_defs={"my_launcher": no_step_launcher, "my_resource": my_resource})
my_job_2 = my_graph.to_job(name="my_job_2", resource_defs={"my_launcher": local_external_step_launcher, "my_resource": my_resource}, 
                           config={"resources": {"my_launcher": {"config": {"scratch_dir": "/tmp/my_launcher"}}}})
creates and removes the resource for every op if I run it in-process:
2022-05-04 03:01:56 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65171 - RUN_START - Started execution of run for "my_job_1".
2022-05-04 03:01:56 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65171 - ENGINE_EVENT - Executing steps using multiprocess executor: parent process (pid: 65171)
2022-05-04 03:01:56 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65171 - my_op - ENGINE_EVENT - Launching subprocess for my_op
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - ENGINE_EVENT - Starting initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:01:57 +0300 - dagster - INFO - resource:my_resource - 2a907531-954a-4e04-a60c-5cea65c75b0c - my_op - Yielding file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpzy4hk7d7.txt...
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - ENGINE_EVENT - Finished initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - LOGS_CAPTURED - Started capturing logs for step: my_op.
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - STEP_START - Started execution of step "my_op".
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - STEP_SUCCESS - Finished execution of step "my_op" in 14ms.
2022-05-04 03:01:57 +0300 - dagster - INFO - resource:my_resource - 2a907531-954a-4e04-a60c-5cea65c75b0c - my_op - Deleting file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpzy4hk7d7.txt...
2022-05-04 03:01:58 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65171 - my_op_2 - ENGINE_EVENT - Launching subprocess for my_op_2
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - ENGINE_EVENT - Starting initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:01:59 +0300 - dagster - INFO - resource:my_resource - 2a907531-954a-4e04-a60c-5cea65c75b0c - my_op_2 - Yielding file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpwf9w62zi.txt...
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - ENGINE_EVENT - Finished initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - LOGS_CAPTURED - Started capturing logs for step: my_op_2.
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - STEP_START - Started execution of step "my_op_2".
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - STEP_SUCCESS - Finished execution of step "my_op_2" in 14ms.
2022-05-04 03:01:59 +0300 - dagster - INFO - resource:my_resource - 2a907531-954a-4e04-a60c-5cea65c75b0c - my_op_2 - Deleting file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpwf9w62zi.txt...
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65171 - ENGINE_EVENT - Multiprocess executor: parent process exiting after 2.68s (pid: 65171)
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65171 - RUN_SUCCESS - Finished execution of run for "my_job_1".
and the assert still fails if I run it using the external step launcher:
2022-05-04 03:02:48 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65200 - RUN_START - Started execution of run for "my_job_2".
2022-05-04 03:02:48 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65200 - ENGINE_EVENT - Executing steps using multiprocess executor: parent process (pid: 65200)
2022-05-04 03:02:48 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65200 - my_op - ENGINE_EVENT - Launching subprocess for my_op
2022-05-04 03:02:49 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65205 - my_op - ENGINE_EVENT - Starting initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:02:49 +0300 - dagster - INFO - resource:my_resource - 731db262-3560-459f-853f-34b37bad0700 - my_op - Yielding file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpqflsk560.txt...
2022-05-04 03:02:49 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65205 - my_op - ENGINE_EVENT - Finished initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:02:49 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65205 - my_op - LOGS_CAPTURED - Started capturing logs for step: my_op.
2022-05-04 03:02:50 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65214 - my_op - ENGINE_EVENT - Starting initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:02:50 +0300 - dagster - INFO - resource:my_resource - 731db262-3560-459f-853f-34b37bad0700 - my_op - Yielding file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpff5hf02w.txt...
2022-05-04 03:02:50 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65214 - my_op - ENGINE_EVENT - Finished initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:02:50 +0300 - dagster - INFO - resource:my_resource - 731db262-3560-459f-853f-34b37bad0700 - my_op - Deleting file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpff5hf02w.txt...
2022-05-04 03:02:50 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65214 - my_op - STEP_START - Started execution of step "my_op".
2022-05-04 03:02:50 +0300 - dagster - ERROR - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65214 - my_op - STEP_FAILURE - Execution of step "my_op" failed.

dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "my_op"::

AssertionError

Stack Trace:
  File "/tmp/my_env/lib/python3.7/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "/tmp/my_env/lib/python3.7/site-packages/dagster/utils/__init__.py", line 405, in iterate_with_context
    next_output = next(iterator)
  File "/tmp/my_env/lib/python3.7/site-packages/dagster/core/execution/plan/compute_generator.py", line 65, in _coerce_solid_compute_fn_to_iterator
    result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
  File "test.py", line 25, in my_op
    assert os.path.exists(path)

2022-05-04 03:02:50 +0300 - dagster - ERROR - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - my_op_2 - Dependencies for step my_op_2 failed: ['my_op']. Not executing.
2022-05-04 03:02:50 +0300 - dagster - INFO - resource:my_resource - 731db262-3560-459f-853f-34b37bad0700 - my_op - Deleting file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpqflsk560.txt...
2022-05-04 03:02:50 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65200 - ENGINE_EVENT - Multiprocess executor: parent process exiting after 2.51s (pid: 65200)
2022-05-04 03:02:50 +0300 - dagster - ERROR - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65200 - RUN_FAILURE - Execution of run for "my_job_2" failed. Steps failed: ['my_op'].
Error: Pipeline run 731db262-3560-459f-853f-34b37bad0700 resulted in failure.
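Reading the my_job_2 log mechanically can help pin down the ordering. The sketch below copies five lines verbatim from that log and checks two things: which process ids initialized resources for my_op, and whether the "Deleting file" line was logged before STEP_START. The regular expression and the helper `first_index` are illustrative assumptions about this log layout, not a Dagster API, and the result describes only what was logged, not why.

```python
import re

# Five lines copied verbatim from the my_job_2 log above.
log_lines = [
    '2022-05-04 03:02:48 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65200 - ENGINE_EVENT - Executing steps using multiprocess executor: parent process (pid: 65200)',
    '2022-05-04 03:02:49 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65205 - my_op - ENGINE_EVENT - Starting initialization of resources [io_manager, my_launcher, my_resource].',
    '2022-05-04 03:02:50 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65214 - my_op - ENGINE_EVENT - Starting initialization of resources [io_manager, my_launcher, my_resource].',
    '2022-05-04 03:02:50 +0300 - dagster - INFO - resource:my_resource - 731db262-3560-459f-853f-34b37bad0700 - my_op - Deleting file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpff5hf02w.txt...',
    '2022-05-04 03:02:50 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65214 - my_op - STEP_START - Started execution of step "my_op".',
]

# Assumed layout: "... - <pid> - <step> - ENGINE_EVENT - Starting initialization of resources ..."
init_pattern = re.compile(
    r" - (\d+) - (\w+) - ENGINE_EVENT - Starting initialization of resources"
)

# Process ids that initialized resources for each step, in log order.
init_pids = {}
for line in log_lines:
    match = init_pattern.search(line)
    if match:
        pid, step = match.groups()
        init_pids.setdefault(step, []).append(int(pid))


def first_index(needle):
    """Index of the first log line containing needle."""
    return next(i for i, line in enumerate(log_lines) if needle in line)


uses_multiprocess = any("multiprocess executor" in line for line in log_lines)
deleted_before_start = first_index("Deleting file") < first_index("STEP_START")
```

On these lines, resources for my_op were initialized in two processes (65205 and 65214), the run reports the multiprocess executor, and the deletion of tmpff5hf02w.txt is logged before the step starts.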