Yan
05/03/2022, 8:40 PMfrom dagster import Array, Field, Int, Noneable, String, Dict, Nothing, Permissive, op, In, Out, graph, resource
from dagster.core.definitions.no_step_launcher import no_step_launcher
from dagster.core.execution.plan.external_step import local_external_step_launcher
import tempfile
import os
@resource
def my_resource(context):
    """Yield the path of a temporary .txt file containing "Hello, world!", then delete it.

    Code after ``yield`` runs as the resource's teardown. As reported in this
    thread (dagster 0.14.13), under ``my_job_2`` (``local_external_step_launcher``)
    the ``finally`` block ran before the op body, so the op's existence assert
    failed; ``my_job_1`` (``no_step_launcher``) worked.
    """
    fd, path = tempfile.mkstemp(text=True, suffix=".txt")
    # The write is inside the try so the temp file is removed even if
    # writing to it fails.
    try:
        with os.fdopen(fd, "w") as f:
            f.write("Hello, world!")
        yield path
    finally:
        context.log.info("Deleting file...")
        os.remove(path)
@op(required_resource_keys={"my_launcher", "my_resource"})
def my_op(context):
    """Assert the file provided by ``my_resource`` exists, then log its contents.

    ``my_launcher`` is required but not read here; presumably this is what
    makes the job's step launcher apply to this op (TODO confirm).
    """
    path = context.resources.my_resource
    assert os.path.exists(path)
    with open(path) as f:
        context.log.info(f.read())
@graph
def my_graph():
    """Graph with a single invocation of ``my_op``."""
    my_op()
# Both jobs share my_graph and my_resource; they differ only in the step
# launcher bound to "my_launcher" (and my_job_2's scratch_dir config for it).
my_job_1 = my_graph.to_job(
    name="my_job_1",
    resource_defs={
        "my_launcher": no_step_launcher,
        "my_resource": my_resource,
    },
)
my_job_2 = my_graph.to_job(
    name="my_job_2",
    resource_defs={
        "my_launcher": local_external_step_launcher,
        "my_resource": my_resource,
    },
    config={
        "resources": {
            "my_launcher": {"config": {"scratch_dir": "/tmp/my_launcher"}},
        },
    },
)
Running my_job_1 works fine, but my_job_2 fails on the assert (the code in the `finally` block is executed before the op code).
Is it a bug, or am I doing something wrong?
dagster version: 0.14.13
Thanks!
Zach
05/03/2022, 10:07 PMfrom dagster import Array, Field, Int, Noneable, String, Dict, Nothing, Permissive, op, In, Out, graph, resource
from dagster.core.definitions.no_step_launcher import no_step_launcher
from dagster.core.execution.plan.external_step import local_external_step_launcher
import tempfile
import os
from contextlib import contextmanager
@resource
def my_resource(context):
    """Return a factory of context managers, each backed by a fresh temp file.

    Calling ``context.resources.my_resource()`` returns a context manager that
    writes "Hello, world!" to a new temporary .txt file and yields its path.
    It deletes the file when the ``with`` block exits. File cleanup is
    therefore tied to the caller's ``with`` block, not to resource teardown.
    """

    @contextmanager
    def file_handler():
        fd, path = tempfile.mkstemp(text=True, suffix=".txt")
        # The write is inside the try so the temp file is removed even if
        # writing to it fails.
        try:
            with os.fdopen(fd, "w") as f:
                f.write("Hello, world!")
            yield path
        finally:
            context.log.info("Deleting file...")
            os.remove(path)

    return file_handler
@op(required_resource_keys={"my_launcher", "my_resource"})
def my_op(context):
    """Open a temp file via the ``my_resource`` factory, assert it exists, and log it.

    The file is deleted when the outer ``with`` block exits.
    """
    with context.resources.my_resource() as file_path:
        assert os.path.exists(file_path)
        with open(file_path) as f:
            context.log.info(f.read())
@graph
def my_graph():
    """Graph with a single invocation of ``my_op``."""
    my_op()
# Same graph and resource in both jobs; only the "my_launcher" step launcher
# differs (plus my_job_2's scratch_dir config for that launcher).
my_job_1 = my_graph.to_job(
    name="my_job_1",
    resource_defs={
        "my_launcher": no_step_launcher,
        "my_resource": my_resource,
    },
)
my_job_2 = my_graph.to_job(
    name="my_job_2",
    resource_defs={
        "my_launcher": local_external_step_launcher,
        "my_resource": my_resource,
    },
    config={
        "resources": {
            "my_launcher": {"config": {"scratch_dir": "/tmp/my_launcher"}},
        },
    },
)
prha
05/03/2022, 10:47 PM
[message not captured]
Yan
05/03/2022, 11:12 PM
[message not captured]
prha
05/03/2022, 11:33 PM
… `inprocess_executor`. It will just make sure the `finally` block executes when all the steps complete.
Yan
05/04/2022, 12:03 AMfrom dagster import Array, Field, Int, Noneable, String, Dict, Nothing, Permissive, op, In, Out, graph, resource
from dagster.core.definitions.no_step_launcher import no_step_launcher
from dagster.core.execution.plan.external_step import local_external_step_launcher
import tempfile
import os
from contextlib import contextmanager
@resource
@contextmanager
def my_resource(context):
    """Yield the path of a temporary .txt file containing "Hello, world!", then delete it.

    The function is wrapped by ``contextmanager`` before ``@resource``, so the
    ``finally`` block runs when the context manager exits.

    In the my_job_2 logs in this thread, the resource is initialized twice. The
    second copy, initialized inside the launched step process, is deleted
    before STEP_START, so the op's assert fails on that file.
    """
    fd, path = tempfile.mkstemp(text=True, suffix=".txt")
    # The write is inside the try so the temp file is removed even if
    # writing to it fails.
    try:
        with os.fdopen(fd, "w") as f:
            f.write("Hello, world!")
        context.log.info(f"Yielding file {path}...")
        yield path
    finally:
        context.log.info(f"Deleting file {path}...")
        os.remove(path)
@op(required_resource_keys={"my_launcher", "my_resource"})
def my_op(context):
    """Assert that the file whose path ``my_resource`` provides still exists."""
    path = context.resources.my_resource
    assert os.path.exists(path)
@op(ins={"parent": In(Nothing)}, required_resource_keys={"my_launcher", "my_resource"})
def my_op_2(context):
    """Assert that the file whose path ``my_resource`` provides still exists.

    The ``Nothing``-typed ``parent`` input carries no value; it only orders
    this op after whatever is wired into it (``my_op`` in ``my_graph``).
    """
    path = context.resources.my_resource
    assert os.path.exists(path)
@graph
def my_graph():
    """Run ``my_op``, then ``my_op_2`` ordered after it via a ``Nothing`` input."""
    my_op_2(my_op())
# my_job_1 binds no_step_launcher; my_job_2 binds local_external_step_launcher
# with /tmp/my_launcher as its scratch_dir. Everything else is shared.
my_job_1 = my_graph.to_job(
    name="my_job_1",
    resource_defs={
        "my_launcher": no_step_launcher,
        "my_resource": my_resource,
    },
)
my_job_2 = my_graph.to_job(
    name="my_job_2",
    resource_defs={
        "my_launcher": local_external_step_launcher,
        "my_resource": my_resource,
    },
    config={
        "resources": {
            "my_launcher": {"config": {"scratch_dir": "/tmp/my_launcher"}},
        },
    },
)
It creates/removes the resource for every op if I run it in-process (my_job_1):
2022-05-04 03:01:56 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65171 - RUN_START - Started execution of run for "my_job_1".
2022-05-04 03:01:56 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65171 - ENGINE_EVENT - Executing steps using multiprocess executor: parent process (pid: 65171)
2022-05-04 03:01:56 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65171 - my_op - ENGINE_EVENT - Launching subprocess for my_op
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - ENGINE_EVENT - Starting initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:01:57 +0300 - dagster - INFO - resource:my_resource - 2a907531-954a-4e04-a60c-5cea65c75b0c - my_op - Yielding file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpzy4hk7d7.txt...
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - ENGINE_EVENT - Finished initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - LOGS_CAPTURED - Started capturing logs for step: my_op.
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - STEP_START - Started execution of step "my_op".
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-05-04 03:01:57 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65176 - my_op - STEP_SUCCESS - Finished execution of step "my_op" in 14ms.
2022-05-04 03:01:57 +0300 - dagster - INFO - resource:my_resource - 2a907531-954a-4e04-a60c-5cea65c75b0c - my_op - Deleting file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpzy4hk7d7.txt...
2022-05-04 03:01:58 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65171 - my_op_2 - ENGINE_EVENT - Launching subprocess for my_op_2
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - ENGINE_EVENT - Starting initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:01:59 +0300 - dagster - INFO - resource:my_resource - 2a907531-954a-4e04-a60c-5cea65c75b0c - my_op_2 - Yielding file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpwf9w62zi.txt...
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - ENGINE_EVENT - Finished initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - LOGS_CAPTURED - Started capturing logs for step: my_op_2.
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - STEP_START - Started execution of step "my_op_2".
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65185 - my_op_2 - STEP_SUCCESS - Finished execution of step "my_op_2" in 14ms.
2022-05-04 03:01:59 +0300 - dagster - INFO - resource:my_resource - 2a907531-954a-4e04-a60c-5cea65c75b0c - my_op_2 - Deleting file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpwf9w62zi.txt...
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65171 - ENGINE_EVENT - Multiprocess executor: parent process exiting after 2.68s (pid: 65171)
2022-05-04 03:01:59 +0300 - dagster - DEBUG - my_job_1 - 2a907531-954a-4e04-a60c-5cea65c75b0c - 65171 - RUN_SUCCESS - Finished execution of run for "my_job_1".
And the assert still fails if I run it using the external step launcher (my_job_2):
2022-05-04 03:02:48 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65200 - RUN_START - Started execution of run for "my_job_2".
2022-05-04 03:02:48 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65200 - ENGINE_EVENT - Executing steps using multiprocess executor: parent process (pid: 65200)
2022-05-04 03:02:48 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65200 - my_op - ENGINE_EVENT - Launching subprocess for my_op
2022-05-04 03:02:49 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65205 - my_op - ENGINE_EVENT - Starting initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:02:49 +0300 - dagster - INFO - resource:my_resource - 731db262-3560-459f-853f-34b37bad0700 - my_op - Yielding file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpqflsk560.txt...
2022-05-04 03:02:49 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65205 - my_op - ENGINE_EVENT - Finished initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:02:49 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65205 - my_op - LOGS_CAPTURED - Started capturing logs for step: my_op.
2022-05-04 03:02:50 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65214 - my_op - ENGINE_EVENT - Starting initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:02:50 +0300 - dagster - INFO - resource:my_resource - 731db262-3560-459f-853f-34b37bad0700 - my_op - Yielding file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpff5hf02w.txt...
2022-05-04 03:02:50 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65214 - my_op - ENGINE_EVENT - Finished initialization of resources [io_manager, my_launcher, my_resource].
2022-05-04 03:02:50 +0300 - dagster - INFO - resource:my_resource - 731db262-3560-459f-853f-34b37bad0700 - my_op - Deleting file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpff5hf02w.txt...
2022-05-04 03:02:50 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65214 - my_op - STEP_START - Started execution of step "my_op".
2022-05-04 03:02:50 +0300 - dagster - ERROR - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65214 - my_op - STEP_FAILURE - Execution of step "my_op" failed.
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "my_op"::
AssertionError
Stack Trace:
File "/tmp/my_env/lib/python3.7/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
yield
File "/tmp/my_env/lib/python3.7/site-packages/dagster/utils/__init__.py", line 405, in iterate_with_context
next_output = next(iterator)
File "/tmp/my_env/lib/python3.7/site-packages/dagster/core/execution/plan/compute_generator.py", line 65, in _coerce_solid_compute_fn_to_iterator
result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
File "test.py", line 25, in my_op
assert os.path.exists(path)
2022-05-04 03:02:50 +0300 - dagster - ERROR - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - my_op_2 - Dependencies for step my_op_2 failed: ['my_op']. Not executing.
2022-05-04 03:02:50 +0300 - dagster - INFO - resource:my_resource - 731db262-3560-459f-853f-34b37bad0700 - my_op - Deleting file /var/folders/_6/d1krd57x1h55htxqv69ds6vh39cs2s/T/tmpqflsk560.txt...
2022-05-04 03:02:50 +0300 - dagster - DEBUG - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65200 - ENGINE_EVENT - Multiprocess executor: parent process exiting after 2.51s (pid: 65200)
2022-05-04 03:02:50 +0300 - dagster - ERROR - my_job_2 - 731db262-3560-459f-853f-34b37bad0700 - 65200 - RUN_FAILURE - Execution of run for "my_job_2" failed. Steps failed: ['my_op'].
Error: Pipeline run 731db262-3560-459f-853f-34b37bad0700 resulted in failure.