Harry James
05/10/2022, 1:19 PM
from dagster import job, op, graph

@op
def return_fifty():
    return 50.0

@op
def add_thirty_two(number):
    return number + 32.0

@op
def multiply_by_one_point_eight(number):
    return number * 1.8

@op
def log_number(context, number):
    context.log.info(f"number: {number}")

@graph
def celsius_to_fahrenheit(number):
    return add_thirty_two(multiply_by_one_point_eight(number))

@job
def all_together_nested():
    log_number(celsius_to_fahrenheit(return_fifty()))

x = all_together_nested.execute_in_process(op_selection=["*celsius_to_fahrenheit.add_thirty_two"])
x.all_node_events
yuhan
05/10/2022, 7:12 PM
op_selection in the execute_in_process API doesn’t support the * or + selection syntax yet, which is available in Dagit. Here’s a tracking issue for this feature parity with Dagit.
As a stopgap, you can do:
x = all_together_nested.execute_in_process(
    op_selection=[
        "celsius_to_fahrenheit.add_thirty_two",
        "celsius_to_fahrenheit.multiply_by_one_point_eight",
    ],
    # fill out missing inputs via run_config
    run_config={"ops": {"celsius_to_fahrenheit": {"inputs": {"number": {"value": 1}}}}},
)
Harry James
05/11/2022, 8:32 AM
from dagster import job, op

@op
def return_one():
    return 1

@op
def add_two(i: int):
    return i + 2

@op
def multi_three(i: int):
    return i * 3

@job
def my_job():
    multi_three(add_two(return_one()))

x = my_job.execute_in_process(op_selection=["*add_two"])
x.all_node_events
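
Since the reply above says execute_in_process does not accept the * prefix yet, one possible workaround is to expand "*add_two" into an explicit list of op names yourself. The sketch below is plain Python and uses nothing from Dagster: UPSTREAM and expand_upstream are hypothetical names made up for illustration, and the dependency map is written out by hand to mirror my_job rather than read from the job definition.

```python
from typing import Dict, List

# Hand-written map of op name -> ops it consumes directly (an assumption,
# mirroring my_job above: multi_three(add_two(return_one()))).
UPSTREAM: Dict[str, List[str]] = {
    "return_one": [],
    "add_two": ["return_one"],
    "multi_three": ["add_two"],
}


def expand_upstream(target: str, upstream: Dict[str, List[str]]) -> List[str]:
    """Return the target op plus all of its ancestors, dependencies first."""
    ordered: List[str] = []
    seen = set()

    def visit(name: str) -> None:
        # Skip ops already collected so shared ancestors appear only once.
        if name in seen:
            return
        seen.add(name)
        for parent in upstream[name]:
            visit(parent)
        ordered.append(name)

    visit(target)
    return ordered


# Hypothetical stand-in for the unsupported "*add_two" selection.
selection = expand_upstream("add_two", UPSTREAM)
print(selection)  # ['return_one', 'add_two']
```

The resulting list could then be passed as op_selection=selection, in the same way the explicit list is passed in the stopgap above. For ops nested inside a graph, the names would presumably need the graph prefix (as in "celsius_to_fahrenheit.add_thirty_two"), which this sketch does not handle.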