Brian Pohl

11/14/2022, 7:25 PM
Can you configure a RetryPolicy for a k8s_job_op? I declare my op like this:
from dagster_k8s import k8s_job_op

def function_to_create_k8s_op( <a lot of variables> ):
    # Build a pre-configured copy of k8s_job_op; .configured() bakes the
    # given values into the op's config.
    op = k8s_job_op.configured(
        config_or_config_fn={
            'image': f"{image_url}:{tag}",
            'service_account_name': env['SERVICE_ACCOUNT'],
            'command': ['java'],
            'args': args,
            'resources': {
                'requests': {
                    'cpu': cpu,
                    'memory': memory,
                },
            },
            'image_pull_policy': 'Always',
            'env_vars': all_env_vars,
        },
        name=name,
    )
    return op
I tried op.retry_policy = RetryPolicy(...), but that attribute is read-only. My understanding is that k8s_job_op.configured(config_schema=...) should only be used to set the config schema of the op, which is already defined here. I have also tried setting other attributes by passing them into job_spec_config, but I found that those were being ignored, and the only way to get attributes set was to pass them into config_or_config_fn (which seems like a bug, but since I'm on an older version - 1.0.2 - I'm not focusing on that). So is there any opening to insert a retry policy? Or would the best option be to upgrade to the newest version and then use job_spec_config? If so, I could also use a pointer to the correct Kubernetes spec argument for retries, because on my last pass through the docs I didn't see one.
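One workaround that doesn't touch the read-only attribute is to attach the policy when the op is invoked inside a job. A minimal sketch, assuming the op built by the factory above is bound to configured_op and that your Dagster version supports with_retry_policy at invocation time:

from dagster import RetryPolicy, job

# Hypothetical: the pre-configured op produced by the factory above.
configured_op = function_to_create_k8s_op(...)

@job
def my_job():
    # Attach the retry policy per-invocation instead of assigning to
    # the definition's read-only retry_policy attribute.
    configured_op.with_retry_policy(RetryPolicy(max_retries=3, delay=10))()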

daniel

11/14/2022, 7:51 PM
Hey Brian - I think we're going to pull out the implementation of k8s_job_op into its own function that can be called from other ops, to support this and some other requests that have come in recently (like using an input from a different op to configure the resulting Kubernetes job). We're sorting out what that will look like here: https://github.com/dagster-io/dagster/pull/10516
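As a sketch of what that unlocks once released - the extracted helper ships as execute_k8s_job in dagster-k8s, and the image, command, and op names below are illustrative - a custom op can both carry a RetryPolicy and take its arguments from an upstream op:

from dagster import OpExecutionContext, RetryPolicy, job, op
from dagster_k8s import execute_k8s_job

@op
def build_args() -> list:
    # Stand-in upstream op whose output configures the Kubernetes job.
    return ['-jar', 'app.jar']

@op(retry_policy=RetryPolicy(max_retries=3))
def my_k8s_op(context: OpExecutionContext, args: list):
    # Runs the Kubernetes job from inside an ordinary op, so the usual
    # op-level RetryPolicy applies.
    execute_k8s_job(
        context=context,
        image='my-image:tag',  # placeholder
        command=['java'],
        args=args,  # value produced by the upstream op
    )

@job
def my_pipeline():
    my_k8s_op(build_args())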

Brian Pohl

11/14/2022, 7:57 PM
That's awesome, thank you! I have also been struggling quite a bit with that exact other case - passing outputs from one op into my k8s op - so this will be huge for me.
@daniel I see that your PR got merged. When do you think it will be released into a new version of dagster_k8s that I could install?

daniel

11/15/2022, 8:22 PM
Later this week - usually Thursday

Brian Pohl

11/15/2022, 8:23 PM
That's great! Thanks as always 🙏

Simon

11/15/2022, 11:10 PM
@daniel Noticed your PR is already merged, so I figured it'd be better to reply here first. What's the reason Dagster uses Kubernetes Jobs instead of Pods? Did you consider just using bare Pods? We've been running bare Pods from Airflow for years now, and in our experience that makes much more sense than using Kubernetes Jobs. The main issue is that with Kubernetes Jobs you end up with another scheduling layer between your actual scheduler (Dagster, or Airflow in our case) and the work you're scheduling. Kubernetes Jobs make pretty much nothing better for this use case, but they do introduce a bunch of issues and extra work (like getting the Pod logs, and you now have two places that can, for different reasons, decide to retry).
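To make the two-retry-layers point concrete, here is a minimal sketch using the official kubernetes Python client (image, names, and namespace are placeholders): a bare Pod leaves retries entirely to the external scheduler, while a Job brings its own retry knob (backoffLimit) that has to be pinned to zero to keep retries in one place:

from kubernetes import client, config

config.load_kube_config()

container = client.V1Container(name='work', image='my-image:tag', command=['java'])
pod_spec = client.V1PodSpec(containers=[container], restart_policy='Never')

# Option 1: a bare Pod - the external scheduler (Dagster/Airflow) owns retries.
pod = client.V1Pod(metadata=client.V1ObjectMeta(generate_name='work-'), spec=pod_spec)
client.CoreV1Api().create_namespaced_pod(namespace='default', body=pod)

# Option 2: a Job wrapper - Kubernetes can now *also* retry via backoff_limit,
# so it must be pinned to 0 to avoid a second, competing retry layer.
k8s_job = client.V1Job(
    metadata=client.V1ObjectMeta(generate_name='work-'),
    spec=client.V1JobSpec(
        backoff_limit=0,  # disable the Job controller's own retries
        template=client.V1PodTemplateSpec(spec=pod_spec),
    ),
)
client.BatchV1Api().create_namespaced_job(namespace='default', body=k8s_job)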

daniel

11/15/2022, 11:16 PM
That's a great question actually - let me check with some other members of the team on that
I asked around and I think you're right that any benefits are minor and likely outweighed by the disadvantages. Thanks for bringing that up, I filed an issue here to track investigating moving to bare Pods: https://github.com/dagster-io/dagster/issues/10545

Simon

11/17/2022, 4:49 PM
@daniel 👍 My team will probably have a look at this in the coming weeks; depending on how that goes, we might be able to create a PR for this as well.

daniel

11/17/2022, 4:50 PM
Doing it for that op (k8s_pod_op) sounds very tractable and would be much appreciated - in the fullness of time we may want to update the default run launcher and executor as well, which would be a bit hairier