Akira Renbokoji
03/01/2024, 5:20 PM
my_list = {
    "big_list": {
        "list_a": content_a,
        "list_b": content_b,
    },
    "small_list": {
        "list_c": content_c,
        "list_d": content_d,
    }
}

@graph
def foo(my_list):
    for list_content in my_list.values():
        for content_name, content in list_content.items():
            stuff = job.do_stuff(content_name, content)
How do I pass a dictionary to a graph?
Should I make it an @op that returns the dictionary, then pass that in as an argument to the graph?
Update:
I forgot that you shouldn't try to pass dictionaries to graphs or have one @op call another @op.
Instead, have the @op return data and have the @graph pass that data to another @op:
from some_file_with_op import job

@graph()
def foo():
    some_dict = job.build_dict()
    list_of_dicts = job.write_data_return_list(some_dict)
    job.run_app(list_of_dicts)
Ismael Rodrigues
03/01/2024, 5:31 PM
Akira Renbokoji
03/01/2024, 5:42 PM
Zach
03/01/2024, 6:45 PM
Akira Renbokoji
03/01/2024, 6:46 PM
Akira Renbokoji
03/01/2024, 6:46 PM
Zach
03/01/2024, 6:47 PM
@graph
def foo(my_list):
    for list_content in my_list.values():
        for content_name, content in list_content.items():
            stuff = job.do_stuff(content_name, content)
Akira Renbokoji
03/01/2024, 6:47 PM
Akira Renbokoji
03/01/2024, 6:47 PM
Zach
03/01/2024, 6:48 PM
@graph and @job bodies are executed at the time your code is loaded. They should only contain the ops to be executed and the passing of outputs from one op to another.
Akira Renbokoji
03/01/2024, 6:48 PM
Zach
03/01/2024, 6:48 PM
Zach
03/01/2024, 6:48 PM
Akira Renbokoji
03/01/2024, 6:48 PM
Akira Renbokoji
03/01/2024, 6:48 PM
Zach
03/01/2024, 6:49 PM
from typing import List

from dagster import Config, OpExecutionContext, op

class ListConfig(Config):
    list_a: List[str]
    list_b: List[str]
    ...

@op
def some_op_that_needs_list_configs(context: OpExecutionContext, config: ListConfig):
    # context.log is a logger object, so call a level method such as .info()
    context.log.info(f"This is list_a: {config.list_a}")
Akira Renbokoji
03/01/2024, 6:50 PM
my_graph.to_job(
    config={
        "ops": {
            "load_dict": {
                "config": {
                    "my_dict": my_list
                }
            }
        }
    }
)
Akira Renbokoji
03/01/2024, 6:51 PM
Zach
03/01/2024, 6:51 PM
Akira Renbokoji
03/01/2024, 6:51 PM
Akira Renbokoji
03/01/2024, 6:51 PM
Zach
03/01/2024, 6:51 PM
Ismael Rodrigues
03/01/2024, 6:52 PM
Akira Renbokoji
03/01/2024, 6:52 PM
Ismael Rodrigues
03/01/2024, 6:52 PM
Zach
03/01/2024, 6:52 PM
Akira Renbokoji
03/01/2024, 6:52 PM
Akira Renbokoji
03/01/2024, 6:52 PM
Ismael Rodrigues
03/01/2024, 6:53 PM
Akira Renbokoji
03/01/2024, 6:53 PM
@op(config_schema={"my_dict": Field(dict)})
def load_dict(context):
    # Read the dictionary from the op's config and hand it to downstream ops
    my_dict = context.op_config["my_dict"]
    context.log.info(str(my_dict))
    return my_dict
Zach
03/01/2024, 6:53 PM
Akira Renbokoji
03/01/2024, 6:55 PM
Akira Renbokoji
03/01/2024, 6:55 PM
Akira Renbokoji
03/01/2024, 6:56 PM
my_job.execute_in_process() to set up run_config
Zach
03/01/2024, 6:58 PM
Zach
03/01/2024, 6:59 PM
Akira Renbokoji
03/01/2024, 7:00 PM
some_job = my_graph.to_job()
some_job.execute_in_process(
    run_config=RunConfig(
        ...
    )
)
Ismael Rodrigues
03/01/2024, 7:00 PM
Ismael Rodrigues
03/01/2024, 7:06 PM
from dagster import Config, DynamicOut, DynamicOutput, job, op

class MyConfigClass(Config):
    your_dict: dict

@op(out=DynamicOut())
def my_op_dynamic_out(context, config: MyConfigClass):
    # Iterate over key/value pairs; each key becomes the mapping key
    for k, v in config.your_dict.items():
        yield DynamicOutput(v, mapping_key=k)

@op
def my_op_do_stuff(context, params: dict):
    ...  # process stuff

@job
def my_job():
    my_op_dynamic_out().map(my_op_do_stuff)

OR

@job
def my_job():
    def _for_each(param):
        my_op_do_stuff(param)
    my_op_dynamic_out().map(_for_each)
And you can always pass a lambda to map() to pass whatever params you want.
Ismael Rodrigues
03/01/2024, 7:07 PM
Ismael Rodrigues
03/01/2024, 7:08 PM
Akira Renbokoji
03/01/2024, 7:40 PM
Akira Renbokoji
03/01/2024, 10:26 PM
@graph()
def foo():
    some_dict = job.build_dict()
    list_of_dicts = job.write_data_return_list(some_dict)
    job.run_app(list_of_dicts)
Akira Renbokoji
03/01/2024, 10:26 PM
Zach
03/01/2024, 10:41 PM
job object?
Akira Renbokoji
03/01/2024, 10:44 PM
@op in it.
Zach
03/01/2024, 10:45 PM
job.* operations you're calling are ops.