Brian Milby
06/20/2023, 8:08 PM
I have an op that returns a dataclass object:
@op
def op_dis_file_entry_agent(context) -> FileAgentDetails:
    print("op_dis_file_entry_agent")
    return FileAgentDetails()
How do I access the return value in my graph?
@graph
def graph_load_article_data():
    result = op_dis_file_entry_agent()
yuhan
06/20/2023, 8:42 PM
To access result, you may need to create a downstream op.
Brian Milby
06/20/2023, 8:52 PM
I call op_dis_file_entry_agent and then call another graph, which contains another op to process the result, so I do have a downstream op as you mentioned:
@dataclass
class FileAgentDetails:
    file_agent_success: bool = False
    dremio_drpzn_table_path: str = None
    input_file_dir: str = None
    output_file_dir: str = None
    file_archive_dir: str = None
    input_file_path: str = None
    file_hash_id: str = None
@graph
def graph_load_article_data():
    print("graph_load_article_data")
    result = op_dis_file_entry_agent()
    file_agent_success = get return value property from result.file_agent_success
    dremio_drpzn_table_path = get return value property from result.dremio_drpzn_table_path
    print(f"op_dis_file_entry_agent success: {file_agent_success}")
    if file_agent_success:
        print(f"dremio_drpzn_table_path: {dremio_drpzn_table_path}")
        graph_load_edlh_data(dremio_drpzn_table_path)
Brian Milby
06/20/2023, 10:37 PM
Does an op not return the values and give access to them the same way as a normal function?
yuhan
06/20/2023, 10:38 PM
file_agent_success = get return value property from result.file_agent_success
dremio_drpzn_table_path = get return value property from result.dremio_drpzn_table_path
These lines won't work: a graph is a container of op dependencies, and we don't really execute the body of the function at run time, so inside the graph result is not the actual return value. If you'd like to return multiple outputs, you would need to return them as separate outputs, like: https://docs.dagster.io/concepts/ops-jobs-graphs/nesting-graphs#multiple-outputs
----
Alternatively, it sounds like what you’re also looking for is conditional branching: https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#with-conditional-branching. Something like this should work:
from dataclasses import dataclass
from dagster import GraphOut, Out, Output, graph, job, op

@dataclass
class FileAgentDetails:
    file_agent_success: bool = False
    dremio_drpzn_table_path: str = None
    input_file_dir: str = None
    output_file_dir: str = None
    file_archive_dir: str = None
    input_file_path: str = None
    file_hash_id: str = None

@op
def my_upstream():
    return 1

# The output is optional: if it is never yielded, downstream ops are skipped.
@op(out={"result": Out(is_required=False, dagster_type=FileAgentDetails)})
def op_dis_file_entry_agent(my_upstream):
    if ...:
        yield Output(FileAgentDetails(...), output_name="result")
    # else do nothing

@graph(out={"result": GraphOut()})
def graph_load_article_data():
    print("graph_load_article_data")
    return op_dis_file_entry_agent(my_upstream())

@op
def my_op_in_graph_load_edlh_data(file_agent_details):
    print(file_agent_details)

@graph
def graph_load_edlh_data(file_agent_details):
    my_op_in_graph_load_edlh_data(file_agent_details)

@job
def my_job():
    graph_load_edlh_data(graph_load_article_data())
Brian Milby
06/20/2023, 10:46 PM