# integration-dbt
Ian
Hi, I'm using Dagster with dbt. I've got a DAG with a few pipelines; see the attached image for a visual. The red arrow in the visual is the pipeline my question relates to. The pipeline is triggered by a sensor, whose code is outlined below. As you can see in the job definition `stg_customer_job`, I'm flagging that I want all downstream assets materialized as part of this job. However, I've noticed that the only effect of the job definition is to display the correct lineage in Dagit; it does not define the materialization order I'm after. The only way I can get it to materialize all the assets downstream of `stg_customer` is to list every downstream asset in the `RunRequest`. Is this how it is meant to work, or am I missing something?
```python
from dagster import (AssetKey, AssetSelection, DefaultSensorStatus, RunRequest,
                     SensorEvaluationContext, SkipReason, define_asset_job, sensor)

# DBConnectionHelper is a project-specific Postgres helper (definition not shown)

stg_customer_job = define_asset_job(
    name="stg_customer_job",
    selection=AssetSelection.keys("customer_hist").downstream(),
)

# Poll every 30 sec
@sensor(job=stg_customer_job, minimum_interval_seconds=30, default_status=DefaultSensorStatus.STOPPED)
def stg_customer_sensor(context: SensorEvaluationContext):
    conn_helper = DBConnectionHelper()
    conn, cursor = conn_helper.get_connection()

    # Query the Postgres database to check for new data
    cursor.execute(
        "select count(*) from stage.stg_customer where _dwh_load_dtm > "
        "(select coalesce(max(_dwh_processed_change_dtm), TIMESTAMP '1900-01-01 12:00:00') "
        "from stage.customer_hist)"
    )
    count = cursor.fetchone()[0]
    conn_helper.close_connection(conn)

    if count > 0:
        # context.cursor stays None unless the sensor calls context.update_cursor(),
        # so run_key is None here and these requests are not deduplicated
        yield RunRequest(run_key=context.cursor, asset_selection=[
            AssetKey(["customer_hist"]),
            AssetKey(["stage", "customer_current"]),
            AssetKey(["datamart", "dim_customer"]),
            AssetKey(["datamart", "customer_order"]),
        ])
    else:
        yield SkipReason("No changes detected in stage.stg_customer")
```
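A minimal sketch of the sensor's new-data check, useful for testing the watermark logic on its own. It uses Python's built-in `sqlite3` in place of Postgres, so the `stage.` schema prefix and the `TIMESTAMP` literal are dropped and timestamps are stored as ISO-8601 text (which compares correctly as strings). Table and column names come from the query above; the `id` columns and sample rows are made up for illustration.

```python
import sqlite3

# Same watermark check as the sensor's Postgres query, rewritten for SQLite:
# count staged rows loaded after the latest change already processed into customer_hist.
NEW_ROWS_SQL = """
select count(*) from stg_customer
where _dwh_load_dtm > (
    select coalesce(max(_dwh_processed_change_dtm), '1900-01-01 12:00:00')
    from customer_hist
)
"""


def count_new_rows(conn: sqlite3.Connection) -> int:
    """Return how many stg_customer rows are newer than the customer_hist watermark."""
    return conn.execute(NEW_ROWS_SQL).fetchone()[0]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("create table stg_customer (id integer, _dwh_load_dtm text)")
    conn.execute("create table customer_hist (id integer, _dwh_processed_change_dtm text)")
    conn.execute("insert into stg_customer values (1, '2023-01-01 09:00:00')")
    print(count_new_rows(conn))  # prints 1: customer_hist is empty, so the 1900 fallback applies
    conn.execute("insert into customer_hist values (1, '2023-01-01 09:00:00')")
    print(count_new_rows(conn))  # prints 0: the staged row is no newer than the watermark
```

The `coalesce` fallback is what makes the very first tick fire: with an empty `customer_hist`, `max()` returns NULL and every staged row counts as new.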
rex
Hey Ian, I wouldn't expect that you need to supply the `asset_selection` parameter in the `RunRequest`. When the run request is yielded, the sensor should kick off `stg_customer_job`, which already has the asset selection you want. https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#defining-a-sensor
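To make the expected behaviour concrete: `AssetSelection.keys("customer_hist").downstream()` should select `customer_hist` itself plus every asset reachable from it, which would be the same four keys that are hard-coded in the `RunRequest`. The plain-Python sketch below computes that set for an assumed lineage; only the four asset keys come from the thread, and the edges between them are a guess (the real graph comes from the dbt project).

```python
from collections import deque

# Assumed lineage, with keys written as tuples of AssetKey path components.
# The edges are illustrative; only the four keys appear in the thread.
LINEAGE = {
    ("customer_hist",): [("stage", "customer_current")],
    ("stage", "customer_current"): [("datamart", "dim_customer"), ("datamart", "customer_order")],
}


def downstream_closure(root, lineage):
    """Return root plus every key reachable from it, via breadth-first search."""
    seen = {root}
    queue = deque([root])
    while queue:
        node = queue.popleft()
        for child in lineage.get(node, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


if __name__ == "__main__":
    for key in sorted(downstream_closure(("customer_hist",), LINEAGE)):
        print(key)
```

If the job's resolved selection matches this set, the hand-written `asset_selection` list in the sensor is redundant with the job definition.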
Ian
Hi @rex, `stg_customer_job` runs, but it only materializes the `customer_hist` table, even though I've specified `.downstream()` on the selection. The only way I can get the entire pipeline to run is by listing all the dependent models in the `RunRequest`, as you can see in my code. I wasn't expecting to have to do this, because dbt is aware of the downstream models in the pipeline, which leads me to believe I may be misunderstanding something.
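For comparison, dbt itself expresses "this model plus everything downstream" with the `+` graph operator (`--select customer_hist+`), which is a handy way to check outside Dagster which models dbt considers downstream of `customer_hist`. A small sketch that builds such a command; the project and profiles paths are placeholders, and it only prints the command rather than running dbt.

```python
import shlex


def dbt_build_downstream_cmd(model: str, project_dir: str, profiles_dir: str) -> list[str]:
    """Build a `dbt build` command selecting `model` and all of its descendants.

    The trailing `+` on the selector is dbt's graph operator for
    "this node and everything downstream of it".
    """
    return [
        "dbt", "build",
        "--select", f"{model}+",
        "--project-dir", project_dir,
        "--profiles-dir", profiles_dir,
    ]


if __name__ == "__main__":
    # Placeholder paths; substitute the real project and profiles directories.
    cmd = dbt_build_downstream_cmd("customer_hist", "path/to/dbt_project", "path/to/profiles")
    print(shlex.join(cmd))
    # To actually run it (requires dbt installed): subprocess.run(cmd, check=True)
```

Running `dbt ls --select customer_hist+` with the same flags would list the selected models without building them.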
rex
How are you defining your dbt assets? Are you using the `load_assets_from_dbt_manifest` or `load_assets_from_dbt_project` functions? When you click into `stg_customer_job` in the Dagster UI, does it show all the assets that you want to materialize, including your downstream assets?
Ian
When I load the job in question in Dagit, it shows all the dependent assets; however, it only materializes `customer_hist`. The only way I can get it to materialize all the assets in the pipeline is by including them in the `RunRequest`, as in the code snippet I provided earlier. I'm loading the assets as follows:
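```python
from dagster_dbt import load_assets_from_dbt_project

# DBT_PROJECT_PATH and DBT_PROFILES: project-specific paths (definitions not shown)
dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILES, use_build_command=True,
)
```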
I'm not sure I understand how it's meant to work, but I can confirm that removing `asset_selection` from the `RunRequest` in the sensors does not give the desired result. I tried removing `asset_selection` from all 3 of my sensors, and Dagster seemed to constantly try to materialize all the tables in the DAG (see Fig1, which I've attached). It reminds me of the behavior before I started using sensors, when I had a single schedule that materialized all the models in the DAG.

The only way I can get it to run the way I need is by including the relevant models in the `RunRequest`'s `asset_selection` parameter. When I do this, I get the desired result: each sensor kicks off a job that only runs the models in that job's lineage (see Fig2 and Fig3). The only downside is that I have to tell Dagster which models to run for each pipeline, which seems counterintuitive.