#integration-dbt

qsu

09/06/2023, 1:23 AM
Newbie post: 👋 I have a dbt project with models partitioned daily or hourly with different start dates. This configuration varies model by model.
partitioned_by = ['day'] or partitioned_by = ['hour']
start_date = '2023-09-01'
How do I generate Dagster assets from this dbt project using the existing information? I would like our data team to be able to update the partition and start_date from dbt as well. I tried loading the assets one by one, each with a different partition_def, which resulted in:
dagster._core.errors.DagsterInvalidDefinitionError: Invalid dependencies
Thanks in advance for your help!

owen

09/08/2023, 11:47 PM
hi @qsu! the solution here would have to start with you parsing your own dbt manifest to find the set of partitions definitions that you would need here (likely analyzing the config in each node in your manifest). presuming that you get a mapping from <dagster partitions def> to <dbt fqn> from the above process, you could then write a loop to generate an @dbt_assets for each of these pairs, i.e.
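from dagster import OpExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

def get_all_dbt_assets(manifest):
    # mapping of partitions definition -> list of dbt fqns, built by parsing the manifest
    dbt_fqns_by_partitions_def = ...

    all_dbt_assets = []
    for partitions_def, dbt_fqns in dbt_fqns_by_partitions_def.items():
        @dbt_assets(manifest=manifest, partitions_def=partitions_def, select=" ".join(dbt_fqns))
        def thing(context: OpExecutionContext, dbt: DbtCliResource):
            yield from dbt.cli(["build"], context=context).stream()
        all_dbt_assets.append(thing)
    return all_dbt_assets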
however, one missing bit here is that you'll end up with multiple underlying compute functions with the same name ("thing"). right now, we don't support passing in a different name through the @dbt_assets decorator, so you'd have to rely on some decorator magic such as: https://stackoverflow.com/a/10874474/16191920
also curious about the full stack trace of the "Invalid dependencies" issue
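For readers following along, the manifest-parsing step described above could be sketched roughly as follows. This is a minimal illustration and not code from the thread: the helper name group_models_by_partition_config and the sample manifest are hypothetical, and it assumes the custom partitioned_by and start_date keys from the original question appear under each model node's config. It groups models by a plain (granularity, start date) tuple so that it runs without Dagster installed.

```python
from collections import defaultdict


def group_models_by_partition_config(manifest: dict) -> dict:
    """Map (partitioned_by, start_date) -> list of dotted dbt FQNs.

    Hypothetical helper: assumes each partitioned model sets the custom
    config keys `partitioned_by` (a one-element list) and `start_date`.
    """
    groups = defaultdict(list)
    for node in manifest.get("nodes", {}).values():
        if node.get("resource_type") != "model":
            continue  # skip tests, seeds, snapshots, ...
        config = node.get("config") or {}
        partitioned_by = config.get("partitioned_by")
        if not partitioned_by:
            continue  # unpartitioned model: handle separately
        # Assumption: a single partition column per model.
        key = (partitioned_by[0], config.get("start_date"))
        groups[key].append(".".join(node["fqn"]))
    return dict(groups)


# Made-up manifest fragment, for illustration only.
sample_manifest = {
    "nodes": {
        "model.proj.orders": {
            "resource_type": "model",
            "fqn": ["proj", "orders"],
            "config": {"partitioned_by": ["day"], "start_date": "2023-09-01"},
        },
        "model.proj.clicks": {
            "resource_type": "model",
            "fqn": ["proj", "clicks"],
            "config": {"partitioned_by": ["hour"], "start_date": "2023-09-02"},
        },
        "model.proj.events": {
            "resource_type": "model",
            "fqn": ["proj", "staging", "events"],
            "config": {"partitioned_by": ["day"], "start_date": "2023-09-01"},
        },
        "model.proj.dim": {
            "resource_type": "model",
            "fqn": ["proj", "dim"],
            "config": {},
        },
        "test.proj.not_null_orders": {
            "resource_type": "test",
            "fqn": ["proj", "not_null_orders"],
            "config": {},
        },
    }
}

groups = group_models_by_partition_config(sample_manifest)
```

Each key could then be turned into a partitions definition (for example DailyPartitionsDefinition for "day") and each list joined with spaces for the select argument, as in the loop above. Note that an hourly partitions definition will probably need a start date that includes an hour component, so a bare date such as '2023-09-01' may have to be extended first.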

qsu

09/08/2023, 11:54 PM
2023-09-05 17:09:14 -0700 - dagster.daemon.SensorDaemon - WARNING - Could not load location xxx to check for sensors due to the following error: dagster._core.errors.DagsterInvalidDefinitionError: Invalid dependencies: node "xxx" does not have output "xxx". Listed as dependency for node "xxx input "xxx"

Stack Trace:
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_grpc/server.py", line 295, in __init__
    self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_grpc/server.py", line 139, in __init__
    loadable_targets = get_loadable_targets(
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_grpc/utils.py", line 47, in get_loadable_targets
    else loadable_targets_from_python_module(module_name, working_directory)
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/workspace/autodiscovery.py", line 35, in loadable_targets_from_python_module
    module = load_python_module(
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/code_pointer.py", line 135, in load_python_module
    return importlib.import_module(module_name)
  File "/Users/qsu/.pyenv/versions/3.10.0/lib/python3.10/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/Users/qsu/git/xxx/__init__.py", line 13, in <module>
    defs = Definitions(
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/definitions_class.py", line 432, in __init__
    self._created_pending_or_normal_repo = _create_repository_using_definitions_args(
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/definitions_class.py", line 303, in _create_repository_using_definitions_args
    def created_repo():
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/decorators/repository_decorator.py", line 163, in __call__
    else CachingRepositoryData.from_list(
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/repository_definition/repository_data.py", line 346, in from_list
    return build_caching_repository_data_from_list(
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/repository_definition/repository_data_builder.py", line 195, in build_caching_repository_data_from_list
    for job_def in get_base_asset_jobs(
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/assets_job.py", line 83, in get_base_asset_jobs
    build_assets_job(
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/assets_job.py", line 218, in build_assets_job
    graph = GraphDefinition(
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/graph_definition.py", line 212, in __init__
    self._dependency_structure, self._node_dict = create_execution_structure(
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/node_container.py", line 179, in create_execution_structure
    _validate_dependencies(aliased_dependencies_dict, node_dict, alias_to_name)
  File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/node_container.py", line 279, in _validate_dependencies
    raise DagsterInvalidDefinitionError(

Running with dbt=1.6.1
Registered adapter: athena=1.6.0

owen

09/08/2023, 11:55 PM
gotcha, thanks for that -- how exactly were you loading things one by one?

qsu

09/08/2023, 11:57 PM
via a similar method to the one you listed, actually: @dbt_assets(partitions_def=partitions_def, select=model_name). I have since attempted a few other things, but early next week I can find the exact version I ran.

owen

09/08/2023, 11:58 PM
gotcha -- definitely possible that the function renaming thing might be the missing link, but happy to take a look next week

qsu

09/11/2023, 3:40 PM
Hey thanks for taking a look. Here's the code to load assets one by one.
def __init__(self):
    self.dbt_manifest_path = dbt_manifest_path
    self.extended_asset_def_from_manifest = self.parse_def_from_manifest()

def parse_def_from_manifest(self):
    # use a context manager so the manifest file is closed after reading
    with open(self.dbt_manifest_path) as f:
        json_manifest = json.load(f)

    extended_asset_def_from_manifest = {}

    extended_asset_def_from_manifest["dbt_partition"]=[]
    for node_name in json_manifest.get("nodes"):
        node = json_manifest["nodes"][node_name]
        if node.get("resource_type") == 'model':
            dbt_partition_config = {
                'model_name': node.get("name"),
                # multi partition columns are not supported
                'partitioned_by': node.get("config").get("partitioned_by")[0],
                'start_date': node.get("config").get("start_date")

            }
            extended_asset_def_from_manifest["dbt_partition"].append(dbt_partition_config)
    return extended_asset_def_from_manifest

def build_all_dbt_assets(self) -> list[AssetsDefinition]:

    ret_assets = []
    excl = "tag:partitioned_model"
    @dbt_assets(
        manifest=dbt_manifest_path,
        exclude=excl,
        dagster_dbt_translator=CustomDagsterDbtTranslator()
    )
    def non_partitioned_assets(context: OpExecutionContext, dbt: DbtCliResource):
        yield from dbt.cli(["build"], context=context).stream()



    for node in self.extended_asset_def_from_manifest.get("dbt_partition"):
        model_name = node.get('model_name')
        # read the per-model values from the node, not from the top-level dict
        partition_by = node.get("partitioned_by")
        start_date = node.get("start_date")
        partition_def = None
        if partition_by == 'day':
            partition_def = DailyPartitionsDefinition(start_date=start_date)
        ret_assets.append(self.build_one_dbt_asset(model_name, partition_def))

    return ret_assets

def build_one_dbt_asset(self, model_name, partition_def) -> AssetsDefinition:

    @dbt_assets(
        manifest=dbt_manifest_path,
        select=model_name,
        partitions_def=partition_def,
        dagster_dbt_translator=CustomDagsterDbtTranslator()
    )
    def choral_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
        time_window = context.asset_partitions_time_window_for_output(
            list(context.selected_output_names)[0]
        )

        dbt_vars = {
            "min_date": time_window.start.isoformat(),
            "max_date": time_window.end.isoformat()
        }
        dbt_build_args = ["build", "--vars", json.dumps(dbt_vars)]

        yield from dbt.cli(dbt_build_args, context=context).stream()
    return choral_dbt_assets

owen

09/11/2023, 6:03 PM
this basic shape seems reasonable to me, but I think the main issue is that all of your assets end up with the same underlying op name (choral_dbt_assets), so you'll probably want to do the function renaming thing mentioned in that stack overflow link to give each of your functions a unique name
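As an illustration of the renaming idea, here is a minimal sketch in plain Python. It is not the exact code from the linked answer. The register decorator is a hypothetical stand-in for any decorator that keys definitions by the function's name (which is how the thread describes the op name being derived), and rename and build_one are illustrative names.

```python
registry = {}


def register(fn):
    """Hypothetical stand-in for a decorator that keys definitions by fn.__name__."""
    if fn.__name__ in registry:
        raise ValueError(f"duplicate name: {fn.__name__}")
    registry[fn.__name__] = fn
    return fn


def rename(new_name):
    """Return a decorator that sets a function's name before outer decorators see it."""
    def decorator(fn):
        fn.__name__ = new_name
        fn.__qualname__ = new_name
        return fn
    return decorator


def build_one(model_name):
    # Decorators apply bottom-up: rename runs first, so register sees the unique name.
    @register
    @rename(f"dbt_assets_{model_name}")
    def choral_dbt_assets():
        return model_name
    return choral_dbt_assets


built = [build_one(m) for m in ("orders", "clicks")]
names = [fn.__name__ for fn in built]
```

Without the rename line, the second call to build_one would raise here because both inner functions are named choral_dbt_assets. In the real code the same ordering would apply, with @dbt_assets(...) as the outer decorator. The generated names should presumably stay valid identifiers (letters, digits, underscores).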

qsu

09/11/2023, 6:04 PM
Let me try that and get back to you!
I will likely have some time to try this early next week. Appreciate all your help!
It worked for me! Thanks very much for the very helpful information! 👍👍👍