Atheer Abdullatif
05/23/2023, 1:22 PM
from dagster import schedule, AssetSelection, define_asset_job, RunRequest, ScheduleEvaluationContext, repository, Definitions, load_assets_from_modules
from dagster_dbt import DbtCliClientResource
from mvp_analytics_pipeline import assets
from mvp_analytics_pipeline.assets import DBT_PROFILES, DBT_PROJECT_PATH
from mvp_analytics_pipeline.jobs.etl import local_job_transform
# Resource bindings passed to Definitions below: the "dbt" key is backed by a
# DbtCliClientResource pointed at the project and profiles directories that
# are imported from mvp_analytics_pipeline.assets.
resources = {
    "dbt": DbtCliClientResource(profiles_dir=DBT_PROFILES, project_dir=DBT_PROJECT_PATH),
}
# Schedule for local_job_transform on the cron expression "0 0 * * *".
# Each tick yields one RunRequest whose op config and "date" tag both carry
# the tick's scheduled execution time formatted as YYYY-MM-DD.
#
# NOTE(review): the run config targets an op named "configurable_op", while
# the local_job_transform shown in jobs/etl.py only invokes dbt_run_op --
# confirm that an op with that name actually exists in the scheduled job.
@schedule(cron_schedule="0 0 * * *", job=local_job_transform)
def configurable_job_schedule(context: ScheduleEvaluationContext):
    day = context.scheduled_execution_time.strftime("%Y-%m-%d")
    op_settings = {"config": {"scheduled_date": day}}
    return RunRequest(
        run_key=None,
        run_config={"ops": {"configurable_op": op_settings}},
        tags={"date": day},
    )
# Definitions object for this module: registers every asset found in the
# `assets` module together with the job, its schedule, and the shared
# resource bindings.
defs = Definitions(
    assets=load_assets_from_modules([assets]),
    jobs=[local_job_transform],
    schedules=[configurable_job_schedule],
    resources=resources,
)
jobs/etl.py
from dagster_dbt import dbt_cli_resource, dbt_run_op
# The snippet decorates with @job but never imports it; bring it into scope
# so the module does not fail with NameError at import time.
from dagster import job


# Op-based job that invokes dbt_run_op with dbt_cli_resource bound to the
# "dbt" resource key.
#
# NOTE(review): dbt_cli_resource is bound here without any project_dir /
# profiles_dir configuration -- confirm dbt can locate the project when this
# job runs.
@job(resource_defs={"dbt": dbt_cli_resource})
def local_job_transform():
    dbt_run_op()
assets/__init__.py
from dagster_dbt import load_assets_from_dbt_project
from dagster import file_relative_path
# Paths to the dbt project and to its profiles directory, both resolved
# relative to this file.
DBT_PROJECT_PATH = file_relative_path(__file__, "../../mvp_analytics_dbt")
DBT_PROFILES = file_relative_path(__file__, "../../mvp_analytics_dbt/config")

# Load the dbt project as assets, with keys prefixed by "mvp_analytics_dbt".
dbt_assets = load_assets_from_dbt_project(
    key_prefix=["mvp_analytics_dbt"],
    profiles_dir=DBT_PROFILES,
    project_dir=DBT_PROJECT_PATH,
)
owen
05/23/2023, 5:57 PM
Using both `load_assets_from_dbt_project` and `dbt_run_op` is generally not necessary, as `load_assets_from_dbt_project` already allows you to execute those assets. So you could replace your `local_job_transform` job with:
from dagster import define_asset_job, AssetSelection
# Asset job named "local_job_transform" whose selection is AssetSelection.all().
local_job_transform = define_asset_job(
    name="local_job_transform",
    selection=AssetSelection.all(),
)
This will create a job that will materialize all of your dbt assets.
Atheer Abdullatif
05/25/2023, 7:34 AM
owen
05/25/2023, 4:24 PM