Tom Dakin
04/17/2023, 9:04 AM
""" hello.py """
from dagster import op, job


@op(config_schema={"name": str})
def hello(context):
    name = context.op_config["name"]
    print(f"Hello {name}")
    return name


@job
def hello_job():
    hello()
And config that looks like this:
# config.yaml
ops:
  hello:
    config:
      name: Tom
    outputs:
      - result:
          json:
            path: output.json
When I run this using Dagster 1.0.9, I get the following result:
(.venv) ➜ scratch dagster job execute -f hello.py -a hello_job -c config.yaml
2023-04-17 09:56:31 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89861 - RUN_START - Started execution of run for "hello_job".
2023-04-17 09:56:31 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89861 - ENGINE_EVENT - Executing steps using multiprocess executor: parent process (pid: 89861)
2023-04-17 09:56:31 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89861 - hello - STEP_WORKER_STARTING - Launching subprocess for "hello".
2023-04-17 09:56:32 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89867 - STEP_WORKER_STARTED - Executing step "hello" in subprocess.
2023-04-17 09:56:32 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89867 - hello - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2023-04-17 09:56:33 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89867 - hello - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2023-04-17 09:56:33 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89867 - hello - LOGS_CAPTURED - Started capturing logs for step: hello.
2023-04-17 09:56:33 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89867 - hello - STEP_START - Started execution of step "hello".
Hello Tom
2023-04-17 09:56:33 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89867 - hello - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2023-04-17 09:56:33 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89867 - hello - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2023-04-17 09:56:33 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89867 - hello - ASSET_MATERIALIZATION - Materialized value output json.
2023-04-17 09:56:33 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89867 - hello - STEP_SUCCESS - Finished execution of step "hello" in 95ms.
2023-04-17 09:56:33 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89861 - ENGINE_EVENT - Multiprocess executor: parent process exiting after 2.02s (pid: 89861)
2023-04-17 09:56:33 +0100 - dagster - DEBUG - hello_job - fb086477-6074-427f-8016-28e1670dba04 - 89861 - RUN_SUCCESS - Finished execution of run for "hello_job".
Running with Dagster 1.2.2, it errors because of the outputs config block:
(.venv) ➜ scratch dagster job execute -f hello.py -a hello_job -c config.yaml
Traceback (most recent call last):
  File "/Users/tomdakin/Desktop/scratch/.venv/bin/dagster", line 8, in <module>
    sys.exit(main())
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/dagster/_cli/__init__.py", line 46, in main
    cli(auto_envvar_prefix=ENV_PREFIX) # pylint:disable=E1123
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/dagster/_cli/job.py", line 306, in job_execute_command
    execute_execute_command(instance, kwargs)
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/dagster/_core/telemetry.py", line 164, in wrap
    result = f(*args, **kwargs)
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/dagster/_cli/job.py", line 330, in execute_execute_command
    result = do_execute_command(pipeline, instance, config, mode, tags, solid_selection, preset)
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/dagster/_cli/job.py", line 403, in do_execute_command
    return execute_pipeline(
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/dagster/_core/execution/api.py", line 632, in execute_pipeline
    return _logged_execute_pipeline(
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/dagster/_core/telemetry.py", line 164, in wrap
    result = f(*args, **kwargs)
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/dagster/_core/execution/api.py", line 678, in _logged_execute_pipeline
    dagster_run = instance.create_run_for_pipeline(
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/dagster/_core/instance/__init__.py", line 982, in create_run_for_pipeline
    execution_plan = create_execution_plan(
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/dagster/_core/execution/api.py", line 958, in create_execution_plan
    resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=mode)
  File "/Users/tomdakin/Desktop/scratch/.venv/lib/python3.10/site-packages/dagster/_core/system_config/objects.py", line 167, in build
    raise DagsterInvalidConfigError(
dagster._core.errors.DagsterInvalidConfigError: Error in config for job
    Error 1: Received unexpected config entry "outputs" at path root:ops:hello. Expected: "{ config: { name: String } }".
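For context, the end result I relied on from the outputs block is simply the op's return value written to output.json. A rough plain-Python sketch of that behaviour is below. It is not Dagster code: the helper name write_output_json and the {"result": ...} layout are my own assumptions for illustration, and I have not checked the exact JSON shape Dagster 1.0.9 produced.

```python
import json
from pathlib import Path


def write_output_json(value, path="output.json"):
    """Hypothetical helper: dump an op's return value to a JSON file.

    The {"result": value} layout is an assumption for illustration,
    not necessarily what Dagster itself wrote.
    """
    target = Path(path)
    target.write_text(json.dumps({"result": value}))
    return target
```

Calling write_output_json(name) at the end of hello() would give me the file without any outputs config, but I would rather use whatever the supported replacement is.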
Is this functionality deprecated? If so, when did that happen and what is the recommended migration? I can’t seem to find a reference to it in the docs or changelog. Any advice would be gratefully received, thank you!