choose whether to run op_1 or op_2 ...
# ask-ai
@Scout choose whether to run op_1 or op_2 on my asset how to do it
In Dagster, the decision to run one operation (op) or another based on a condition is not directly related to assets: assets represent the outputs of computations and their dependencies, not control flow. You can, however, control the execution flow of the ops that compute an asset. Here's how you might approach it:

1. Define your assets: First, define your assets normally. Remember that assets are about outputs and their lineage, not the control flow.
2. Use a graph to orchestrate: Create a graph of ops that includes the logic to decide whether to run the computation corresponding to `op_1` or `op_2`. This decision-making logic is typically encapsulated within an op that precedes the asset computation.
3. Conditional logic within an op: Inside the op that controls the flow, implement the logic to decide whether to run `op_1` or `op_2`. This could be based on config values, the result of a computation, or some external condition.
4. Wrap the graph as an asset: If the chosen branch produces an asset, turn the graph of ops into a software-defined asset with the `@graph_asset` decorator.

Here's a simplified example based on the concepts described in the documentation:
```python
from dagster import graph_asset, op


@op
def run_op_1():
    # Logic for op_1
    ...


@op
def run_op_2():
    # Logic for op_2
    ...


@op(config_schema={"run_op_1": bool})
def conditional_op(context):
    # Calling an op inside another op is a plain function call
    # (direct invocation), not a separately managed step
    if context.op_config["run_op_1"]:
        return run_op_1()
    else:
        return run_op_2()


@graph_asset
def my_asset():
    # The asset's value is whichever branch conditional_op returns
    return conditional_op()
```
In this example, `conditional_op` decides which branch to run based on its config, and `my_asset` is a graph-backed asset whose value comes from that op. When you materialize `my_asset`, you provide run config to control which branch is executed. Please note that this is a conceptual example to illustrate the approach; the actual implementation may vary based on your specific requirements and the complexity of your assets and operations. If you have a more complex scenario or need further assistance, please provide additional details, and I can help guide you through the process.
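For instance, here is a minimal sketch of supplying that config when materializing from Python; the nesting of the run config follows the graph and op names above, so adjust it to your own setup:

```python
from dagster import materialize

# Materialize the asset; the run config selects the branch taken
# inside the graph-backed asset
result = materialize(
    [my_asset],
    run_config={
        "ops": {
            "my_asset": {
                "ops": {"conditional_op": {"config": {"run_op_1": True}}}
            }
        }
    },
)
assert result.success
```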
@Scout Is it possible to call ops within ops ?
Yes, it is possible to call one op from within another op in Dagster. This is a common pattern when you want to encapsulate reusable logic within an op and then invoke that logic from other ops. Here's a simple example to illustrate this:
```python
from dagster import op


@op
def sub_op():
    # Logic for the sub op
    return "result from sub_op"


@op
def main_op():
    # Logic before calling sub_op
    result = sub_op()
    # Logic after calling sub_op
    return f"main_op received: {result}"
```
In this example, `main_op` calls `sub_op`, and you can use the result of `sub_op` within `main_op`. This allows you to compose ops and build more complex workflows by reusing ops. However, it's important to note that when you call an op from within another op like this, it's just a regular Python function call; Dagster's execution engine isn't managing the execution in the same way it does when ops are composed within a job or graph. This means that certain Dagster features, like retries, output materialization, and context management, won't be handled automatically for the inner op (`sub_op` in this case). For more complex workflows where you want Dagster to manage the execution of all ops, you should compose them within a job or graph, which allows Dagster to handle the execution plan, dependencies, and other runtime considerations. Here's an example of how you might compose ops within a graph:
```python
from dagster import graph, op


@op
def sub_op():
    # Logic for the sub op
    return "result from sub_op"


@op
def main_op(sub_result):
    # Logic that uses the result from sub_op
    return f"main_op received: {sub_result}"


@graph
def my_graph():
    main_op(sub_op())


# To execute the graph, you would create a job from it
my_job = my_graph.to_job()
```
In this graph, `sub_op` and `main_op` are connected, and Dagster manages their execution when you run `my_job`. This approach leverages Dagster's execution engine and provides you with the full benefits of the framework, including execution monitoring, retries, and dependency resolution.
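If it helps, here is a sketch of executing that job in-process (e.g. from a script or test):

```python
if __name__ == "__main__":
    # Run the job synchronously in the current process
    result = my_job.execute_in_process()
    assert result.success
    # sub_op feeds main_op, so the final output is:
    print(result.output_for_node("main_op"))  # main_op received: result from sub_op
```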
@Scout: I have op_main that returns {"mydata": data}, and based on the size of the pickle returned by op_main I'd want to either run op_large or op_small. How would I achieve that?
To achieve conditional branching based on the size of the data returned by `op_main`, you can create an additional op that decides which subsequent op should run (`op_large` or `op_small`). This decision-making op inspects the size of the pickled data and yields on one of two optional outputs; the op wired to the branch that is never yielded is simply skipped downstream. Here's a conceptual example of how you might set this up:
```python
import pickle

from dagster import Out, Output, job, op

SOME_THRESHOLD = 1024  # size in bytes; tune for your use case


@op
def op_main():
    # Your logic here
    data = {"mydata": "some large data"}
    return data


@op(out={"large": Out(is_required=False), "small": Out(is_required=False)})
def decide_op(data):
    # Serialize the data to a pickle and check its size
    pickled_data = pickle.dumps(data)
    size = len(pickled_data)

    # Yield on exactly one of the optional outputs; the other branch
    # receives no input and is skipped at runtime
    if size > SOME_THRESHOLD:
        yield Output(True, "large")
    else:
        yield Output(True, "small")


@op
def op_large(signal):
    # Logic for handling large data
    pass


@op
def op_small(signal):
    # Logic for handling small data
    pass


@job
def my_job():
    large, small = decide_op(op_main())
    op_large(large)
    op_small(small)
```
In this example, `op_main` returns the data, and `decide_op` checks the size of its pickled form. `decide_op` declares two optional outputs (`Out(is_required=False)`) and yields on only one of them, "large" or "small". Because `op_large` and `op_small` are each wired to one of those outputs, only the op on the branch that was actually yielded will execute; the other is skipped. Please note that this is a simplified example to illustrate the concept. In practice, you may need to adjust the logic to fit your specific use case, such as how you handle the data and the threshold for deciding between "large" and "small". Additionally, the actual serialization and size checking should be done in a way that is appropriate for your data and use case.
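As a quick check, here is a sketch of running the job in-process (the sample data and threshold above are illustrative):

```python
if __name__ == "__main__":
    result = my_job.execute_in_process()
    assert result.success
    # Only one of op_large / op_small actually executes; the step on
    # the other branch is reported as skipped because its upstream
    # output was never yielded.
```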
@Scout in your example, how is the data from op_main passed to either op_large or op_small?
Apologies for the oversight. In the previous example, the data from `op_main` was not being passed to `op_large` or `op_small`; they only received a signal. To pass the data, you would modify `decide_op` to yield the data itself on whichever optional output it chooses, so the downstream op receives it as its input. Here's the corrected example:
```python
import pickle

from dagster import Out, Output, job, op

SOME_THRESHOLD = 1024  # size in bytes; tune for your use case


@op
def op_main():
    # Your logic here
    data = {"mydata": "some large data"}
    return data


@op(out={"large": Out(is_required=False), "small": Out(is_required=False)})
def decide_op(data):
    # Serialize the data to a pickle and check its size
    pickled_data = pickle.dumps(data)
    size = len(pickled_data)

    # Yield the data itself on whichever branch applies, so the
    # downstream op receives it as its input
    if size > SOME_THRESHOLD:
        yield Output(data, "large")
    else:
        yield Output(data, "small")


@op
def op_large(data):
    # Logic for handling large data
    pass


@op
def op_small(data):
    # Logic for handling small data
    pass


@job
def my_job():
    data = op_main()
    large, small = decide_op(data)
    op_large(large)
    op_small(small)
```
In this updated example, `decide_op` yields the data on the "large" or "small" output depending on the size check. The op wired to that output (`op_large` or `op_small`) receives the data as its input and runs, while the op on the other branch is skipped. This way, the data from `op_main` is correctly passed to the appropriate downstream op based on the size of the pickled data.
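If you want to sanity-check the branching logic without running the whole job, here is a sketch using direct invocation of `decide_op` (the payload size is hypothetical):

```python
# Directly invoking an op whose body yields returns a generator of events
outputs = list(decide_op({"mydata": "x" * 10_000}))
assert outputs[0].output_name == "large"  # pickles to more than 1024 bytes
assert outputs[0].value == {"mydata": "x" * 10_000}
```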