https://dagster.io/ logo
#ask-community
Title
# ask-community
r

Ruslan Babadzhanov

06/13/2023, 5:23 PM
Hi team, after update from Dagster 1.2.2 to 1.3.9
Copy code
reconciliation_sensor = build_asset_reconciliation_sensor(
    name="reconciliation_assets", asset_selection=AssetSelection.all()
)
starts work very strange. It runs the same asset materialization many times every 30 seconds and I have a lot of the same runs in the same time. Does anyone have such problems?
🤔 1
o

owen

06/14/2023, 4:43 PM
hi @Ruslan Babadzhanov! this is definitely surprising behavior -- do you have an example of a graph where this is happening? Ideally, I'd recommend switching over from the
build_asset_reconciliation_sensor
to using AutoMaterializePolicies / the asset daemon (as the reconciliation sensor is deprecated) but I wouldn't have expected the behavior to change in that way
🌈 1
r

Ruslan Babadzhanov

06/15/2023, 6:29 AM
Hi @owen, thanks for the advice. Yesterday I switched to AutoMaterializePolicies and it works the same. We have dbt assets with such configurations
Copy code
{{ config(
        materialized='incremental',
        properties={
            "partitioned_by": "ARRAY['date']",
        },
        dagster_freshness_policy={
            "maximum_lag_minutes": 195,
            "cron_schedule": "15 5 * * *"
        }
    ) }}
Screenshot 2023-06-15 at 10.33.26.png,Screenshot 2023-06-15 at 10.35.12.png
dbt_project.yaml - part with configuration models
Copy code
models:
  project:
    mmm:
      +materialized: view
      +view_security: invoker
      +dagster_auto_materialize_policy:
        type: eager
mmm_non_media_metrics.sql - config part
Copy code
{{ config(
    materialized='incremental',
    unique_key=['date'],
    properties={
        "partitioned_by": "ARRAY['date']",
    },
    dagster_freshness_policy={
        "maximum_lag_minutes": 195,
        "cron_schedule": "15 5 * * *"
    }
) }}

-- set default min_date
{% set min_date = '2019-01-01' %}

{% if is_incremental() %}
  -- overwrite default min_date
  {% set min_date = run_query('SELECT max(date) - INTERVAL \'7\' DAY FROM ' ~ this).columns[0].values()[0] %}
{% endif %}

{% set date_filer = 'date >= DATE(\'' ~ min_date ~ '\') - INTERVAL \'1\' DAY' %}
repo.py
Copy code
dbt_project = os.getenv("DBT_PROJECT")
dbt_profiles = os.getenv("DBT_PROFILES")


def make_slack():
    return WebClient(os.getenv('ALERT_SLACK_TOKEN'))


slack = ResourceDefinition(make_slack)
resources = {
    "dbt": dbt_cli_resource.configured(
        {
            "project_dir": dbt_project,
            "profiles_dir": dbt_profiles,
        }
    ),
    "slack": slack
}

def build_runtime_metadata(_, node_info):
    return dict(affected_table=node_info["relation_name"])


dbt_assets = load_assets_from_dbt_project(
    project_dir=dbt_project,
    profiles_dir=dbt_profiles,
    key_prefix=["project"],
    source_key_prefix=["sources"],
    runtime_metadata_fn=build_runtime_metadata,
    use_build_command=True,
)

defs = Definitions(
    assets=dbt_assets,
    resources=resources)
I created an issue
o

owen

06/16/2023, 8:03 PM
Gotcha, thank you for creating the issue, we're investigating this