Brandon Peebles
03/05/2024, 9:53 PM
`op_one` that is a dependency of `op_two` such that I use the output of `op_one` in `op_two`. I want to skip running `op_one` in branch deployments since it is not needed.
Scout
03/05/2024, 9:53 PM
```python
import os


def get_current_env():
    # Read the raw value first so the "is set" check is meaningful
    raw_value = os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT")
    assert raw_value is not None  # env var must be set
    return "branch" if raw_value == "1" else "prod"
```
In this function, the `DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT` environment variable is checked; a value of `"1"` indicates that the current deployment is a branch deployment. Using this information, you can write a job that conditionally executes `op_one` based on the deployment type.

However, since the documentation does not provide a direct example of how to conditionally skip an op within a job, I will provide general guidance based on the principles of Dagster. You would typically define your job with both `op_one` and `op_two`, where `op_two` depends on `op_one`. To conditionally skip the work in `op_one`, you could use a pattern where `op_one` always runs but performs no action or returns a default value in branch deployments. Then `op_two` can use the output of `op_one` as needed. Here's a conceptual example:

```python
from dagster import job, op


@op
def op_one(context):
    if get_current_env() == "branch":
        context.log.info("Skipping op_one in branch deployment.")
        # Return a default value or perform no action
        return None
    else:
        # Perform the actual computation or action
        return compute_something()


@op
def op_two(context, input_from_op_one):
    # Use the output from op_one as needed
    if input_from_op_one is not None:
        # Perform actions with the input
        pass
    else:
        context.log.info("Received no input from op_one, proceeding with alternative logic.")


@job
def conditional_job():
    op_two(op_one())
```
In this example, `op_one` checks the deployment type and decides whether to perform its computation. `op_two` takes the output of `op_one` and handles the case where `op_one` did not perform its usual computation.

Please note that this is a conceptual example to illustrate the approach. You would need to adapt this pattern to your specific use case and ensure that `op_two` can handle the case where `op_one` does not provide its usual output.
Brandon Peebles
03/05/2024, 9:55 PM
`job` definition. I'd rather not execute an op and use a credit if it's not needed.
Scout
03/05/2024, 9:55 PM
`op_one` entirely based on a condition within the `job` definition, you can use dynamic orchestration with Dagster's `DynamicOut` and `DynamicOutput` to conditionally yield outputs that determine the execution graph at runtime. This allows you to create a job where the execution of `op_two` does not depend on `op_one` when certain conditions are met, such as a branch deployment. Here's a conceptual example using dynamic orchestration to skip `op_one`:

```python
from dagster import job, op, DynamicOut, DynamicOutput


@op(out=DynamicOut())
def branch_check_op():
    # In a branch deployment, yield nothing so op_one is never mapped
    if get_current_env() != "branch":
        # Proceed with op_one in other deployments
        yield DynamicOutput(value=None, mapping_key="run_op_one")


@op
def op_one(trigger):
    # trigger only carries the mapped value; the real work happens here
    return compute_something()


@op
def op_two(results_from_op_one):
    # results_from_op_one is the collected list of op_one results
    pass


@job
def conditional_job():
    # op_one is mapped once per dynamic output, so zero outputs means zero runs
    op_one_results = branch_check_op().map(op_one).collect()
    op_two(op_one_results)
```

In this example, `branch_check_op` dynamically determines whether to proceed with `op_one` based on the deployment environment. It yields a `DynamicOutput` only outside branch deployments. In a branch deployment it yields nothing, so `map` has nothing to apply `op_one` to and `op_one` does not run. The `collect` method gathers the dynamic outputs into a single list that is passed to `op_two`; that list is expected to be empty when `op_one` was skipped.

Please note that this is a conceptual example to illustrate the approach. `branch_check_op` itself still executes as an op in every deployment. You would need to adapt this pattern to your specific use case and ensure that `op_two` can handle the case where it receives no results from `op_one`. Additionally, you should ensure that the dynamic orchestration features used in this example align with the current capabilities and best practices of the Dagster version you are using.
Brandon Peebles
03/05/2024, 9:57 PM
`output` to the downstream op instead of one that was actually generated by an op?
Scout
03/05/2024, 9:58 PM
`Out` and `Output` classes to explicitly yield outputs from ops, which can then be passed to downstream ops. This allows you to bypass the upstream computation and provide a predetermined value directly to a downstream op. Here's an example of how you might structure your job to conditionally pass a custom output to `op_two`:

```python
from dagster import job, op, Out, Output


@op(out={"condition": Out(), "result": Out()})
def check_condition_and_produce_result():
    if get_current_env() == "branch":
        # In a branch deployment, skip the real computation and provide a custom output
        yield Output(True, "condition")
        # Always yield "result": an optional output that is never yielded
        # would cause the downstream op_two to be skipped as well
        yield Output(None, "result")
    else:
        # In other deployments, run the computation and provide its result
        yield Output(False, "condition")
        yield Output(compute_something(), "result")


@op
def op_two(input_from_op_one=None):
    # Use the real result or the custom value as needed
    pass


@job
def conditional_job():
    condition_result, op_one_result = check_condition_and_produce_result()
    op_two(input_from_op_one=op_one_result)
```

In this example, `check_condition_and_produce_result` checks the deployment environment and yields two outputs: a condition flag and a result. In a branch deployment, it yields `True` for the condition and a custom value (here `None`) in place of the result of `op_one`. Otherwise, it yields `False` for the condition and the actual result of the `op_one` computation. Both outputs are always yielded, because Dagster skips ops that depend on an optional output that was never yielded, and that would skip `op_two` too.

`op_two` is defined to accept an input that can be either the real result or the custom value. The `conditional_job` job wires up these ops, and `op_two` receives the appropriate input based on the condition. This approach lets you control what flows to downstream ops based on runtime conditions without running the `op_one` computation when it is not needed.
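
If it helps to check the branching before wiring it into a job, one option is to keep the decision in a plain function that an op can call. The sketch below is only an illustration and uses no Dagster APIs. The names `is_branch_deployment` and `result_for_op_two` are hypothetical, `compute_something` is a stand-in for the real work, and it assumes that an unset `DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT` should be treated as a non-branch deployment.

```python
import os

BRANCH_ENV_VAR = "DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT"


def is_branch_deployment(environ=None):
    """Return True when the env var marks a branch deployment.

    Assumption: an unset variable is treated as a non-branch deployment.
    """
    environ = os.environ if environ is None else environ
    return environ.get(BRANCH_ENV_VAR) == "1"


def compute_something():
    # Hypothetical stand-in for the real work done by op_one
    return {"rows": 3}


def result_for_op_two(environ=None, placeholder=None):
    """Return (is_branch, value) for the downstream op.

    In a branch deployment the placeholder is returned and
    compute_something() is never called.
    """
    if is_branch_deployment(environ):
        return True, placeholder
    return False, compute_something()
```

An op body could then call `result_for_op_two()` and yield the two values as its outputs. Passing a dict as `environ` makes both branches easy to exercise in a unit test without touching the real environment.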