qsu
09/06/2023, 1:23 AMpartitioned_by = ['day'] or partitioned_by = ['hour']
start_date = '2023-09-01'
How do I generate Dagster assets from this dbt project utilizing the existing information? I would like our data team to be able to update partition and start_date from dbt as well.
I tried load assets one by one with different partition_def
resulting in dagster._core.errors.DagsterInvalidDefinitionError: Invalid dependencies
Thanks in advance for your help!owen
09/08/2023, 11:47 PM<dagster partitions def>
to <dbt fqn>
from the above process, you could then write a loop to generate an @dbt_assets
for each of these pairs, i.e.
def get_all_dbt_assets(manifest):
dbt_fqns_by_partitions_def = ...
all_dbt_assets = []
for partitions_def, dbt_fqns in dbt_fqns_by_partitions_def.items():
@dbt_assets(partitions_def=partitions_def, select=" ".join(dbt_fqns))
def thing(...):
...
all_dbt_assets.append(thing)
return all_dbt_assets
however, one missing bit here is that you'll end up with multiple underlying compute functions with the same name ("thing"). right now, we don't support passing in a different name through the dbt_assets, decorator, so you'd have to rely on some decorator magic such as: https://stackoverflow.com/a/10874474/16191920owen
09/08/2023, 11:48 PMqsu
09/08/2023, 11:54 PM2023-09-05 17:09:14 -0700 - dagster.daemon.SensorDaemon - WARNING - Could not load location xxx to check for sensors due to the following error: dagster._core.errors.DagsterInvalidDefinitionError: Invalid dependencies: node "xxx" does not have output "xxx". Listed as dependency for node "xxx input "xxx"
Stack Trace:
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_grpc/server.py", line 295, in __init__
self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_grpc/server.py", line 139, in __init__
loadable_targets = get_loadable_targets(
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_grpc/utils.py", line 47, in get_loadable_targets
else loadable_targets_from_python_module(module_name, working_directory)
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/workspace/autodiscovery.py", line 35, in loadable_targets_from_python_module
module = load_python_module(
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/code_pointer.py", line 135, in load_python_module
return importlib.import_module(module_name)
File "/Users/qsu/.pyenv/versions/3.10.0/lib/python3.10/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 883, in exec_module
File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
File "/Users/qsu/git/xxx/__init__.py", line 13, in <module>
defs = Definitions(
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/definitions_class.py", line 432, in __init__
self._created_pending_or_normal_repo = _create_repository_using_definitions_args(
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/definitions_class.py", line 303, in _create_repository_using_definitions_args
def created_repo():
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/decorators/repository_decorator.py", line 163, in __call__
else CachingRepositoryData.from_list(
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/repository_definition/repository_data.py", line 346, in from_list
return build_caching_repository_data_from_list(
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/repository_definition/repository_data_builder.py", line 195, in build_caching_repository_data_from_list
for job_def in get_base_asset_jobs(
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/assets_job.py", line 83, in get_base_asset_jobs
build_assets_job(
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/assets_job.py", line 218, in build_assets_job
graph = GraphDefinition(
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/graph_definition.py", line 212, in __init__
self._dependency_structure, self._node_dict = create_execution_structure(
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/node_container.py", line 179, in create_execution_structure
_validate_dependencies(aliased_dependencies_dict, node_dict, alias_to_name)
File "/Users/qsu/git/xxx/venv3/lib/python3.10/site-packages/dagster/_core/definitions/node_container.py", line 279, in _validate_dependencies
raise DagsterInvalidDefinitionError(
Running with dbt=1.6.1
Registered adapter: athena=1.6.0
owen
09/08/2023, 11:55 PMqsu
09/08/2023, 11:57 PMowen
09/08/2023, 11:58 PMqsu
09/11/2023, 3:40 PMdef __init__(self):
self.dbt_manifest_path = dbt_manifest_path
self.extended_asset_def_from_manifest = self.parse_def_from_manifest()
def parse_def_from_manifest(self):
f = open(self.dbt_manifest_path)
json_manifest = json.load(f)
extended_asset_def_from_manifest = {}
extended_asset_def_from_manifest["dbt_partition"]=[]
for node_name in json_manifest.get("nodes"):
node = json_manifest["nodes"][node_name]
if node.get("resource_type") == 'model':
dbt_partition_config = {
'model_name': node.get("name"),
# multi partition columns are not supported
'partitioned_by': node.get("config").get("partitioned_by")[0],
'start_date': node.get("config").get("start_date")
}
extended_asset_def_from_manifest["dbt_partition"].append(dbt_partition_config)
return extended_asset_def_from_manifest
def build_all_dbt_assets(self) -> [AssetsDefinition]:
ret_assets = []
excl = "tag:partitioned_model"
@dbt_assets(
manifest=dbt_manifest_path,
exclude=excl,
dagster_dbt_translator=CustomDagsterDbtTranslator()
)
def non_partitioned_assets(context: OpExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
for node in self.extended_asset_def_from_manifest.get("dbt_partition"):
model_name = node.get('model_name')
partition_by = self.extended_asset_def_from_manifest.get("partitioned_by")
start_date = self.extended_asset_def_from_manifest.get("start_date")
partition_def = None
if partition_by == 'day':
partition_def = DailyPartitionsDefinition(start_date=start_date)
ret_assets.append(self.build_one_dbt_asset(model_name, partition_def))
return ret_assets
def build_one_dbt_asset(self, model_name, partition_def) -> AssetsDefinition:
@dbt_assets(
manifest=dbt_manifest_path,
select=model_name,
partitions_def=partition_def,
dagster_dbt_translator=CustomDagsterDbtTranslator()
)
def choral_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
time_window = context.asset_partitions_time_window_for_output(
list(context.selected_output_names)[0]
)
dbt_vars = {
"min_date": time_window.start.isoformat(),
"max_date": time_window.end.isoformat()
}
dbt_build_args = ["build", "--vars", json.dumps(dbt_vars)]
yield from dbt.cli(dbt_build_args, context=context).stream()
return choral_dbt_assets
owen
09/11/2023, 6:03 PMchoral_dbt_assets
), so you'll probably want to do the function renaming thing mentioned in that stack overflow link to give each of your functions a unique nameqsu
09/11/2023, 6:04 PMqsu
09/14/2023, 1:05 AMqsu
09/26/2023, 12:06 AM