Peter Pezon
07/27/2022, 4:40 AM
I'm using `k8s_job_op` and wondering how I can dynamically map another op's output to the k8s job arguments. Example (crude pseudo-code):
@job
def submit_commands():
    """Pseudo-code: fan each gathered command out to its own Kubernetes job.

    This illustrates the question and is not working code: ``gather_commands``
    is not defined in this snippet, and ``command`` is a placeholder for the
    per-item value that ``map`` would need to supply to the job's ``args``.
    """
    commands = gather_commands()
    commands.map(
        k8s_job_op.configured(
            {
                "image": "busybox",
                # Plain ASCII "-c" with no trailing space: an en dash ("–c ")
                # is not an option the shell recognizes.
                "command": ["/bin/sh", "-c"],
                "args": [command],  # placeholder, see docstring
            }
        )
    )
daniel
07/27/2022, 3:19 PMPeter Pezon
07/27/2022, 3:54 PMdaniel
07/27/2022, 3:55 PMdaniel
07/27/2022, 3:56 PMPeter Pezon
07/27/2022, 4:02 PMdaniel
07/27/2022, 4:06 PMPeter Pezon
07/27/2022, 4:14 PMdaniel
07/27/2022, 4:28 PMdaniel
07/27/2022, 4:40 PM
from dagster_k8s import k8s_job_op
from dagster import DynamicOut, DynamicOutput, build_op_context, job, op
@op(out=DynamicOut())
def two_outs():
    """Emit two dynamic outputs, "hi" and "bye", each keyed by its own value."""
    for word in ("hi", "bye"):
        # The same string serves as both the output value and its mapping key.
        yield DynamicOutput(word, word)
@op
def my_wrapped_op(command):
    """Run ``command`` in a busybox container by invoking ``k8s_job_op`` directly.

    Args:
        command: Value passed as the single entry of the container's ``args``.

    NOTE(review): the context built here carries no run information. The
    traceback later in this thread shows ``k8s_job_op`` reading
    ``context.pipeline_run``, which raises ``DagsterInvalidPropertyError`` on a
    directly-invoked context -- confirm this wrapper works on the installed
    dagster version.
    """
    job_config = {
        "image": "busybox",
        # Plain ASCII "-c" with no trailing space: the previous "–c " (en dash
        # plus space) is not an option the shell recognizes.
        "command": ["/bin/sh", "-c"],
        "args": [command],
    }
    with build_op_context(config=job_config) as context:
        k8s_job_op(context)
@job
def submit_commands():
    """Map ``my_wrapped_op`` over every dynamic output produced by ``two_outs``."""
    two_outs().map(my_wrapped_op)
Peter Pezon
07/27/2022, 5:29 PMPeter Pezon
07/28/2022, 1:26 AM
dagster.core.errors.DagsterInvalidPropertyError: The pipeline_run property is not set on the context when a solid is directly invoked.
File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 224, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 357, in core_dagster_event_sequence_for_step
for user_event in check.generator(
File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 69, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/plan/compute.py", line 174, in execute_core_compute
for step_output in _yield_compute_results(step_context, inputs, compute_fn):
File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/plan/compute.py", line 142, in _yield_compute_results
for event in iterate_with_context(
File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/utils/__init__.py", line 406, in iterate_with_context
next_output = next(iterator)
File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/plan/compute_generator.py", line 73, in _coerce_solid_compute_fn_to_iterator
result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
File "/Users/pez/PycharmProjects/dagster-crawler-pipeline/pipeline/ops/crawlers.py", line 49, in crawler_k8s_job_op
return k8s_job_op(context)
File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/definitions/solid_definition.py", line 184, in __call__
return solid_invocation_result(self, context, *args[1:], **kwargs)
File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/definitions/solid_invocation.py", line 61, in solid_invocation_result
compute_fn.decorated_fn(context, **input_dict)
File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/utils/backcompat.py", line 234, in _inner
return callable_(*args, **kwargs)
File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster_k8s/ops/k8s_job_op.py", line 104, in k8s_job_op
context.pipeline_run,
File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/context/invocation.py", line 402, in pipeline_run
raise DagsterInvalidPropertyError(_property_msg("pipeline_run", "property"))
daniel
07/28/2022, 1:28 AMdaniel
07/28/2022, 1:29 AMPeter Pezon
07/28/2022, 1:40 AM
from dagster import build_op_context, configured, graph, op, DynamicOut, DynamicOutput
from dagster_k8s import k8s_job_op
@op(out={"cmd_a": DynamicOut()})
def prepare():
    """Emit two dynamic outputs on ``cmd_a``, keyed ``a_0`` and ``a_1``.

    Both outputs carry the same value, the string "command_a".
    """
    for index in range(2):
        yield DynamicOutput(
            "command_a",
            output_name="cmd_a",
            mapping_key=f"a_{index}",
        )
@op
def cmd_k8s_job_op(context, command: str):
    """Invoke ``k8s_job_op`` directly with a hard-coded busybox configuration.

    Args:
        context: The op's execution context; only its ``instance`` is forwarded
            to the nested context.
        command: Value appended to the container ``args`` after ``"--cmd"``.

    Returns:
        Whatever the direct ``k8s_job_op`` invocation returns.

    NOTE(review): only ``instance`` is forwarded to the nested context. The
    traceback earlier in this thread shows ``k8s_job_op`` also reading
    ``context.pipeline_run`` -- confirm that property is available on a context
    built this way.
    """
    job_config = {
        "image": "busybox",
        # Plain ASCII "-c" with no trailing space: the previous "–c " (en dash
        # plus space) is not an option the shell recognizes.
        "command": ["/bin/sh", "-c"],
        "args": ["echo cli", "--cmd", command],
    }
    with build_op_context(
        config=job_config,
        instance=context.instance,
    ) as op_context:
        return k8s_job_op(op_context)
@configured(cmd_k8s_job_op)
def cmd_a(_init_context):
    """Return the preset config for the ``cmd_a`` variant of ``cmd_k8s_job_op``.

    NOTE(review): ``cmd_k8s_job_op`` as written in this thread builds its own
    hard-coded config and never reads its op config, so these values may have
    no effect -- confirm.
    """
    preset = {
        "image": "cmd_a",
        "command": ["cmd_a"],
    }
    return preset
@graph
def cmd_graph():
    """Map the ``cmd_a`` op over the ``cmd_a`` dynamic output of ``prepare``."""
    prepare().cmd_a.map(cmd_a)
def make_cmd_job():
    """Build a job named "cmd" from ``cmd_graph`` and return it."""
    cmd_job = cmd_graph.to_job(name="cmd")
    return cmd_job
daniel
07/28/2022, 2:18 AMPeter Pezon
07/28/2022, 2:18 AM