#ask-community

Yan Golant

07/14/2022, 3:00 PM
Hi team, I asked a while ago, but I'm still struggling to make a contextmanager resource work with a step launcher. Can you help me fix the code below? Thanks a lot!
from dagster import Array, Field, Int, Noneable, String, Dict, Nothing, Permissive, op, In, Out, graph, resource
from dagster.core.definitions.no_step_launcher import no_step_launcher
from dagster.core.execution.plan.external_step import local_external_step_launcher
import tempfile
import os
from dagster import DagsterInstance, check, execute_pipeline, reconstructable
from contextlib import contextmanager
from dagster.cli.utils import get_instance_for_service

@resource
@contextmanager
def my_resource(context):
    fd, path = tempfile.mkstemp(text=True, suffix=".txt")
    with os.fdopen(fd, "w") as f:
        f.write("Hello, world!")
    try:
        context.log.info(f"Yielding file {path}...")
        yield path
    finally:
        context.log.info(f"Deleting file {path}...")
        os.remove(path)

@op(required_resource_keys={"my_launcher", "my_resource"})
def my_op(context):
    assert os.path.exists(context.resources.my_resource)
    with open(context.resources.my_resource) as f:
        context.log.info(f.read())

@graph
def my_graph():
    my_op()


def define_my_job():
    return my_graph.to_job(name="my_job",
                           config={"resources": {"my_launcher": {"config": {"scratch_dir": "/tmp/my_launcher"}}}},
                           resource_defs={"my_launcher": local_external_step_launcher, "my_resource": my_resource},
                           
)

if __name__ == "__main__":
    with get_instance_for_service("test") as instance:
        execute_pipeline(reconstructable(define_my_job), instance=instance)

yuhan

07/14/2022, 8:42 PM
what’s the error you’re hitting?

Yan Golant

07/14/2022, 8:55 PM
Hi @yuhan, my problem is that the resource cleanup happens before the op starts, so the assert in my_op fails (the file doesn't exist because it was already deleted in the finally block).
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "my_op"::

AssertionError

Stack Trace:
  File "/private/tmp/test123/lib/python3.7/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "/private/tmp/test123/lib/python3.7/site-packages/dagster/utils/__init__.py", line 406, in iterate_with_context
    next_output = next(iterator)
  File "/private/tmp/test123/lib/python3.7/site-packages/dagster/core/execution/plan/compute_generator.py", line 66, in _coerce_solid_compute_fn_to_iterator
    result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
  File "/tmp/test.py", line 25, in my_op
    assert os.path.exists(context.resources.my_resource)
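The symptom described above (the `finally` block running before the consumer reads the file) can be reproduced in plain Python, without Dagster. The sketch below is only an illustration of that ordering problem; it does not claim to show what the step launcher actually does internally. The helper names `temp_file_resource`, `init_then_teardown_early`, and `init_and_hold` are hypothetical and do not come from the thread or from Dagster.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def temp_file_resource():
    # Same shape as my_resource in the thread, minus the Dagster context and logging.
    fd, path = tempfile.mkstemp(text=True, suffix=".txt")
    with os.fdopen(fd, "w") as f:
        f.write("Hello, world!")
    try:
        yield path
    finally:
        os.remove(path)


def init_then_teardown_early():
    # Hypothetical caller: enters the resource, keeps only the yielded value,
    # and leaves the with-block before any consumer has run.
    with temp_file_resource() as path:
        pass
    return path


def init_and_hold(consumer):
    # Hypothetical caller: the consumer runs while the with-block is still open.
    with temp_file_resource() as path:
        return consumer(path)


stale_path = init_then_teardown_early()
exists_after_early_teardown = os.path.exists(stale_path)
exists_while_held = init_and_hold(os.path.exists)
print(exists_after_early_teardown, exists_while_held)
```

If whatever initializes the resource exits the context manager before handing the path to the op, the op sees the first case: a path string that no longer points at a file, which matches the failing `assert os.path.exists(...)` in the trace.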

yuhan

07/14/2022, 8:59 PM
cc @chris does this error ring any bells? iirc you were looking at the resource teardown a bit