Aki Iwa
08/26/2023, 6:10 AM
return exec_time, Output(run_result, output_name="success_rds_to_bq")
I was trying to receive the output in the next op, in the following form:
@op(
    ins={"exec_time": In(description="Job execution start time")},
    out={"run_result": Out(description="Not used downstream, but output so that the ECS Task execution result is saved to s3")},
    tags={"kind": "etl"},
)
The content of success_rds_to_bq in s3 was empty, and the next op failed to receive it.
dagster._core.errors.DagsterTypeCheckDidNotPass: Type check failed for step input "previous_op" - expected type "String". Description: Value "None" of python type "NoneType" must be a string.
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_plan.py", line 262, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 343, in core_dagster_event_sequence_for_step
    for evt in check.generator(
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 246, in _type_checked_event_sequence_for_input
    raise DagsterTypeCheckDidNotPass(
Normally, the contents are not empty, and this has only happened once so far.
Is it safe to assume that this is a rare event that occurs because s3 does not guarantee reliable communication?
Module versions, etc. are the same as above
Zach
08/27/2023, 8:47 PM
Aki Iwa
08/27/2023, 11:49 PM
@job(
    resource_defs={
        "io_manager": s3_pickle_io_manager.configured(
            {
                "s3_bucket": "xxxxxxxxxxxxxxxxxx",
                "s3_prefix": "xxxxxxxxxxxxxx",
            }
        ),
        "s3": s3_resource,
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_PATH,
                "profiles_dir": DBT_PROFILES,
                "target": DBT_TARGET,
            },
        ),
    },
    executor_def=multiprocess_executor.configured({"max_concurrent": 3}),
    tags={MAX_RUNTIME_SECONDS_TAG: 10800},
)
def elt_job():
    """
    Job pipeline for the D-one environment. It mainly performs the following processing.
    """
    exec_time, _ = clone_rds()
    exec_time, success_rds_to_bq = rds_to_bq(exec_time=exec_time)
    remove_rds(exec_time=exec_time)
    success_dbt_run = dbt_run(previous_op=success_rds_to_bq)
    update_bq_column_description(previous_op=success_dbt_run)
    success_dbt_test = dbt_test(previous_op=success_dbt_run)
    dbt_docs(previous_op=success_dbt_test)
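One way to narrow down where the `None` came from is to look at what a pickle-based IO manager would do with an empty object. The sketch below uses only the standard `pickle` module and assumes (this is not confirmed anywhere in the thread) that `s3_pickle_io_manager` stores each output as a plain pickle payload. Under that assumption, a truly empty s3 object could not be loaded at all, whereas a `None` return value is stored as a small, valid payload that loads back as `None`. The helper name `describe_payload` is hypothetical.

```python
import pickle


def describe_payload(payload: bytes) -> str:
    """Classify raw bytes as they might be read back from a stored output.

    Hypothetical helper for illustration; not part of Dagster.
    """
    if not payload:
        return "empty object"
    try:
        value = pickle.loads(payload)
    except Exception as exc:
        return f"unreadable payload ({type(exc).__name__})"
    if value is None:
        return "pickled None"
    return f"pickled {type(value).__name__}"


# A None output is still a non-empty, valid pickle payload.
none_payload = pickle.dumps(None)

# An empty payload cannot be unpickled; pickle raises EOFError.
try:
    pickle.loads(b"")
    empty_load_error = None
except EOFError as exc:
    empty_load_error = exc

print(describe_payload(none_payload))
print(describe_payload(b""))
print(describe_payload(pickle.dumps("task finished")))
```

If the stored object for `success_rds_to_bq` turns out to be small but not zero bytes, that would point toward `run_ecs` having returned `None` on that run rather than s3 losing the content.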
• op.py
@op(
    ins={"exec_time": In(description="Job execution start time")},
    out={
        "exec_time": Out(is_required=False, description="Job execution start time"),
        "success_rds_to_bq": Out(description="Placeholder value passed to downstream ops. Contains the ECS Task result so that it is persisted to s3"),
    },
    tags={"kind": "etl"},
)
def rds_to_bq(context, exec_time):
    """
    Transfers data from the cloned D-one RDS to BigQuery. Runs kick.sh.
    """
    try:
        run_result = run_ecs("/root/danballone_etl/ops/bq_elt.yml", exec_time, 1)
    except BaseException:
        raise Failure(
            description=f"{context.op_def.name} is failure.",
        )
    return exec_time, Output(run_result, output_name="success_rds_to_bq")
@op(
    ins={"exec_time": In(description="Job execution start time")},
    out={"run_result": Out(description="Not used downstream, but output so that the ECS Task execution result is saved to s3")},
    tags={"kind": "etl"},
)
def remove_rds(context, exec_time):
    """
    Deletes the cloned RDS. Runs remove_rds.sh.
    """
    try:
        run_result = run_ecs("/root/danballone_etl/ops/bq_elt.yml", exec_time, 2)
    except BaseException:
        raise Failure(
            description=f"{context.op_def.name} is failure.",
        )
    return Output(run_result, output_name="run_result")
@op(
    ins={
        "previous_op": In(str, default_value="success_rds_to_bq",
                          description="default_value is a dummy value"),
        "dbt_arg": In(str, default_value="warehouse warehouse_work"),
        "result_path": In(str, default_value="target"),
    },
    out={
        "success_dbt_run": Out(description="Placeholder parameter that signals successful completion to downstream ops"),
    },
    required_resource_keys={"dbt"},
    tags={"kind": "dbt"},
)
def dbt_run(context, previous_op: str, dbt_arg, result_path):
    """
    Runs dbt processing from the dbt project in the xxxxxxxxxxxxxxx repository.
    """
    ......
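Because `dbt_run` declares `previous_op` as `In(str)`, a `None` coming out of `rds_to_bq` only surfaces as a type check failure one step later. A minimal sketch of failing earlier, inside the producing op, is shown below. `require_str` is a hypothetical helper, not part of Dagster, and the sketch assumes that `run_ecs` is expected to return a non-empty string on success.

```python
def require_str(value: object, name: str) -> str:
    """Return value unchanged if it is a non-empty str, otherwise raise.

    Hypothetical helper: meant to be called on run_result before it is
    wrapped in Output(...), so a bad value fails the producing op itself.
    """
    if not isinstance(value, str):
        raise TypeError(
            f"{name} must be a str, got {type(value).__name__}: {value!r}"
        )
    if not value:
        raise ValueError(f"{name} must not be an empty string")
    return value


# Stand-in values for what run_ecs might return (assumed, for illustration).
print(require_str("ecs task ok", "run_result"))

try:
    require_str(None, "run_result")
except TypeError as exc:
    print(f"rejected: {exc}")
```

In `rds_to_bq` this could wrap the value just before the return, for example `Output(require_str(run_result, "run_result"), output_name="success_rds_to_bq")`, so that the run that produced `None` is the one that fails and its logs show what `run_ecs` returned.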
• packages
psutil==5.6.3
PyYAML==6.0
pydantic==1.10.6
gspread==4.0.0
boto3==1.17.14
dagster==1.3.7
dagster-dbt==0.19.7
dagster-cloud==1.3.7
dagster-aws
Aki Iwa
08/30/2023, 12:18 AM