Ian Venter
03/25/2023, 9:44 AM
stg_customer_job = define_asset_job(name="stg_customer_job", selection=AssetSelection.keys(
    'customer_hist').downstream())

# Poll every 30 seconds
@sensor(job=stg_customer_job, minimum_interval_seconds=30, default_status=DefaultSensorStatus.STOPPED)
def stg_customer_sensor(context: SensorEvaluationContext):
    connHelper = DBConnectionHelper()
    conn, cursor = connHelper.get_connection()
    # Query the Postgres database to check for new data
    cursor.execute(
        "select count(*) from stage.stg_customer where _dwh_load_dtm > (select coalesce(max(_dwh_processed_change_dtm),TIMESTAMP '1900-01-01 12:00:00') as max_dtm from stage.customer_hist)")
    count = cursor.fetchone()[0]
    connHelper.close_connection(conn)
    if count > 0:
        yield RunRequest(run_key=context.cursor, asset_selection=[AssetKey(['customer_hist']), AssetKey(['stage', 'customer_current']), AssetKey(['datamart', 'dim_customer']), AssetKey(['datamart', 'customer_order'])])
    else:
        yield SkipReason("No changes detected in stage.stg_customer")
rex
03/27/2023, 2:59 PM
You shouldn't need to pass the asset_selection parameter in the RunRequest. When the run request is yielded, the sensor should kick off the stg_customer_job, which already has the asset selection you desire.
https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#defining-a-sensor
Ian Venter
03/27/2023, 9:05 PM
rex
03/28/2023, 1:00 AM
Are you using the load_assets_from_dbt_manifest or load_assets_from_dbt_project functions?
When you click into the stg_customer_job in the Dagster UI, does it show all the assets that you want to materialize, including your downstream assets?
Ian Venter
03/28/2023, 4:38 AM
dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILES, use_build_command=True,
)