Harpal
08/30/2022, 4:23 PM
I'm running my job on a schedule (ScheduleDefinition).
My job runs my dbt models with the dbt build command and saves the output .csv files in Google Cloud Storage. Why would the scheduled job upload .csv files with different floating-point values? Specifically, old numbers from before the refactor…
Is there some caching mechanism that I'm missing here?
See the comment section for some sample code 🙂
Thanks in advance folks!

# samplecode.py
import datetime as dt
import json
import subprocess
from typing import cast

from dagster import AssetGroup, AssetKey, Out, Output, ScheduleDefinition, multi_asset
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_manifest
from ulid import ULID

# Placeholder paths; these are defined elsewhere in the real project.
DBT_PROJECT_DIR = "dbt"
DBT_PROFILE_DIR = "dbt"
manifest_file = "dbt/target/manifest.json"

CURR_DATE = dt.date.today()
DIR_ULID = str(ULID())

# Load a set of dbt models from a dbt project into Dagster assets.
# Creates one Dagster asset for each dbt model.
# All assets will be re-materialized using a single dbt build command.
# This no longer uses `dbt run`; the seed command now runs as part of the `dbt build` command below:
# `dbt build --project-dir DBT_PROJECT_DIR --profiles-dir DBT_PROFILE_DIR --select tag:my_tag`
with open(manifest_file, "r", encoding="utf-8") as f:
    manifest_json = json.load(f)

dbt_assets = load_assets_from_dbt_manifest(
    use_build_command=True,
    manifest_json=manifest_json,
    select="tag:not_rq",
)


def csv_assets_for_dbt_assets(dbt_assets):
    outs = {}
    deps = {}
    for asset_key in dbt_assets[0].asset_keys:
        table_name = asset_key.path[-1]
        # Save to GCS if the name contains "csv".
        if "csv" in table_name:
            outs[table_name] = Out(asset_key=AssetKey(["gcs", table_name]))
            deps[table_name] = {AssetKey(table_name)}
        else:
            continue

    @multi_asset(outs=outs, non_argument_deps=set(dbt_assets[0].asset_keys), compute_kind="gcs")
    def _assets(context):
        for table_name in outs.keys():
            export_table_command = ""
            # --query is so ugly because we need a workaround to get .csv files with headers.
            # https://stackoverflow.com/questions/51271931/exporting-from-cloud-sql-to-csv-with-column-headers
            if "__acc__" in table_name:
                export_table_command += f"gcloud sql export csv moonfire-01 gs://moonfire_metrics/data/{CURR_DATE}/{DIR_ULID}/access/{table_name}.csv --database=moonfire --query=\"SELECT 'name' AS name, 'created_time' AS created_time UNION ALL SELECT name, created_time FROM {table_name}\""  # noqa: E501
            elif "__dq__" in table_name:
                export_table_command += f"gcloud sql export csv moonfire-01 gs://moonfire_metrics/data/{CURR_DATE}/{DIR_ULID}/decision_q/{table_name}.csv --database=moonfire --query=\"SELECT 'count' AS count UNION ALL SELECT count FROM {table_name}\""  # noqa: E501
            elif "__cov__" in table_name:
                export_table_command += f"gcloud sql export csv moonfire-01 gs://moonfire_metrics/data/{CURR_DATE}/{DIR_ULID}/coverage/{table_name}.csv --database=moonfire --query=\"SELECT 'percentage_coverage' UNION ALL SELECT percentage_coverage FROM {table_name}\""  # noqa: E501

            if export_table_command:  # If export_table_command is not empty, log it and export the csv to GCS.
                context.log.info(f"{export_table_command=}")
                subprocess.call(export_table_command, shell=True)
            yield Output(table_name, table_name)

    return [_assets]


csv_assets = csv_assets_for_dbt_assets(dbt_assets)
all_assets = cast(list, dbt_assets) + csv_assets  # cast avoids a type error when concatenating the two lists

metrics_assets_job = AssetGroup(
    assets=all_assets,
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_DIR,
                "profiles_dir": DBT_PROFILE_DIR,
            }
        )
    },
).build_job("metrics_assets_job")

metrics_assets_schedule = ScheduleDefinition(
    job=metrics_assets_job, cron_schedule="30 23 * * *", execution_timezone="Europe/London"
)
Jon Simpson
08/30/2022, 4:48 PM

sandy
08/31/2022, 1:19 AM

Harpal
08/31/2022, 8:17 AM
After regenerating the dbt/target/manifest.json file, everything is now working identically in the two environments 🙂
Thank you all for your time and efforts dagsir
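
The manifest in the sample above is read once with json.load when the code location loads, so a deployment with a stale dbt/target/manifest.json keeps building assets from the old model definitions. Below is a minimal sketch of one way to avoid that, assuming the dbt CLI is on PATH and reusing the placeholder DBT_PROJECT_DIR / DBT_PROFILE_DIR paths from samplecode.py: regenerate the manifest with dbt compile immediately before loading it.

# regenerate_manifest.py (sketch, not the exact fix used in the thread)
import json
import subprocess

DBT_PROJECT_DIR = "dbt"  # placeholder paths, matching samplecode.py
DBT_PROFILE_DIR = "dbt"
MANIFEST_FILE = f"{DBT_PROJECT_DIR}/target/manifest.json"

# `dbt compile` re-parses the project and rewrites target/manifest.json,
# so the manifest loaded below always matches the current model code.
subprocess.run(
    ["dbt", "compile", "--project-dir", DBT_PROJECT_DIR, "--profiles-dir", DBT_PROFILE_DIR],
    check=True,
)

with open(MANIFEST_FILE, "r", encoding="utf-8") as f:
    manifest_json = json.load(f)

Alternatively, dagster-dbt's load_assets_from_dbt_project loads assets directly from the project directory rather than from a pre-built manifest, which can sidestep the stale-file problem since the project is compiled at load time.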