Yan Golant
07/14/2022, 3:00 PM
from dagster import Array, Field, Int, Noneable, String, Dict, Nothing, Permissive, op, In, Out, graph, resource
from dagster.core.definitions.no_step_launcher import no_step_launcher
from dagster.core.execution.plan.external_step import local_external_step_launcher
import tempfile
import os
from dagster import DagsterInstance, check, execute_pipeline, reconstructable
from contextlib import contextmanager
from dagster.cli.utils import get_instance_for_service
@resource
@contextmanager
def my_resource(context):
    """Provide the path of a temporary text file for the resource's lifetime.

    Creates a ``.txt`` temporary file containing ``Hello, world!``, yields
    its path, and removes the file once the consumer is done with it.

    Args:
        context: Resource initialization context; only ``context.log`` is
            used here.

    Yields:
        str: Filesystem path of the temporary file.
    """
    fd, path = tempfile.mkstemp(text=True, suffix=".txt")
    # mkstemp returns an already-open descriptor; wrap it so it gets closed.
    with os.fdopen(fd, "w") as f:
        f.write("Hello, world!")
    try:
        context.log.info(f"Yielding file {path}...")
        yield path
    finally:
        # Runs on teardown whether or not the consumer raised.
        context.log.info(f"Deleting file {path}...")
        os.remove(path)
@op(required_resource_keys={"my_launcher", "my_resource"})
def my_op(context):
    """Log the contents of the file whose path is the ``my_resource`` resource.

    Args:
        context: Op execution context; reads ``context.resources.my_resource``
            (a file path) and writes to ``context.log``.

    Raises:
        AssertionError: If the path provided by ``my_resource`` does not exist.
    """
    # Deliberate check: the file must still exist when the op body runs.
    assert os.path.exists(context.resources.my_resource)
    with open(context.resources.my_resource) as f:
        context.log.info(f.read())
@graph
def my_graph():
    """Graph made of a single ``my_op`` invocation."""
    my_op()
def define_my_job():
    """Build ``my_job`` from ``my_graph``.

    Binds ``local_external_step_launcher`` under the ``my_launcher`` key
    (configured with a ``/tmp/my_launcher`` scratch directory) and
    ``my_resource`` under the ``my_resource`` key.

    Returns:
        The job produced by ``my_graph.to_job``.
    """
    launcher_settings = {"config": {"scratch_dir": "/tmp/my_launcher"}}
    bound_resources = {
        "my_launcher": local_external_step_launcher,
        "my_resource": my_resource,
    }
    return my_graph.to_job(
        name="my_job",
        config={"resources": {"my_launcher": launcher_settings}},
        resource_defs=bound_resources,
    )
if __name__ == "__main__":
    # Run the job against the instance obtained for the "test" service; the
    # instance is only held for the duration of the execution.
    job_target = reconstructable(define_my_job)
    with get_instance_for_service("test") as instance:
        execute_pipeline(job_target, instance=instance)
yuhan
07/14/2022, 8:42 PM
Yan Golant
07/14/2022, 8:55 PM
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "my_op"::
AssertionError
Stack Trace:
File "/private/tmp/test123/lib/python3.7/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
yield
File "/private/tmp/test123/lib/python3.7/site-packages/dagster/utils/__init__.py", line 406, in iterate_with_context
next_output = next(iterator)
File "/private/tmp/test123/lib/python3.7/site-packages/dagster/core/execution/plan/compute_generator.py", line 66, in _coerce_solid_compute_fn_to_iterator
result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
File "/tmp/test.py", line 25, in my_op
assert os.path.exists(context.resources.my_resource)
yuhan
07/14/2022, 8:59 PM