# deployment-kubernetes
a
Hello everyone 👋 As we start using dagster with dagster-k8s more and more, there are certain tags that repeat over and over again. I would like to have them in a shared company library. What would be the best way to approach this?
d
Do you have an example of the types of tags you're referring to? Are these dagster system tags or custom tags that you created?
a
I am referring to something like this: https://docs.dagster.io/deployment/guides/kubernetes/customizing-your-deployment#job-or-op-kubernetes-configuration In the example there is a pod_spec_config -> podaffinity set up. If I want to use the same pod affinity for all my workflows, I need to repeat the same dict everywhere. I was asking myself if there was a smarter way of doing it... something like:
@graph(
    tags=add_nfs_mount(read_only=True) | add_pod_affinity()
)
So that a developer / data scientist can build the right dictionary without having to know all the necessary annotations and tags
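A minimal sketch of how such composable helpers could look, in plain Python. The helper names follow the snippet above, but their bodies, the `machine-type` label, the volume name, and the NFS server address are placeholders invented for illustration. The key names under `dagster-k8s/config` should be checked against the dagster-k8s docs for your version. One thing to watch: the plain dict `|` operator only merges the top level, so two helpers that both return a `dagster-k8s/config` key would overwrite each other. The sketch uses a recursive merge instead.

```python
from copy import deepcopy

K8S_CONFIG_KEY = "dagster-k8s/config"


def merge_tags(base, extra):
    """Return a new dict with `extra` recursively merged into `base`."""
    merged = deepcopy(base)
    for key, value in extra.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_tags(merged[key], value)
        elif isinstance(value, list) and isinstance(merged.get(key), list):
            # Lists (e.g. volumes) are concatenated rather than replaced
            merged[key] = merged[key] + deepcopy(value)
        else:
            merged[key] = deepcopy(value)
    return merged


def add_node_selector(machine_type="medium"):
    # Hypothetical helper: the label name is a placeholder.
    return {
        K8S_CONFIG_KEY: {
            "pod_spec_config": {"node_selector": {"machine-type": machine_type}}
        }
    }


def add_nfs_mount(mount_point="/nfs", read_only=False):
    # Hypothetical helper: volume name, server and path are placeholders.
    return {
        K8S_CONFIG_KEY: {
            "container_config": {
                "volume_mounts": [
                    {"name": "nfs", "mount_path": mount_point, "read_only": read_only}
                ]
            },
            "pod_spec_config": {
                "volumes": [
                    {"name": "nfs", "nfs": {"server": "nfs.example.internal", "path": "/"}}
                ]
            },
        }
    }


def compose_tags(*tag_dicts):
    """Deep-merge any number of tag dicts, left to right."""
    result = {}
    for tags in tag_dicts:
        result = merge_tags(result, tags)
    return result


tags = compose_tags(
    add_node_selector(machine_type="medium"),
    add_nfs_mount(mount_point="/nfs"),
)
```

The resulting `tags` dict could then be passed as `tags=tags` to `@op` or `@graph`, so callers never have to write the nested dictionary by hand.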
d
Got it. I could absolutely imagine a 'graph factory' pattern, similar to the op factory pattern here: https://docs.dagster.io/concepts/ops-jobs-graphs/ops#op-factory It would wrap the graph decorator and apply certain tags to it, while still passing through the underlying graph function.
a
I see the example, but in that case the op logic is inside the function, which is something that I would like to avoid. The example you posted seems perfect for identical ops that need to be created with different tags; in my case the ops/graphs and tags are variable. I guess what I would like to achieve is to "compose" the `dagster-k8s/config` dictionary without having to repeat the same dictionary. For example...
@op(
    tags=add_node_selector(machine_type='medium') | add_nfs_mount(mountPoint='/nfs')
)
def my_op():
    pass
Or maybe i misunderstood your example?
https://github.com/dagster-io/dagster/pull/6315 Fixed a small typo in the example
d
Yeah, the example isn't a perfect fit, but I bet we could have a factory that is itself a decorator (i.e. it's a decorator function and the decorated function becomes the body of the wrapped op, just with tags automatically added). Let me see if I can pull up an example
a
Thank you. I guess something like this could work maybe? https://stackoverflow.com/questions/34443271/how-can-i-extend-a-librarys-decorator
d
yeah, I was just writing up something like that example, that kept the `func` arg and passed it through to the `op`
a
Let me know if you have something working... i haven't had any luck until now
d
Ah this actually turned out pretty simple, believe this works?
from dagster import op

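def my_op(*args, **kwargs):
    # Copy the caller's tags (if any) and add the shared one
    tags = dict(kwargs.get("tags") or {})
    tags["foo"] = "bar"
    # Write the tags back; otherwise they are dropped when "tags" was not passed
    kwargs["tags"] = tags
    return op(*args, **kwargs)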


@my_op()
def hello(name: str):
    print(f"Hello, {name}!")
a
What I've achieved at this time is an interface of this kind (maple is the name of the library):
@maple_op(
    include_gfs_mount = True,
    include_gfs_mount_mount_point = '/gfs'
)
def my_op():
    pass
That's the easiest solution I found. I had to fiddle around with your example but I finally got it working. Thank you @daniel 🙏 Let me know if you see any possible improvement here
d
ah nice - we might end up recommending this pattern elsewhere, would you mind sharing what kind of tweaks you ended up making?
forwarding decorators can be tricky
a
yeah... to be honest i was trying to do something like this:
@op(
    name='mytempop'
)
@include_gfs(mount_point='/gfs')
def my_op():
    pass
But I didn't manage to get it working (not even sure it's possible)
I will try to share the code tomorrow
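One way the stacked form could be made to work, sketched under assumptions: a plain `@op` does not look at attributes left on the function by an inner decorator, so the outer decorator has to be a thin wrapper that collects them. Everything below is hypothetical (`collecting`, the `_pending_tags` attribute, the `gfs` tag payload), and `fake_op` is only a stand-in so the sketch runs without dagster installed. Wrapping the real `dagster.op` the same way is an untested assumption and relies only on it accepting `name` and `tags` keyword arguments.

```python
_PENDING_ATTR = "_pending_tags"  # hypothetical attribute name


def include_gfs(mount_point="/gfs", read_only=False):
    """Mark a function with extra tags. It does not create the op itself."""
    def marker(fn):
        pending = dict(getattr(fn, _PENDING_ATTR, {}))
        # Placeholder payload; a real helper would build the k8s config here
        pending["gfs"] = {"mount_point": mount_point, "read_only": read_only}
        setattr(fn, _PENDING_ATTR, pending)
        return fn  # the function is returned unchanged, only annotated
    return marker


def collecting(decorator_factory):
    """Wrap a decorator factory that accepts `tags` so it picks up pending tags."""
    def factory(**kwargs):
        def decorate(fn):
            options = dict(kwargs)  # copy so the factory can be reused safely
            tags = dict(getattr(fn, _PENDING_ATTR, {}))
            tags.update(options.pop("tags", None) or {})  # explicit tags win
            return decorator_factory(tags=tags, **options)(fn)
        return decorate
    return factory


def fake_op(name=None, tags=None):
    """Stand-in for a decorator factory such as dagster's `op`."""
    def decorate(fn):
        return {"name": name or fn.__name__, "tags": tags or {}, "fn": fn}
    return decorate


maple_op = collecting(fake_op)


@maple_op(name="mytempop")
@include_gfs(mount_point="/gfs")
def my_op():
    pass
```

The order matters: the marker decorators must sit below the collecting decorator so that they run first and the attribute is already present when the op is created.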
@daniel maple_op looks something like this:
def maple_op(*args, **kwargs):
    """
    A decorator for adding extra tags to an operation.

    :param args:
    :param kwargs:
        include_gfs
        include_gfs_read_only
        include_gfs_mount_point
        include_nodeselector
        include_nodeselector_pool
    :return:
        dagster.op
    """
    # Copy the caller's tags so the original dict is not mutated
    tags = dict(kwargs.pop("tags", None) or {})

    # Maps each feature flag to its tag builder and the sub-options it accepts
    additional_tags = {
        "include_gfs": (_include_gfs, ["read_only", "mount_point"]),
        "include_nodeselector": (_include_nodeselector, ["pool"]),
    }

    for flag, (builder, option_names) in additional_tags.items():
        enabled = kwargs.pop(flag, False)
        # Always pop the sub-options so they never reach dagster.op
        options = {
            name: kwargs.pop(f"{flag}_{name}")
            for name in option_names
            if f"{flag}_{name}" in kwargs
        }
        if enabled:
            # Merge the tags produced by the builder into the op tags
            _dict_merge(tags, builder(**options))

    kwargs["tags"] = tags
    return op(*args, **kwargs)
Then I have built separately:
def _include_gfs(read_only=False, mount_point="/gfs"):
and
def _include_nodeselector(pool="core"):
Let me know if you have any feedback