Madeline Wu
08/19/2022, 6:48 PM
UserWarning: Error loading repository location ml_training_repos.py:AttributeError: 'InvokedSolidOutputHandle' object has no attribute 'items'
Here is a code snippet of how I’m structuring things.
from dagster import job, op
from dagster_k8s import k8s_job_op

def training_op_wrapper(run_args: dict):
    # do something that involves unpacking/iterating over `run_args`
    arg_tokens = [f"{key}={value}" for key, value in run_args.items()]
    config = {...}
    return k8s_job_op.configured(
        config,
        name="run_training_op",
    )

@op(
    config_schema={"additional_args": dict}
)
def collect_args(context) -> dict:
    return context.op_config["additional_args"]

@job
def train_triage_data():
    args = collect_args()
    training_op = training_op_wrapper(collect_args())
    training_op()
chris
08/19/2022, 8:58 PM
[...] `training_op_wrapper` is being used?
Madeline Wu
08/19/2022, 9:39 PM
chris
08/19/2022, 9:46 PM
[...] `train_triage_data`, when you call an op within the body of a job, it's just letting dagster know the dependency information for that op, not actually running it yet. So when you call `training_op_wrapper` with the result from `collect_args`, what's being passed in is some internal dagster thing that helps pass dependency information.
[...] `k8s_job_op`, if I understand correctly? If so, the `configured` decorator would be a good option - it essentially allows you to specify a config schema and, within the body of that function, manipulate the passed-in config and pass it to a dagster entity.
Madeline Wu
08/19/2022, 9:50 PM
[...] `configured` decorator? Do you mean I should have one more op (configured) that references the output of `get_train_data_op` (the `k8s_job_op`)? Or do I still need `collect_args`?
from dagster_k8s import k8s_job_op
from dagster import op, job

def generic_k8s_runner(name: str, cmd: list[str]):
    """Factory method for creating a generic k8s job op"""
    config = {...}
    return k8s_job_op.configured(
        config,
        name=name,
    )

def get_train_data_op(**kwargs):
    """Factory method for creating an op that trains data in a k8s job"""
    cmd = [f"{key}={value}" for key, value in kwargs.items()]
    return generic_k8s_runner(name="run_training_op", cmd=cmd)

@op(
    config_schema={"additional_args": dict}
)
def collect_args(context) -> dict:
    return context.op_config["additional_args"]

@job
def train_triage_data():
    """Job that trains Triage data. Can be run with params specified by `collect_args`"""
    args = collect_args()
    training_op = get_train_data_op(**collect_args())
    training_op()
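The point made earlier in the thread, that calling an op inside a job body only records dependency information and returns an internal placeholder, can be illustrated with a toy model. This is a plain-Python sketch and not Dagster's implementation: `OutputHandle`, `ToyOp`, and `toy_op` are hypothetical names invented for illustration, standing in for whatever Dagster uses internally.

```python
class OutputHandle:
    """Hypothetical stand-in for the placeholder a job body receives."""

    def __init__(self, op_name: str):
        self.op_name = op_name


class ToyOp:
    """Hypothetical op wrapper: calling it does NOT run the wrapped function."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self):
        # Definition time: hand back a placeholder that only names the op.
        return OutputHandle(self.fn.__name__)


def toy_op(fn):
    return ToyOp(fn)


@toy_op
def collect_args():
    return {"epochs": 3}


# Inside a "job body" the call yields a handle, not the dict the op returns.
handle = collect_args()

try:
    handle.items()  # same shape of mistake as run_args.items() in the thread
    error = None
except AttributeError as exc:
    error = str(exc)

print(error)
```

Under this model, anything that needs the real dictionary (iterating over it, or unpacking it with `**`) has to happen inside an op or a config mapping function, where actual values are available, rather than in the job body.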
chris
08/19/2022, 10:33 PM
[...] `configured` decorator if you want to be able to provide config and change what happens at runtime; if you just want to be able to create a k8s job op parameterized by different things, then what you have should be fine.
[...] `collect_args` here. Inside the body of `generic_k8s_runner`, you just need to translate the provided `cmd` into the `config` that you'll be providing to the `k8s_job_op`.
Madeline Wu
08/19/2022, 11:36 PM
[...] `collect_args` here to provide a runtime config for running `train_triage_data`. I’d want the caller to be able to pass things in when running.
[...] `cmd` would be coming from user values provided at runtime. Am I thinking about this correctly?
chris
08/20/2022, 12:09 AM
[...] `k8s_job_op` is running, at runtime. You don't need to even use configured for this; you can just, at runtime, provide the `cmd` as part of the run config when running the job. So your job could literally just be:
@job
def train_triage_data():
    k8s_job_op()
And then when providing run config (using `execute_in_process` as an example here):
train_triage_data.execute_in_process(
    run_config={"ops": {"k8s_job_op": {"config": {"command": ["the", "command"]}}}}
)
Does that make sense / fit your use case?
Madeline Wu
08/20/2022, 12:36 AM
[...] `k8s_job_op`. I think having the factory method is tripping me up here. I don’t want to pass all of the configs at runtime, just some of them.
chris
08/20/2022, 12:40 AM
[...] `@configured` would actually work nicely for you then - specify the parts of the config you want at runtime as config schema in the decorator, and then hardcode the pieces you want
@op(config_schema={"a": str, "b": str})
def the_op():
    ...
I'd use the `@configured` decorator like so:
@configured(the_op, config_schema=str)
def the_config_mapping(outer):
    return {"a": "hardcoded", "b": outer}
Does that make sense?
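Applied to the training use case from earlier in the thread, the mapping function might look roughly like the sketch below. It is written as plain Python so it runs without Dagster installed. The image name and command are hypothetical placeholders, and the `image`, `command`, and `args` keys are an assumption about the `k8s_job_op` config schema that should be checked against the installed `dagster_k8s` version.

```python
# Pieces fixed by the job author. Both values are hypothetical placeholders,
# not values taken from the thread.
HARDCODED = {
    "image": "example.com/ml/trainer:latest",
    "command": ["python", "train.py"],
}


def training_config_mapping(outer: dict) -> dict:
    """Expand the small runtime config into the full config for the wrapped op."""
    # Here `outer` is a real dict, so iterating over it is safe.
    args = [f"{key}={value}" for key, value in outer["additional_args"].items()]
    return {**HARDCODED, "args": args}


# What a caller would supply at launch time under this sketch.
runtime_config = {"additional_args": {"epochs": 3, "lr": 0.01}}
full_config = training_config_mapping(runtime_config)
print(full_config)
```

Following the pattern above, this function would be the one decorated with `@configured(k8s_job_op, config_schema={"additional_args": dict})`, so callers only provide `additional_args` in run config while the image and command stay hardcoded.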