I'm trying out the `k8s_job_op` and wondering how ...
# ask-community
p
I'm trying out the `k8s_job_op` and wondering how I can dynamically map another op's output to k8s job arguments. Example crude pseudo-code:
@job
def submit_commands():
  commands = gather_commands()
  commands.map(k8s_job_op.configured({
    "image": "busybox",
    "command": ["/bin/sh", "-c"],
    "args": [command],  # <- meant to be each mapped value from gather_commands()
  }))
d
Hey Peter - I think for this to work there would need to be a version of the k8s_job_op that took the args in as an input to the op function, rather than as config on the op - inputs can be set during job execution, but config is set before the run happens
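Roughly the distinction (just a sketch with made-up op names - the actual k8s call is stubbed out with a log line):

from typing import List

from dagster import op


# Config-based: values are baked in before the run launches, so they
# can't come from another op's output.
@op(config_schema={"args": [str]})
def command_via_config(context):
    context.log.info(f"would launch k8s job with args: {context.op_config['args']}")


# Input-based: the value arrives at execution time, so an upstream op
# (or a dynamic .map) can feed it in.
@op
def command_via_input(context, args: List[str]):
    context.log.info(f"would launch k8s job with args: {args}")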
p
Right. Any guidance on how I can achieve this? My approach will either be to (1) construct the DAG based on config files, injecting op config to overwrite command/args, or (2) copy and paste the contents of k8s_job_op so it accepts command/args as parameters. I'd like to attempt (2) by re-using the existing k8s_job_op, but that doesn't seem immediately possible from my vantage point.
d
My bias would be (1) unless you think that this configuration truly needs to change at runtime without reloading the code
I would need to understand your use case a bit more, but there are definitely some benefits to doing this configuration at job generation time (for example, the structure will be visible in Dagit)
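Something along these lines, for example - just a sketch, assuming the commands live in a commands.yaml-style file (the file name and shape are made up):

import yaml

from dagster import job
from dagster_k8s import k8s_job_op


def make_commands_job(config_path="commands.yaml"):
    # Read the externally managed list of commands at job construction time.
    with open(config_path) as f:
        commands = yaml.safe_load(f)["commands"]

    # One configured copy of k8s_job_op per command; the structure shows up in Dagit.
    configured_ops = [
        k8s_job_op.configured(
            {
                "image": "busybox",
                "command": ["/bin/sh", "-c"],
                "args": [command],
            },
            name=f"run_command_{i}",
        )
        for i, command in enumerate(commands)
    ]

    @job
    def submit_commands():
        for configured_op in configured_ops:
            configured_op()

    return submit_commands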
p
The config files in this case are essentially a list of commands. They're defined in an external system and may be updated more frequently than the pipeline code. I'd prefer to reduce the number of dependencies (e.g., refreshing the repository whenever the config changes).
d
I think you may need to fork the op in order to support (2), yeah - I'd have to think about whether this use case is likely to be common enough to be worth including in the 'blessed' op
p
Sure. The use case here is sending parameters to external applications running in a container (vs. inline Python code); the parameters are generated upstream in the DAG. If this were inline code, it would be pretty straightforward to pass an op argument. It's similar to the dynamic mapping example pattern of walking through a directory and processing files; in this case, processing a file means sending a command to a CLI running on k8s to act on that file, and the command args need knowledge of that file.
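For reference, the shape I mean - a rough sketch, with the directory path and CLI invocation made up:

import os

from dagster import DynamicOut, DynamicOutput, job, op


@op(out=DynamicOut())
def list_files():
    # Fan out one dynamic output per file to process (directory is made up).
    for i, name in enumerate(os.listdir("/data/incoming")):
        yield DynamicOutput(name, mapping_key=f"file_{i}")


@op
def process_file(context, name: str):
    # In my case this would be the CLI invocation on k8s that needs the file name.
    context.log.info(f"would run: crawler-cli --file {name}")


@job
def process_all_files():
    list_files().map(process_file)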
d
Oh, there may be a solution that allows you to wrap the op, one sec
Since you can invoke ops as functions, I think something like this would work to do the wrapping within another op without needing to rewrite everything - let me just check with the rest of the team that there aren't any gotchas there though:
from dagster_k8s import k8s_job_op

from dagster import DynamicOut, DynamicOutput, build_op_context, job, op


@op(out=DynamicOut())
def two_outs():
    yield DynamicOutput("hi", "hi")
    yield DynamicOutput("bye", "bye")


@op
def my_wrapped_op(command):
    with build_op_context(
        config={
            "image": "busybox",
            "command": ["/bin/sh", "–c "],
            "args": [command],
        }
    ) as context:
        k8s_job_op(context)


@job
def submit_commands():
    commands = two_outs()
    commands.map(my_wrapped_op)
p
Thanks @daniel, I'll give this a shot. I tried something similar (but without wrapping k8s_job_op in build_op_context) and that attempt resulted in an error. I'll try this.
👍 1
Update: this doesn't quite work. Returns an error:
dagster.core.errors.DagsterInvalidPropertyError: The pipeline_run property is not set on the context when a solid is directly invoked.
  File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 224, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 357, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 69, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/plan/compute.py", line 174, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/plan/compute.py", line 142, in _yield_compute_results
    for event in iterate_with_context(
  File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/utils/__init__.py", line 406, in iterate_with_context
    next_output = next(iterator)
  File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/plan/compute_generator.py", line 73, in _coerce_solid_compute_fn_to_iterator
    result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
  File "/Users/pez/PycharmProjects/dagster-crawler-pipeline/pipeline/ops/crawlers.py", line 49, in crawler_k8s_job_op
    return k8s_job_op(context)
  File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/definitions/solid_definition.py", line 184, in __call__
    return solid_invocation_result(self, context, *args[1:], **kwargs)
  File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/definitions/solid_invocation.py", line 61, in solid_invocation_result
    compute_fn.decorated_fn(context, **input_dict)
  File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/utils/backcompat.py", line 234, in _inner
    return callable_(*args, **kwargs)
  File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster_k8s/ops/k8s_job_op.py", line 104, in k8s_job_op
    context.pipeline_run,
  File "/Users/pez/miniforge3/envs/dagster-crawler-pipeline/lib/python3.9/site-packages/dagster/core/execution/context/invocation.py", line 402, in pipeline_run
    raise DagsterInvalidPropertyError(_property_msg("pipeline_run", "property"))
d
Could you post the code that you ran so that I can try to reproduce?
Oh wait, I have an idea, one sec
p
Basically:
from dagster import build_op_context, configured, graph, op, DynamicOut, DynamicOutput
from dagster_k8s import k8s_job_op


@op(out={"cmd_a": DynamicOut()})
def prepare():
    yield DynamicOutput("command_a", output_name="cmd_a", mapping_key=f"a_0")
    yield DynamicOutput("command_a", output_name="cmd_a", mapping_key=f"a_1")



@op
def cmd_k8s_job_op(context, command: str):
    with build_op_context(
        config={
            "image": "busybox",
            "command": ["/bin/sh", "–c "],
            "args": ["echo cli", "--cmd", command],
        },
        instance=context.instance,
    ) as op_context:
        return k8s_job_op(op_context)


@configured(cmd_k8s_job_op)
def cmd_a(_init_context):
    return {
        "image": "cmd_a",
        "command": ["cmd_a"],
    }


@graph
def cmd_graph():
    commands = prepare()
    commands.cmd_a.map(cmd_a)


def make_cmd_job():
    return cmd_graph.to_job(name="cmd")
d
OK, apologies - there are some subtleties with directly calling ops as functions that I missed when I originally recommended that. I'll check with the team but the best bet for now may be to use a fork of k8s_job_op 😕
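Sketch of what the fork would boil down to - the command arrives as an op input instead of config. This uses dagster_k8s's execute_k8s_job helper, which newer releases expose; on older versions the equivalent body would be copied out of k8s_job_op:

from dagster import op
from dagster_k8s import execute_k8s_job


@op
def k8s_command_op(context, command: str):
    # Same behavior as k8s_job_op, but the command comes in as an input,
    # so a dynamic .map from an upstream op can feed it.
    execute_k8s_job(
        context,
        image="busybox",
        command=["/bin/sh", "-c"],
        args=[command],
    )

and then in the job it would just be commands.map(k8s_command_op).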
p
sure, thanks.