Constantin Lungu
03/21/2023, 11:13 PMdagster_dbt
. I'm encountering the following error:
agster._core.errors.DagsterInvalidDefinitionError: resource with key 'dbt' required by op 'run_dbt_da47d' was not provided. Please provide a <class 'dagster._core.definitions.resource_definition.ResourceDefinition'> to key 'dbt', or change the required key to one of the following keys which points to an <class 'dagster._core.definitions.resource_definition.ResourceDefinition'>: ['io_manager']
I see that
dbt_cli_resource.configured(
{"project_dir": DBT_PROJECT_DIR, "profiles_dir": DBT_PROFILES_DIR}
)
Is already a ResourceDefinition.
import os
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project
from dagster_duckdb_pandas import duckdb_pandas_io_manager
from dagster import (
Definitions,
ScheduleDefinition,
define_asset_job,
fs_io_manager,
load_assets_from_package_module,
)
from dagster._utils import file_relative_path
DBT_PROJECT_DIR = file_relative_path(__file__, "../dbt_project")
DBT_PROFILES_DIR = file_relative_path(__file__, "../dbt_project/config")
# all assets live in the default dbt_schema
dbt_assets = load_assets_from_dbt_project(
DBT_PROJECT_DIR,
DBT_PROFILES_DIR,
# prefix the output assets based on the database they live in plus the name of the schema
key_prefix=["duckdb", "dbt_schema"],
# prefix the source assets based on just the database
# (dagster populates the source schema information automatically)
source_key_prefix=["duckdb"],
)
a = dbt_cli_resource.configured(
{"project_dir": DBT_PROJECT_DIR, "profiles_dir": DBT_PROFILES_DIR})
print(type(a))
resources = {
# this io_manager allows us to load dbt models as pandas dataframes
"io_manager": duckdb_pandas_io_manager.configured(
{"database": os.path.join(DBT_PROJECT_DIR, "example.duckdb")}
),
# this resource is used to execute dbt cli commands
"dbt": dbt_cli_resource.configured(
{"project_dir": DBT_PROJECT_DIR, "profiles_dir": DBT_PROFILES_DIR}
)
}
everything_job = define_asset_job("everything_everywhere_job", selection="*")
defs = Definitions(
assets=[*dbt_assets],
resources=resources,
schedules=[
ScheduleDefinition(job=everything_job, cron_schedule="@weekly") ]
)
Can someone please point out what am I missing?jamie
03/22/2023, 2:23 PMrex
03/22/2023, 2:42 PMtutorial_dbt_dagster
project with your sample code and I’m not getting your error.
dagster project from-example --name tutorial_dbt_dagster --example tutorial_dbt_dagster
import os
from dagster import (
Definitions,
load_assets_from_modules,
define_asset_job,
ScheduleDefinition,
)
from dagster_dbt import dbt_cli_resource
from dagster_duckdb_pandas import duckdb_pandas_io_manager
from tutorial_dbt_dagster import assets
from tutorial_dbt_dagster.assets import DBT_PROFILES, DBT_PROJECT_PATH
resources = {
"dbt": dbt_cli_resource.configured(
{
"project_dir": DBT_PROJECT_PATH,
"profiles_dir": DBT_PROFILES,
},
),
"io_manager": duckdb_pandas_io_manager.configured(
{"database": os.path.join(DBT_PROJECT_PATH, "tutorial.duckdb")}
),
}
everything_job = define_asset_job("everything_everywhere_job", selection="*")
defs = Definitions(
assets=load_assets_from_modules([assets]),
resources=resources,
schedules=[ScheduleDefinition(job=everything_job, cron_schedule="@weekly")],
)
jamie
03/22/2023, 2:44 PMConstantin Lungu
03/22/2023, 2:47 PMdagster-1.2.2
jamie
03/22/2023, 2:48 PMConstantin Lungu
03/22/2023, 2:49 PM