Yevhen Samoilenko
08/16/2022, 10:51 AM
sandy
08/16/2022, 6:49 PM
owen
08/16/2022, 11:28 PM
Yevhen Samoilenko
08/17/2022, 12:40 PM
> Are your integration assets created using a multi-asset (i.e. a single step that creates a bunch of assets), or is each integration asset its own distinct step?
Actually, we use both of these approaches.
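```python
from dagster import AssetObservation, OpExecutionContext, Output, asset


@asset(output_required=False)
def may_fail(context: OpExecutionContext):
    try:
        # Placeholder for the real integration call, which may raise.
        result = [1, 2, 3, 4]
        yield Output(result)
    except Exception as e:
        # No Output is yielded, so the asset is not materialized;
        # record the error as an observation instead of failing the step.
        yield AssetObservation(
            asset_key=context.asset_key_for_output(),
            metadata={"error": str(e)},
        )


@asset
def downstream(may_fail):
    return may_fail + [5]
```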
In this case, the job won't fail, and in theory the downstream assets should still execute for the integrations that materialized successfully. The only thing left is to somehow check for these recorded errors and fail the job at the end (or send an alert to Slack or something like that).
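A minimal sketch of that end-of-run check, written in plain Python so it runs without Dagster. It assumes the AssetObservation records from a run have already been collected (for example by a run status sensor reading the run's event log, which is not shown and depends on the Dagster version) into hypothetical `(asset_key, metadata)` pairs. `find_failed_assets` and `check_integrations` are illustrative helpers, not Dagster APIs; the `"error"` key matches the example above.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical shape: one (asset_key, metadata) pair per AssetObservation
# recorded during a run. Fetching these from Dagster is not shown here.
Observation = Tuple[str, Dict[str, str]]


def find_failed_assets(observations: Iterable[Observation]) -> List[str]:
    """Return the sorted asset keys whose metadata contains an "error" entry."""
    return sorted({key for key, metadata in observations if "error" in metadata})


def check_integrations(observations: Iterable[Observation]) -> None:
    """Fail loudly at the end of a run if any integration recorded an error.

    Replacing the raise with a call to an alerting hook (e.g. Slack) gives
    the "alert instead of fail" variant.
    """
    failed = find_failed_assets(observations)
    if failed:
        raise RuntimeError("integrations failed: " + ", ".join(failed))


# Hypothetical observations from one run.
observations = [
    ("integration_a", {"error": "connection timed out"}),
    ("integration_b", {"rows": "120"}),
]
print(find_failed_assets(observations))  # ['integration_a']
```

Keeping the detection separate from the reaction means the same helper can back either a hard failure or a notification.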
@sandy, @owen what do you think?
owen
08/17/2022, 4:18 PM
If `output_required` is set to False and the output is not yielded, every step downstream of that optional output is skipped, not just the downstream assets. Because all dbt assets run in a single step, they will all be skipped. This is pretty confusing behavior and should be fixed (but I don't think it's a super quick fix on our end). Just created a ticket for that here: https://github.com/dagster-io/dagster/issues/9408
Yevhen Samoilenko
08/18/2022, 7:54 AM
```python
from dagster import (
    AssetSelection,
    DagsterRunStatus,
    DefaultSensorStatus,
    RunRequest,
    RunStatusSensorContext,
    define_asset_job,
    run_status_sensor,
)

# Integrations and dbt models live in separate asset groups and jobs.
integrations_job = define_asset_job(
    name="integrations_job",
    selection=AssetSelection.groups("integrations"),
)

dbt_job = define_asset_job(
    name="dbt_job",
    selection=AssetSelection.groups("dbt"),
)


# Start dbt_job whenever integrations_job succeeds...
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=dbt_job,
    monitored_jobs=[integrations_job],
    default_status=DefaultSensorStatus.RUNNING,
)
def integrations_job_success_sensor(context: RunStatusSensorContext):
    return RunRequest(run_key=None)


# ...and also when it fails, so dbt still runs on whatever data is available.
@run_status_sensor(
    run_status=DagsterRunStatus.FAILURE,
    request_job=dbt_job,
    monitored_jobs=[integrations_job],
    default_status=DefaultSensorStatus.RUNNING,
)
def integrations_job_failure_sensor(context: RunStatusSensorContext):
    return RunRequest(run_key=None)
```
What if we split our asset-based jobs into two parts, integrations and dbt, and use `run_status_sensor` to trigger the dbt run on both failure and success? I know this is kind of a hack, but still. And is it possible to run dbt assets only for the integrations that succeeded (using run_config or something else)?
owen
08/18/2022, 4:30 PM
Yevhen Samoilenko
08/18/2022, 4:37 PM
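On the open question of running dbt assets only for the integrations that succeeded, the selection logic itself can be sketched in plain Python, independent of Dagster. `dbt_assets_to_run` keeps a dbt model only if all of its upstream integrations succeeded (following dbt-to-dbt dependencies transitively), while `single_step_outcome` is a toy model of the all-or-nothing behavior owen describes when every dbt model runs in one step. Both functions, the `dbt_deps` mapping, and the asset names are hypothetical.

```python
from typing import Dict, List, Set

# Hypothetical graph: each dbt model maps to its direct upstream assets,
# which are either integration assets or other dbt models (assumed acyclic).
DbtDeps = Dict[str, List[str]]


def dbt_assets_to_run(dbt_deps: DbtDeps, succeeded: Set[str]) -> List[str]:
    """Per-model selection: keep a dbt model only if everything upstream of it,
    transitively, traces back to integrations that succeeded."""
    runnable: Dict[str, bool] = {}

    def ok(name: str) -> bool:
        if name not in dbt_deps:  # not a dbt model, so an integration asset
            return name in succeeded
        if name not in runnable:
            runnable[name] = all(ok(up) for up in dbt_deps[name])
        return runnable[name]

    return sorted(model for model in dbt_deps if ok(model))


def single_step_outcome(dbt_deps: DbtDeps, succeeded: Set[str]) -> List[str]:
    """Toy model of the behavior described in the thread: all dbt models share
    one step, so one missing integration output skips every model."""
    integrations = {
        up for ups in dbt_deps.values() for up in ups if up not in dbt_deps
    }
    return sorted(dbt_deps) if integrations <= succeeded else []


dbt_deps = {
    "stg_a": ["raw_a"],
    "stg_b": ["raw_b"],
    "report": ["stg_a", "stg_b"],
}
print(dbt_assets_to_run(dbt_deps, {"raw_a"}))    # ['stg_a']
print(single_step_outcome(dbt_deps, {"raw_a"}))  # []
```

Turning the selected list into a run is version-dependent. If your Dagster version's `RunRequest` accepts an `asset_selection` of `AssetKey`s, a run status sensor could pass the selected models there, but whether that actually narrows the single dbt step depends on the dbt assets supporting subsetting, so treat that part as an assumption to verify rather than a confirmed recipe.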