:partydagster: Greeting fellow Dagsterinos! :party...
# ask-community
h
🎉 Greeting fellow Dagsterinos! 🎉 Is there any reason why a Dagster job would give different results when run manually (on the dagitUI) or triggered automatically as part of a daily job (via
ScheduleDefinition
). My job runs a dbt model with the
dbt build
command and saves the outputs .csv filesin google cloud storage. Why would the scheduled job upload a csv files with a different floating point values? Specifically old numbers from before the refactor… Is there some caching mechanism that I’m missing here? See the comment section for some sample code 🙂 Thanks in advance folks!
Copy code
# samplecode.py

CURR_DATE = dt.date.today()
DIR_ULID = str(ULID())


# Load a set of DBT models from a DBT project into Dagster assets.
# Creates one Dagster asset for each dbt model.
# All assets will be re-materialized using a single dbt build command.
# This no longer uses `dbt run` and instead runs the seed command as part of `dbt build` command below.
# `dbt build --project-dir DBT_PROJECT_DIR --profiles-dir DBT_PROFILE_DIR --select tag:my_tag

with open(manifest_file, "r", encoding="utf-8") as f:
    manifest_json = json.load(f)

dbt_assets = load_assets_from_dbt_manifest(
    use_build_command=True,
    manifest_json=manifest_json,
    select="tag:not_rq",
)


def csv_assets_for_dbt_assets(dbt_assets):
    outs = {}
    deps = {}
    for asset_key in dbt_assets[0].asset_keys:
        table_name = asset_key.path[-1]
        # save to gcs if name contains "csv"
        if "csv" in table_name:
            outs[table_name] = Out(asset_key=AssetKey(["gcs", table_name]))
            deps[table_name] = {AssetKey(table_name)}
        else:
            continue

    @multi_asset(outs=outs, non_argument_deps=set(dbt_assets[0].asset_keys), compute_kind="gcs")
    def _assets(context):
        for table_name in outs.keys():
            export_table_command = ""
            # --query is so ugly because we need to use a workaround to get .csv files with headers.
            # <https://stackoverflow.com/questions/51271931/exporting-from-cloud-sql-to-csv-with-column-headers>
            if "__acc__" in table_name:
                export_table_command += f"gcloud sql export csv moonfire-01 <gs://moonfire_metrics/data/{CURR_DATE}/{DIR_ULID}/access/{table_name}.csv> --database=moonfire --query=\"SELECT 'name' AS name, 'created_time' AS created_time UNION ALL SELECT name, created_time FROM {table_name}\""  # noqa: E501
            elif "__dq__" in table_name:
                export_table_command += f"gcloud sql export csv moonfire-01 <gs://moonfire_metrics/data/{CURR_DATE}/{DIR_ULID}/decision_q/{table_name}.csv> --database=moonfire --query=\"SELECT 'count' AS count UNION ALL SELECT count FROM {table_name}\""  # noqa: E501
            elif "__cov__" in table_name:
                export_table_command += f"gcloud sql export csv moonfire-01 <gs://moonfire_metrics/data/{CURR_DATE}/{DIR_ULID}/coverage/{table_name}.csv> --database=moonfire --query=\"SELECT 'percentage_coverage' UNION ALL SELECT percentage_coverage FROM {table_name}\""  # noqa: E501

            if export_table_command:  # if export_table_command is not empty. Log and export csv to GCS.
                <http://context.log.info|context.log.info>(f"{export_table_command=}")
                subprocess.call(export_table_command, shell=True)
            yield Output(table_name, table_name)

    return [_assets]


csv_assets = csv_assets_for_dbt_assets(dbt_assets)
all_assets = cast(list, dbt_assets) + csv_assets  # more performant way to avoid type error between the two
metrics_assets_job = AssetGroup(
    assets=all_assets,
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_DIR,
                "profiles_dir": DBT_PROFILE_DIR,
            }
        )
    },
).build_job("metrics_assets_job")

metrics_assets_schedule = ScheduleDefinition(
    job=metrics_assets_job, cron_schedule="30 23 * * *", execution_timezone="Europe/London"
)
j
Newbie to dagster, but from your description I’d investigate with the assumption that Dagster has two sets of assets that are nearly identical but not. Perhaps some asset’s key is partially related to paths that differ? Maybe the dbt asset varies due to an env variable that’s present when you manually run in the user shell vs a daemon’s environment?
s
is it possible that you're somehow accessing an old version of the dbt manifest in one of the cases?
h
Hi @Jon Simpson. I started at the same place that you suggested and it turns out the problem was my manifest file. (it’s usually only used when you have a LOT of models to load and you want to cache/preload them in production for faster access to your jobs). Yes @sandy this was it! After pushing the
dbt/target/manifest.json
file everything is now working identically in the two environments 🙂 Thank you all for your time and efforts dagsir