# ask-community
m
Hello team, I’m looking for some advice on how to structure my ops/op factory methods to support user inputs. At a high level, I’m trying to have a Dagster job that runs a k8s job op, with a config schema so that the user can pass in additional arguments to run the k8s job with (essentially, parametrizing it). I’m running into this error when trying to pass a dict output from an op into my factory function:
UserWarning: Error loading repository location ml_training_repos.py:AttributeError: 'InvokedSolidOutputHandle' object has no attribute 'items'
Here is a code snippet of how I’m structuring things.
from dagster import job, op
from dagster_k8s import k8s_job_op

def training_op_wrapper(run_args: dict):
    # do something that involves unpacking/iterating over `run_args`
    arg_tokens = [f"{key}={value}" for key, value in run_args.items()]

    config = {...}
    return k8s_job_op.configured(
        config,
        name="run_training_op",
    )


@op(
    config_schema={"additional_args": dict}
)
def collect_args(context) -> dict:
    return context.op_config["additional_args"]


@job
def train_triage_data():
    args = collect_args()
    training_op = training_op_wrapper(collect_args())
    training_op()
❤️ 4
c
mind posting a snippet of how the `training_op_wrapper` is being used?
m
@chris sorry just updated my snippet, had a typo
c
ah ok so this is a misunderstanding regarding our graph dsl: when you call an op within the body of a job like `train_triage_data`, it's just letting dagster know the dependency information for that op, not actually running it yet. So when you call `training_op_wrapper` with the result from `collect_args`, what's being passed in is an internal dagster object that carries dependency information (the `InvokedSolidOutputHandle` in your error), not a dict.
As far as your use case goes: you want to take in one config dictionary, manipulate it, and then pass that config to `k8s_job_op`, if I understand correctly? If so, the `configured` decorator would be a good option - it essentially allows you to specify a config schema and, within the body of that function, manipulate the passed-in config before handing it to a dagster entity.
m
Thank you! I will take a look at this and let you know if that worked.
Ok actually my structure looks more like this. Do you have pointers on what I should be passing into the `configured` decorator? Do you mean I should have one more (configured) op that references the output of `get_train_data_op` (the `k8s_job_op`)? Or do I still need `collect_args`?
from dagster_k8s import k8s_job_op
from dagster import op, job

def generic_k8s_runner(name: str, cmd: list[str]):
    """Factory method for creating a generic k8s job op"""
    config = {...}
    return k8s_job_op.configured(
        config,
        name=name,
    )

def get_train_data_op(**kwargs):
    """Factory method for creating an op that trains data in a k8s job"""
    cmd = [f"{key}={value}" for key, value in kwargs.items()]
    return generic_k8s_runner(name="run_training_op", cmd=cmd)


@op(
    config_schema={"additional_args": dict}
)
def collect_args(context) -> dict:
    return context.op_config["additional_args"]


@job
def train_triage_data():
    """Job that trains Triage data. Can be run with params specified by `collect_args`"""
    args = collect_args()
    training_op = get_train_data_op(**collect_args())
    training_op()
c
You only need the `configured` decorator if you want to be able to provide config and change what happens at runtime; if you just want to be able to create a k8s job op parameterized by different things, then what you have should be fine.
If I understand correctly, I don't think you need `collect_args` here. Inside the body of `generic_k8s_runner`, you just need to translate the provided `cmd` into the `config` that you'll be providing to the `k8s_job_op`.
m
Hmm, I’m using the `collect_args` here to provide a runtime config for running `train_triage_data`. I’d want the caller to be able to pass things in when running.
The contents of `cmd` would be coming from user values provided at runtime. Am I thinking about this correctly?
c
ah ok i see the edit you made. It seems like you essentially just want to pass, at runtime, the command that `k8s_job_op` runs. You don't even need `configured` for this; you can just provide the `cmd` as part of the run config when running the job. So your job could literally just be:
@job
def train_triage_data():
    k8s_job_op()
And then when providing run config (using execute_in_process as an example here)
train_triage_data.execute_in_process(run_config={"ops": {"k8s_job_op": {"config": {"command": ["the", "command"]}}}})
Does that make sense / fit your use case?
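As a plain-python sketch of building that nested run config from user-supplied args (`build_run_config` here is just a hypothetical helper, not a dagster API):

```python
def build_run_config(additional_args: dict) -> dict:
    # Hypothetical helper: turn user-supplied key/value args into the nested
    # run_config shape shown above, feeding k8s_job_op's "command" field.
    command = [f"{key}={value}" for key, value in additional_args.items()]
    return {"ops": {"k8s_job_op": {"config": {"command": command}}}}
```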
m
I see, this makes more sense. I want the op I’m invoking in `train_triage_data` to be partially configured at runtime and partially defined in the code. I was going to achieve this by having the factory method return the `k8s_job_op`. I think having the factory method is tripping me up here. I don’t want to pass all of the configs at runtime, just some of them.
c
I see - so `@configured` would actually work nicely for you then: specify the parts of the config you want at runtime as config schema in the decorator, and hardcode the pieces you want in the body.
for example, let's say my op has two config fields, and I want to define one at runtime:
from dagster import op

@op(config_schema={"a": str, "b": str})
def the_op():
    ...
I'd use the
@configured
decorator like so:
from dagster import configured

@configured(the_op, config_schema=str)
def the_config_mapping(outer):
    return {"a": "hardcoded", "b": outer}
Does that make sense?
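In plain-python terms (dagster aside), the config mapping is just a function that merges the hardcoded pieces with the runtime-supplied value; a minimal sketch of what the op ends up seeing:

```python
def resolve_config(outer: str) -> dict:
    # Mirrors the config mapping above: "a" is fixed in code,
    # "b" is whatever the caller supplies in run config at launch time.
    return {"a": "hardcoded", "b": outer}

# At launch, only the outer value is provided; the op still sees both fields.
resolved = resolve_config("runtime-value")
```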