Andrea Giardini
01/24/2022, 9:21 AM
daniel
01/24/2022, 3:23 PM
Andrea Giardini
01/24/2022, 3:28 PM
@graph(
    tags=add_nfs_mount(read_only=True) | add_pod_affinity()
)
So that a developer / data scientist can build the right dictionary without having to know all the necessary annotations and tags.
daniel
01/24/2022, 3:29 PM
Andrea Giardini
01/24/2022, 3:57 PM
dagster-k8s/config dictionary without having to repeat the same dictionary.
For example...
@op(
    tags=add_node_selector(machine_type='medium') | add_nfs_mount(mountPoint='/nfs')
)
def my_op():
    pass
Or maybe I misunderstood your example?
Andrea Giardini
01/24/2022, 3:58 PM
daniel
01/24/2022, 3:59 PM
Andrea Giardini
01/24/2022, 4:53 PM
daniel
01/24/2022, 4:54 PM
func arg and passed it through to the op
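One thing to watch with the `add_node_selector(...) | add_nfs_mount(...)` pattern proposed earlier in the thread: the dict `|` operator (Python 3.9+) merges only the top level. If both helpers return their settings under the same `dagster-k8s/config` key, the right-hand dict replaces the left-hand one entirely. Below is a minimal sketch of that effect and of a recursive merge as one possible workaround. The helper bodies and the structure they return are assumptions made for illustration, not code from this thread or Dagster APIs.

```python
from copy import deepcopy

K8S_KEY = "dagster-k8s/config"

# Hypothetical helpers: names and returned structure are illustrative only.
def add_node_selector(machine_type):
    return {K8S_KEY: {"pod_spec_config": {"node_selector": {"machine_type": machine_type}}}}

def add_nfs_mount(mount_point):
    return {K8S_KEY: {"container_config": {"volume_mounts": [
        {"name": "nfs", "mount_path": mount_point}]}}}

def deep_merge(left, right):
    """Return a new dict with `right` merged into `left`, recursing into nested dicts."""
    merged = deepcopy(left)
    for key, value in right.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged

# Shallow merge: the second helper's value for K8S_KEY replaces the first's
shallow = add_node_selector("medium") | add_nfs_mount("/nfs")

# Deep merge: settings from both helpers survive under K8S_KEY
deep = deep_merge(add_node_selector("medium"), add_nfs_mount("/nfs"))

print(sorted(shallow[K8S_KEY]))
print(sorted(deep[K8S_KEY]))
```

With helpers shaped like this, the `|` form would silently drop the node selector, so a merge helper (or helpers that write to distinct top-level tag keys) is probably needed.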
Andrea Giardini
01/24/2022, 4:56 PM
daniel
01/24/2022, 6:02 PM
from dagster import op

def my_op(*args, **kwargs):
    tags = kwargs.get("tags", {})
    tags["foo"] = "bar"
    # Write the tags back, otherwise they are lost when the caller passed none
    kwargs["tags"] = tags
    return op(*args, **kwargs)

@my_op()
def hello(name: str):
    print(f"Hello, {name}!")
Andrea Giardini
01/26/2022, 4:23 PM
@maple_op(
    include_gfs_mount=True,
    include_gfs_mount_mount_point='/gfs'
)
def my_op():
    pass
That's the easiest solution I found. I had to fiddle around with your example but I finally got it working. Thank you @daniel 🙏
Let me know if you see any possible improvement here.
daniel
01/26/2022, 4:23 PM
daniel
01/26/2022, 4:23 PM
Andrea Giardini
01/26/2022, 4:25 PM
@op(
    name='mytempop'
)
@include_gfs(mount_point='/gfs')
def my_op():
    pass
But I didn't manage to get it working (not even sure it's possible).
Andrea Giardini
01/26/2022, 4:25 PM
Andrea Giardini
01/31/2022, 12:37 PM
def maple_op(*args, **kwargs):
    """
    A decorator for adding extra tags to an operation.
    :param args:
    :param kwargs:
        include_gfs
        include_gfs_read_only
        include_gfs_mount_point
        include_nodeselector
        include_nodeselector_pool
    :return:
        dagster.op
    """
    tags = kwargs.get("tags", {})
    # Dict containing the extra tags to be added to the operation
    additional_tags = {
        "include_gfs": ["read_only", "mount_point"],
        "include_nodeselector": ["pool"],
    }
    for additional_tag in additional_tags.keys():
        if kwargs.get(additional_tag):
            args_prefix = additional_tag + "_"
            additional_keys = additional_tags[additional_tag]
            # Create a dictionary for every sub-tag. To be passed as an argument
            method_args = {  # noqa: F841
                key: kwargs.pop(args_prefix + key)
                for key in additional_keys
                # Test for presence so falsy values (e.g. read_only=False) are also popped
                if args_prefix + key in kwargs
            }
            # Build the tags for this option and merge them into the existing tags
            _dict_merge(tags, eval("_" + additional_tag + "(**method_args)"))
            kwargs.pop(additional_tag)
    kwargs["tags"] = tags
    return op(*args, **kwargs)
Then I have built separately:
def _include_gfs(read_only=False, mount_point="/gfs"):
and
def _include_nodeselector(pool="core"):
Let me know if you have any feedback
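One possible refinement, offered as a sketch rather than a definitive version: the `eval` lookup can be replaced by a plain mapping from option name to builder function, which also removes the need for the `noqa` marker. Everything below that the thread does not show is an assumption: the bodies of `_include_gfs` and `_include_nodeselector`, the behaviour of `_dict_merge`, the name `TAG_BUILDERS`, and a stand-in `op` used only so the example runs without Dagster installed (real code would use `from dagster import op`).

```python
from copy import deepcopy

# Stand-in for dagster.op so this sketch runs on its own (assumption).
def op(*args, **kwargs):
    def decorator(fn):
        fn.op_kwargs = kwargs  # record what op() received, for inspection
        return fn
    return decorator

K8S_KEY = "dagster-k8s/config"

# Hypothetical bodies: the thread shows only the signatures.
def _include_gfs(read_only=False, mount_point="/gfs"):
    return {K8S_KEY: {"container_config": {"volume_mounts": [
        {"name": "gfs", "mount_path": mount_point, "read_only": read_only}]}}}

def _include_nodeselector(pool="core"):
    return {K8S_KEY: {"pod_spec_config": {"node_selector": {"pool": pool}}}}

def _dict_merge(target, source):
    """Recursively merge `source` into `target` in place (assumed behaviour)."""
    for key, value in source.items():
        if isinstance(value, dict) and isinstance(target.get(key), dict):
            _dict_merge(target[key], value)
        else:
            target[key] = value

# Option name -> (builder function, accepted sub-options)
TAG_BUILDERS = {
    "include_gfs": (_include_gfs, ("read_only", "mount_point")),
    "include_nodeselector": (_include_nodeselector, ("pool",)),
}

def maple_op(*args, **kwargs):
    # Copy so a caller-supplied tags dict is never mutated
    tags = deepcopy(kwargs.pop("tags", None) or {})
    for option, (builder, sub_keys) in TAG_BUILDERS.items():
        enabled = kwargs.pop(option, False)
        # Always pop sub-options so none of them leak through to op()
        builder_args = {
            key: kwargs.pop(f"{option}_{key}")
            for key in sub_keys
            if f"{option}_{key}" in kwargs
        }
        if enabled:
            _dict_merge(tags, builder(**builder_args))
    kwargs["tags"] = tags
    return op(*args, **kwargs)

@maple_op(include_gfs=True, include_gfs_read_only=False, include_nodeselector=True)
def my_op():
    pass

print(my_op.op_kwargs["tags"])
```

Popping the option and its sub-options unconditionally means a call such as `include_gfs=False` no longer passes an unknown keyword through to `op`.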