# dagster-plus
a
Hi. I want to execute the processes that branch within a pipeline in parallel. I have the following setup, but if max_concurrent is greater than 2, both processes stop at the point of branching. Is this an incorrect way to implement the branching? • LaunchPad
execution:
  config:
    max_concurrent: 2
....
• job.py
....
@job(executor_def=multiprocess_executor)
def elt_job():
    .....
    output_1, output_2 = op_a(var=output_1)
    op_b(var=output_1)
    output_3 = op_c(var=output_2)
    ...
• op.py
...

@op(out={"output_1": Out(), "output_2": Out()})
def op_a(context, var: str):
    ....
    return "output_1", "output_2"


@op(ins={"var": In()})
def op_b(context, var: str):
    ....


@op(ins={"var": In()}, out={"output_3": Out()})
def op_c(context, var: str):
    ....
    yield Output(value="output_3", output_name="output_3")

...
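For reference, a minimal, self-contained sketch of this fan-out pattern under the multiprocess executor (hypothetical op and job names, not the poster's actual code): the upstream op declares two named outputs, each downstream op consumes one branch, and a max_concurrent of 2 lets both branches run at the same time.

from dagster import In, Out, Output, job, multiprocess_executor, op


@op(out={"left": Out(str), "right": Out(str)})
def split_op(context):
    # Fan out: each named output feeds a separate downstream branch.
    yield Output("left-value", output_name="left")
    yield Output("right-value", output_name="right")


@op(ins={"var": In(str)})
def branch_a(context, var):
    context.log.info(f"branch_a received {var}")


@op(ins={"var": In(str)})
def branch_b(context, var):
    context.log.info(f"branch_b received {var}")


@job(executor_def=multiprocess_executor.configured({"max_concurrent": 2}))
def branching_job():
    left, right = split_op()
    branch_a(var=left)
    branch_b(var=right)

With max_concurrent set to 2 (whether in the launchpad run config or via .configured as above), branch_a and branch_b become eligible to start in separate subprocesses as soon as split_op finishes.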
a
“if max_concurrent is greater than 2, both processes will stop at the point of branching”
This sounds very strange to me. Can you elaborate on what exactly you observed?
a
Thank you. I was checking the pipeline execution from the Runs page. The upstream op appeared to correctly emit the outputs corresponding to “output_1” and “output_2”. However, the subsequent ops stopped at the “Started execution of step” status after receiving those outputs.
a
So from the event log I see Dagster starting the two subprocesses for the two steps, but then nothing after. Did you cancel the run because things stalled at this point? One guess is that there is something happening during process init / import time in your code that is causing things to stall. Another guess would be that the system you are running on is having issues trying to start two processes simultaneously.
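To illustrate that first guess (a hypothetical sketch, not code from this thread): with the multiprocess executor, every step runs in a fresh subprocess that re-imports the code location, so anything that blocks at module import time runs again in each subprocess and can stall a step before any op code executes.

import time

from dagster import op


def connect_to_warehouse(timeout=None):
    # Stand-in for a real client constructor that can block (hypothetical).
    time.sleep(0.1)
    return object()


# Risky: this runs at import time in every step subprocess the multiprocess
# executor spawns. If it hangs, the step sits at "Started execution of step"
# with nothing useful in the event log.
WAREHOUSE = connect_to_warehouse()


# Safer: defer the work into the op (or a resource) so it runs under Dagster's
# control and any delay or error surfaces in the step's own logs.
@op
def load_table(context):
    client = connect_to_warehouse(timeout=30)
    context.log.info(f"connected: {client}")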
a
Yes, I cancelled the run because it had been stuck for 10 minutes. The two steps did start at the same time, but that is exactly where things got stuck. What would be the recommended implementation? Should the start times be staggered a bit, even if the steps still run in parallel?
a
From a Dagster perspective there shouldn't be anything wrong with this implementation; I am not sure what's causing the stall.
a
The problem may be in the details, so let me describe the setup more fully; it is less straightforward than the earlier snippets suggest. With in_process_executor it runs sequentially with no problem. The same thing happened when I staggered the start times, changed the priorities, and split the pipeline into two separate pipelines instead of branching. • definitions.py
defs = Definitions(
    assets=[*dbt_assets],
    jobs=[elt_job],
    schedules=[daily_schedule],
    sensors=[ecs_sensor],
    executor_def=multiprocess_executor,
    resources={
        "io_manager": s3_pickle_io_manager.configured(
            {
                "s3_bucket": "xxxxxxxxxxxxxxxxxxxxxxx",
                "s3_prefix": "xxxxxxxxxxxxxxxxxxxxxxx",
            }
        ),
        "s3": s3_resource,
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_PATH,
                "profiles_dir": DBT_PROFILES,
                "target": DBT_TARGET,
            },
        ),
    },
)
• job.py
@job(
    resource_defs={
        "io_manager": s3_pickle_io_manager.configured(
            {
                "s3_bucket": "xxxxxxxxxxxxxxxxxxxxxxx",
                "s3_prefix": "xxxxxxxxxxxxxxxxxxxxxxx",
            }
        ),
        "s3": s3_resource,
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_PATH,
                "profiles_dir": DBT_PROFILES,
                "target": DBT_TARGET,
            },
        ),
    },
    executor_def=multiprocess_executor,
)
def elt_job():
    exec_time = op_1()
    exec_time, success = op_2(exec_time=exec_time)
    op_3(exec_time=exec_time)
    success = op_4(previous_op=success)
    ......
• op.py
@op()
def op_1(context):
    try:
        exec_time = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        some_function(exec_time)
    except:
        message_send("Failure", context.op_def.name, context.run_id)
        sys.exit(1)
    return exec_time


@op(
    out={
        "exec_time": Out(is_required=False, description="xxxxxxxxxxxxxxx"),
        "success": Out(description="xxxxxxxxxxxxxxxxxxxxx"),
    },
)
def op_2(context, exec_time):
    try:
        some_function(exec_time)
    except:
        message_send("Failure", context.op_def.name, context.run_id)
        sys.exit(1)
    return exec_time, "success"


@op(
    ins={"exec_time": In(description="xxxxxxxxxxxxxx")},
)
def op_3(context, exec_time):
    try:
        some_function(exec_time)
    except:
        message_send("Failure", context.op_def.name, context.run_id)
        sys.exit(1)


@op(
    ins={
        "previous_op": In(str, default_value="success"),
        "dbt_arg": In(str, default_value="xxxxxxxxxxxxxxxx"),
        "result_path": In(str, default_value="xxxxxxxxxxxx"),
    },
    out={
        "success": Out(description="xxxxxxxxxxxxxxxxxxxxx"),
    },
    required_resource_keys={"dbt"},
)
def op_4(context, previous_op: str, dbt_arg, result_path):
    try:
        dbt_output = context.resources.dbt.run(
            select=dbt_arg, state=result_path
        )
        ......
a
It could be related to using sys.exit: its immediate termination of the process could prevent Dagster's bookkeeping. Using a tool like py-spy would allow you to inspect the processes and see where they are getting stuck.
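As a sketch of the first suggestion (hypothetical helper name, not the poster's code), raising dagster.Failure instead of calling sys.exit(1) lets the step fail through Dagster's normal bookkeeping:

from dagster import Failure, op


def do_the_work():
    # Stand-in for the real work (hypothetical helper).
    raise RuntimeError("boom")


@op
def might_fail(context):
    try:
        do_the_work()
    except Exception as exc:
        context.log.error(f"step failed: {exc}")
        # Raising Failure (or simply re-raising) marks the step as failed
        # through Dagster's bookkeeping: the failure event appears in the
        # run's event log and downstream steps are skipped, instead of
        # sys.exit(1) terminating the subprocess mid-flight.
        raise Failure(description="do_the_work raised") from exc

For the second suggestion, py-spy dump --pid <pid> prints the current Python stack of a running step subprocess, which usually shows exactly where it is blocked.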
a
We confirmed that parallel execution works in our local environment; the difference turned out to be between Dagster versions 1.2.3 and 1.3.5.
In our cloud environment, the problem was solved once we upgraded Dagster to 1.3.5. I should have mentioned the version earlier. Thank you!!
a
Nice! Glad you got things working.