Hello everyone 👋 As we start using dagster with dagster-k8s more and more, there are certain tags that repeat over and over again. I would like to have them in a shared company library. What would be the best way to approach this?
Do you have an example of the types of tags you're referring to? Are these dagster system tags or custom tags that you created?
I am referring to something like this: https://docs.dagster.io/deployment/guides/kubernetes/customizing-your-deployment#job-or-op-kubernetes-configuration In the example there is a pod_spec_config -> podaffinity set up. If i want to use the same pod affinity for all my workflows I need to repeat the same dict everywhere. I was asking myself if there was a smarter way of doing it... something like:
    tags=add_nfs_mount(read_only=True) | add_pod_affinity()
So that a developer / data scientist can build the right dictionary without having to know all the necessary annotations and tags
Got it - I could absolutely imagine a 'graph factory' pattern kind of like the op factory pattern here: https://docs.dagster.io/concepts/ops-jobs-graphs/ops#op-factory that wraps the graph decorator and applies certain tags to it, while still passing through the underlying graph function
I see the example, but in that case the op logic is inside the function, which is something that i would like to avoid. The example you posted seems perfect for identical ops that need to be created with different tags, in my case the ops/graphs and tags are variable. I guess what i would like to achieve is to "compose" the
dictionary without having to repeat the same dictionary. For example...
    tags=add_node_selector(machine_type='medium) | add_nfs_mount(mountPoint='/nfs')
def my_op:
Or maybe i misunderstood your example?
Yeah, the example isn't a perfect fit, but I bet we could have a factory that is itself a decorator (i.e. it's a decorator function and the decorated function becomes the body of the wrapped op, just with tags automatically added). Let me see if I can pull up an example
Thank you. I guess something like this could work maybe? https://stackoverflow.com/questions/34443271/how-can-i-extend-a-librarys-decorator
yeah, I was just writing up something like that example, that kept the
arg and passed it through to the
Let me know if you have something working... i haven't had any luck until now
Ah this actually turned out pretty simple, believe this works?
from dagster import op

def my_op(*args, **kwargs):
    tags = kwargs.get("tags", {})
    tags["foo"] = "bar"
    return op(*args, **kwargs)

def hello(name: str):
    print(f"Hello, {name}!")
What I've achieved at this time is an interface of this kind (maple is the name of the library):
    include_gfs_mount = True
    include_gfs_mount_mount_point = '/gfs'
def my_op():
That's the easiest solution I found. I had to fiddle around with your example but I finally got it working. Thank you @daniel 🙏 Let me know if you see any possible improvement here
ah nice - we might end up recommending this pattern elsewhere, would you mind sharing what kind of tweaks you ended up making?
forwarding decorators can be tricky
yeah... to be honest i was trying to do something like this:
def my_op():
But I didn't manage to get it working (not even sure it's possible)
I will try to share the code tomorrow
@daniel maple_op looks something like this:
def maple_op(*args, **kwargs):
    A decorator for adding extra tags to an operation.

    :param args:
    :param kwargs:
    tags = kwargs.get("tags", {})

    # Dict containing the extra tags to be added to the operation
    additional_tags = {
        "include_gfs": ["read_only", "mount_point"],
        "include_nodeselector": ["pool"],

    for additional_tag in additional_tags.keys():
        if kwargs.get(additional_tag):
            args_prefix = additional_tag + "_"
            additional_keys = additional_tags[additional_tag]
            # Create a dictionary for every sub-tag. To be passed as an argument
            method_args = {  # noqa: F841
                key: kwargs.pop(args_prefix + key)
                for key in additional_keys
                if kwargs.get(args_prefix + key)

            # Return the tags from the Merge the tags
            _dict_merge(tags, eval("_" + additional_tag + "(**method_args)"))

    kwargs["tags"] = tags
    return op(*args, **kwargs)
Then I have built separately:
def _include_gfs(read_only=False, mount_point="/gfs"):
def _include_nodeselector(pool="core"):
Let me know if you have any feedback