Hi team, I try the dbt_asset decorator, here is my...
# integration-dbt
j
Hi team, I try the dbt_asset decorator, here is my code
Copy code
for i in config[key_prefix]:
    source = config[key_prefix][i]["SOURCE"]
    @dbt_assets(
        manifest=MANIFEST_PATH,
        select=f"tag:{source}",
        partitions_def=partitions_dict[f"fivetran_{source}"],
    )
    def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
        dbt_vars = date_from_context(context)
        dbt_args = ["run", "--vars", json.dumps(dbt_vars)]
        yield from dbt.cli(dbt_args, context=context).stream()

    list_of_assets.append(my_dbt_assets)

defs = Definitions(
    # jobs=jobs_dict.values(),
    assets=[*list_of_assets],
    resources=resources,
    sensors=list_of_sensors,
)
. . . and got this error (image):
Copy code
dagster._core.errors.DagsterInvalidDefinitionError: Conflicting definitions found in repository with name 'my_dbt_assets'. Op/Graph definition names must be unique within a repository. OpDefinition is defined in job '__ASSET_JOB_0' and in job '__ASSET_JOB_1'.

  File "C:\Users\nkduy3\AppData\Local\Programs\Python\Python39\lib\site-packages\dagster\_grpc\server.py", line 278, in __init__
    self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
  File "C:\Users\nkduy3\AppData\Local\Programs\Python\Python39\lib\site-packages\dagster\_grpc\server.py", line 141, in __init__
    repo_def.load_all_definitions()
  File "C:\Users\nkduy3\AppData\Local\Programs\Python\Python39\lib\site-packages\dagster\_core\definitions\repository_definition\repository_definition.py", line 130, in load_all_definitions
    self._repository_data.load_all_definitions()
  File "C:\Users\nkduy3\AppData\Local\Programs\Python\Python39\lib\site-packages\dagster\_core\definitions\repository_definition\repository_data.py", line 182, in load_all_definitions
    self.get_all_jobs()
  File "C:\Users\nkduy3\AppData\Local\Programs\Python\Python39\lib\site-packages\dagster\_core\definitions\repository_definition\repository_data.py", line 381, in get_all_jobs
    self._check_node_defs(self._all_jobs)
  File "C:\Users\nkduy3\AppData\Local\Programs\Python\Python39\lib\site-packages\dagster\_core\definitions\repository_definition\repository_data.py", line 471, in _check_node_defs
    raise DagsterInvalidDefinitionError(
In short, I try to create dbt assets for each source using
for loop
, and look like both of that assets got the same Op/Graph name 'my_dbt_assets'. If I'm using
load_assets_from_dbt_project
to create the asset, that function has the parameters name
op_name
and that lets me define the name of the Op. Can we do the same thing with
@dbt_assets
?
r
The name of the op defaults as the name of the function. Maybe try overwriting it in your for loop as well?
Copy code
for i in config[key_prefix]:
    source = config[key_prefix][i]["SOURCE"]

    def my_dbt_assets_compute(context: OpExecutionContext, dbt: DbtCliResource):
        dbt_vars = date_from_context(context)
        dbt_args = ["run", "--vars", json.dumps(dbt_vars)]
        yield from dbt.cli(dbt_args, context=context).stream()

    my_dbt_assets_compute.__name__ = f"my_dbt_assets_{source}"
     
    my_dbt_assets = dbt_assets(
        manifest=MANIFEST_PATH,
        select=f"tag:{source}",
        partitions_def=partitions_dict[f"fivetran_{source}"],
    )(my_dbt_assets_compute)

    list_of_assets.append(my_dbt_assets)

defs = Definitions(
    # jobs=jobs_dict.values(),
    assets=[*list_of_assets],
    resources=resources,
    sensors=list_of_sensors,
)