# ask-community
a
Hi, excuse me for continuing this thread. We are using S3 for the IO manager. With the following return value in the upstream op,
```python
return exec_time, Output(run_result, output_name="success_rds_to_bq")
```
I was trying to receive the output in the next op, declared in the following form:
```python
@op(
    ins={"exec_time": In(description="Job execution start time")},
    out={"run_result": Out(description="Output to persist the ECS Task execution result to S3, although it is not used downstream")},
    tags={"kind": "etl"},
)
```
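For context on the return shape above: when an op declares multiple Outs, Dagster maps a returned tuple to the outputs in definition order, wrapping bare values implicitly, while an explicit Output must carry the name of its declared slot. A minimal sketch with made-up names:

```python
from dagster import Out, Output, op


@op(out={"first": Out(str), "second": Out(int)})
def two_outputs():
    # "a" maps positionally to "first" and is wrapped implicitly;
    # the explicit Output must name its matching declared output.
    return "a", Output(2, output_name="second")
```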
However, the content of `success_rds_to_bq` in S3 was empty, and the next op failed to receive it:
```
dagster._core.errors.DagsterTypeCheckDidNotPass: Type check failed for step input "previous_op" - expected type "String". Description: Value "None" of python type "NoneType" must be a string.
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_plan.py", line 262, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 343, in core_dagster_event_sequence_for_step
    for evt in check.generator(
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 246, in _type_checked_event_sequence_for_input
    raise DagsterTypeCheckDidNotPass(
```
Normally the contents are not empty; this has happened only once so far. Is it safe to assume this is a rare event caused by S3 not guaranteeing reliable communication? Module versions etc. are the same as above.
z
S3 is an extremely reliable service, which is why a significant portion of the world's software uses it for storage. Events in which a read from S3 returns no content for an object that actually has content, or a write silently drops content without raising an exception, should be exceedingly rare, to the point where you could probably consider them impossible. S3 guarantees read-after-write consistency, so I find it much more likely that there is a bug in your code or in a library you are using, but it's tough to say without seeing the code for the op that generated the output.
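For illustration, here is a minimal sketch of the kind of bug that would produce exactly this traceback, assuming a helper like run_ecs that can fall through and implicitly return None, together with a guard that surfaces the problem in the producing op instead of one op later:

```python
from dagster import Failure, Out, Output, op


def run_ecs(config_path: str, exec_time: str, step: int):
    # Hypothetical stand-in: a code path that forgets to return
    # falls through and yields None.
    ...


@op(out={"success_rds_to_bq": Out(str)})
def rds_to_bq_guarded(context, exec_time: str):
    run_result = run_ecs("/root/danballone_etl/ops/bq_elt.yml", exec_time, 1)
    # Guard: without this, None would be pickled to S3 and only fail
    # when the downstream In(str) type check runs.
    if not isinstance(run_result, str):
        raise Failure(description=f"run_ecs returned {run_result!r}, expected str")
    return Output(run_result, output_name="success_rds_to_bq")
```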
a
Thank you. The code excerpt is below; the failure occurs in the handoff from the rds_to_bq() op to dbt_run().
• job.py
```python
@job(
    resource_defs={
        "io_manager": s3_pickle_io_manager.configured(
            {
                "s3_bucket": "xxxxxxxxxxxxxxxxxx",
                "s3_prefix": "xxxxxxxxxxxxxx",
            }
        ),
        "s3": s3_resource,
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_PATH,
                "profiles_dir": DBT_PROFILES,
                "target": DBT_TARGET,
            },
        ),
    },
    executor_def=multiprocess_executor.configured({"max_concurrent": 3}),
    tags={MAX_RUNTIME_SECONDS_TAG: 10800},
)
def elt_job():
    """
    Job pipeline for the D-one environment. It mainly performs the following processing.
    """
    exec_time, _ = clone_rds()
    exec_time, success_rds_to_bq = rds_to_bq(exec_time=exec_time)
    remove_rds(exec_time=exec_time)
    success_dbt_run = dbt_run(previous_op=success_rds_to_bq)
    update_bq_column_description(previous_op=success_dbt_run)
    success_dbt_test = dbt_test(previous_op=success_dbt_run)
    dbt_docs(previous_op=success_dbt_test)
```
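As a debugging aid, one could inspect the object that the IO manager actually wrote for the failing output. A hedged sketch with boto3 (bucket, prefix, and run ID are placeholders, and the exact key layout can vary across dagster-aws versions; the default S3 pickle IO manager stores each output as a pickled object keyed roughly by prefix, run ID, step key, and output name):

```python
import pickle

import boto3

s3 = boto3.client("s3")
# Placeholder key; adjust to the layout your dagster-aws version uses,
# e.g. <s3_prefix>/storage/<run_id>/rds_to_bq/success_rds_to_bq.
obj = s3.get_object(
    Bucket="xxxxxxxxxxxxxxxxxx",
    Key="xxxxxxxxxxxxxx/storage/<run_id>/rds_to_bq/success_rds_to_bq",
)
value = pickle.loads(obj["Body"].read())
print(repr(value))  # a pickled None here would reproduce the type-check failure
```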
• op.py
```python
@op(
    ins={"exec_time": In(description="Job execution start time")},
    out={
        "exec_time": Out(is_required=False, description="Job execution start time"),
        "success_rds_to_bq": Out(description="Convenience value passed downstream. Contains the ECS Task result so it is persisted to S3."),
    },
    tags={"kind": "etl"},
)
def rds_to_bq(context, exec_time):
    """
    Transfers data from the cloned D-one RDS to BigQuery. Runs kick.sh.
    """
    try:
        run_result = run_ecs("/root/danballone_etl/ops/bq_elt.yml", exec_time, 1)
    except BaseException:
        raise Failure(
            description=f"{context.op_def.name} is failure.",
        )
    return exec_time, Output(run_result, output_name="success_rds_to_bq")


@op(
    ins={"exec_time": In(description="Job execution start time")},
    out={"run_result": Out(description="Not used downstream, but output so the ECS Task execution result is saved to S3")},
    tags={"kind": "etl"},
)
def remove_rds(context, exec_time):
    """
    Deletes the cloned RDS. Runs remove_rds.sh.
    """
    try:
        run_result = run_ecs("/root/danballone_etl/ops/bq_elt.yml", exec_time, 2)
    except BaseException:
        raise Failure(
            description=f"{context.op_def.name} is failure.",
        )
    return Output(run_result, output_name="run_result")


@op(
    ins={
        "previous_op": In(str, default_value="success_rds_to_bq",
                          description="default_value is a dummy value"),
        "dbt_arg": In(str, default_value="warehouse warehouse_work"),
        "result_path": In(str, default_value="target"),
    },
    out={
        "success_dbt_run": Out(description="Convenience parameter signaling success to downstream ops"),
    },
    required_resource_keys={"dbt"},
    tags={"kind": "dbt"},
)
def dbt_run(context, previous_op: str, dbt_arg, result_path):
    """
    Runs dbt using the dbt project from the xxxxxxxxxxxxxxx repository.
    """
    ...
```
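One thing worth noting about previous_op above: the default_value only applies when the input is left unconnected; once dbt_run is wired to success_rds_to_bq in the job, the value loaded from S3 is what gets type checked, so the dummy default cannot mask an upstream None. A minimal sketch with made-up names:

```python
from dagster import In, op


@op(ins={"previous_op": In(str, default_value="dummy_value")})
def needs_string(previous_op: str):
    # The default applies only when nothing is wired to this input;
    # once connected, the upstream value is loaded by the IO manager
    # and must pass the str type check, so an upstream None fails here.
    return previous_op
```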
• packages
```
psutil==5.6.3
PyYAML==6.0
pydantic==1.10.6
gspread==4.0.0
boto3==1.17.14
dagster==1.3.7
dagster-dbt==0.19.7
dagster-cloud==1.3.7
dagster-aws
```
It now seems likely that we introduced an error in a change on our side here, so we are withdrawing this for now. Thank you :bow: