Aki Iwa
05/17/2023, 6:31 AM
execution:
  config:
    max_concurrent: 2
....
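(For reference, the same limit can be baked into the executor in code instead of supplied as run config. A minimal sketch, assuming the standard dagster multiprocess_executor; the executor and job names here are hypothetical.)

from dagster import job, multiprocess_executor

# Pre-configure the executor so every run is capped at two concurrent op processes,
# without needing the YAML run config above.
two_proc_executor = multiprocess_executor.configured({"max_concurrent": 2})

@job(executor_def=two_proc_executor)
def capped_job():
    ...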
• job.py
....
@job(executor_def=multiprocess_executor)
def elt_job():
    .....
    # var for op_a is supplied via run config (ops: op_a: inputs: var: ...)
    output_1, output_2 = op_a()
    op_b(var=output_1)
    output_3 = op_c(var=output_2)
...
• op.py
...
from dagster import In, Out, Output, op

@op(out={"output_1": Out(), "output_2": Out()})
def op_a(context, var: str):
    ....
    return "output_1", "output_2"

@op(ins={"var": In()})
def op_b(context, var: str):
    ....

@op(ins={"var": In()}, out={"output_3": Out()})
def op_c(context, var: str):
    ....
    yield Output(value="output_3", output_name="output_3")
...
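(A minimal sketch of how the run config above would be supplied at launch, assuming the job is defined in job.py and the YAML is saved as run_config.yaml; both file names are assumptions.)

dagster job execute -f job.py -c run_config.yaml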
alex
05/17/2023, 3:31 PM
> if max_concurrent is greater than 2, both processes will stop at the point of branching
This sounds very strange to me; can you elaborate on what exactly you observed?
Aki Iwa
05/17/2023, 9:02 PM
alex
05/17/2023, 10:05 PM
Aki Iwa
05/17/2023, 10:10 PM
alex
05/18/2023, 2:15 PM
Aki Iwa
05/18/2023, 8:41 PM
defs = Definitions(
    assets=[*dbt_assets],
    jobs=[elt_job],
    schedules=[daily_schedule],
    sensors=[ecs_sensor],
    executor=multiprocess_executor,
    resources={
        "io_manager": s3_pickle_io_manager.configured(
            {
                "s3_bucket": "xxxxxxxxxxxxxxxxxxxxxxx",
                "s3_prefix": "xxxxxxxxxxxxxxxxxxxxxxx",
            }
        ),
        "s3": s3_resource,
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_PATH,
                "profiles_dir": DBT_PROFILES,
                "target": DBT_TARGET,
            },
        ),
    },
)
• job.py
@job(
    resource_defs={
        "io_manager": s3_pickle_io_manager.configured(
            {
                "s3_bucket": "xxxxxxxxxxxxxxxxxxxxxxx",
                "s3_prefix": "xxxxxxxxxxxxxxxxxxxxxxx",
            }
        ),
        "s3": s3_resource,
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_PATH,
                "profiles_dir": DBT_PROFILES,
                "target": DBT_TARGET,
            },
        ),
    },
    executor_def=multiprocess_executor,
)
def elt_job():
    exec_time = op_1()
    exec_time, success = op_2(exec_time=exec_time)
    op_3(exec_time=exec_time)
    success = op_4(previous_op=success)
    ......
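(Side note: if previous_op exists only to force op_4 to run after op_2, Dagster's Nothing type expresses that ordering without threading a dummy string through. A minimal sketch with hypothetical op names, not the code from this thread.)

from dagster import In, Nothing, job, op

@op
def upstream():
    # No meaningful return value; only the ordering matters.
    pass

@op(ins={"start": In(Nothing)})
def downstream(context):
    context.log.info("runs only after upstream has finished")

@job
def ordered_job():
    # The Nothing input is wired in the job body but never appears
    # in downstream's function signature.
    downstream(start=upstream())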
• op.py
import datetime
import sys

from dagster import In, Out, op

@op()
def op_1(context):
    try:
        exec_time = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        some_function(exec_time)
    except Exception:
        message_send("Failure", context.op_def.name, context.run_id)
        sys.exit(1)
    return exec_time

@op(
    out={
        "exec_time": Out(is_required=False, description="xxxxxxxxxxxxxxx"),
        "success": Out(description="xxxxxxxxxxxxxxxxxxxxx"),
    },
)
def op_2(context, exec_time):
    try:
        some_function(exec_time)
    except Exception:
        message_send("Failure", context.op_def.name, context.run_id)
        sys.exit(1)
    return exec_time, "success"

@op(
    ins={"exec_time": In(description="xxxxxxxxxxxxxx")},
)
def op_3(context, exec_time):
    try:
        some_function(exec_time)
    except Exception:
        message_send("Failure", context.op_def.name, context.run_id)
        sys.exit(1)

@op(
    ins={
        "previous_op": In(str, default_value="success"),
        "dbt_arg": In(str, default_value="xxxxxxxxxxxxxxxx"),
        "result_path": In(str, default_value="xxxxxxxxxxxx"),
    },
    out={
        "success": Out(description="xxxxxxxxxxxxxxxxxxxxx"),
    },
    required_resource_keys={"dbt"},
)
def op_4(context, previous_op: str, dbt_arg, result_path):
    try:
        dbt_output = context.resources.dbt.run(
            select=dbt_arg, state=result_path
        )
        ......
alex
05/18/2023, 9:37 PM
sys.exit could prevent Dagster bookkeeping with its immediate termination of the process. py-spy could allow you to inspect the processes to see where they are getting stuck.
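(A minimal sketch of both suggestions: raising dagster's Failure in place of sys.exit, and a py-spy command for inspecting a stuck step process. some_function and message_send are the helpers from the snippets above; the PID is whatever step process you want to inspect.)

# Attach to a stuck step process to see where it is blocked:
#   py-spy dump --pid <PID>

import datetime

from dagster import Failure, op

@op()
def op_1(context):
    try:
        exec_time = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        some_function(exec_time)
    except Exception as exc:
        message_send("Failure", context.op_def.name, context.run_id)
        # Raising lets Dagster record the step failure properly, instead of
        # sys.exit terminating the process before bookkeeping completes.
        raise Failure(description=str(exc)) from exc
    return exec_time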
Aki Iwa
05/23/2023, 4:31 AM
alex
05/23/2023, 3:47 PM