Quentin Gaborit
08/14/2023, 1:25 PM
dagster-dbt interface. Before the change, I had factories for generating meltano or dbt software-defined assets, and to move to the new API I was thinking of creating a wrapper like:
def dbt_asset_factory(
    select: str,
    io_manager_key: str = "bigquery_io_manager",
) -> list[AssetsDefinition]:
    @dbt_assets(
        manifest=Path(DBT_MANIFEST_PATH),
        io_manager_key=io_manager_key,
        select=select,
    )
    def compute(
        context: AssetExecutionContext,
        dbt_cli_resource: DbtCliResource,
    ):
        yield from dbt_cli_resource.cli(["run"], context=context).stream()
        dbt_test_invocation = dbt_cli_resource.cli(["test"], context=context)
        if not dbt_test_invocation.is_successful():
            raise Exception("my exception")

    return [compute]
Unfortunately that snippet won't work, because the compute function should have a unique identifier for Dagster's dependency inference to work correctly. Have you ever encountered such a use case? If so, do you have any idea how to get there?
rex
08/14/2023, 1:29 PM
__name__
rex
08/14/2023, 1:31 PM
def dbt_asset_factory(
    select: str,
    io_manager_key: str = "bigquery_io_manager",
) -> list[AssetsDefinition]:
    def compute(
        context: AssetExecutionContext,
        dbt_cli_resource: DbtCliResource,
    ):
        yield from dbt_cli_resource.cli(["run"], context=context).stream()
        dbt_test_invocation = dbt_cli_resource.cli(["test"], context=context)
        if not dbt_test_invocation.is_successful():
            raise Exception("my exception")

    compute.__name__ = "set_unique_name_here"
    my_dbt_assets = dbt_assets(
        manifest=Path(DBT_MANIFEST_PATH),
        io_manager_key=io_manager_key,
        select=select,
    )(compute)
    return [my_dbt_assets]
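For future readers, here is a minimal, framework-free sketch of why renaming the function before decorating helps. It assumes the decorator derives its identifier from the function's __name__ at decoration time, which is what the snippet above relies on. The register decorator, REGISTRY, and both factory names are hypothetical stand-ins for illustration, not part of Dagster or dagster-dbt.

```python
from typing import Callable, Dict

# Hypothetical stand-in for a framework that keys definitions by function name.
# This is NOT Dagster's implementation, only an illustration of the idea.
REGISTRY: Dict[str, Callable[[], str]] = {}


def register(fn: Callable[[], str]) -> Callable[[], str]:
    """Toy decorator: reads fn.__name__ at decoration time and rejects duplicates."""
    if fn.__name__ in REGISTRY:
        raise ValueError(f"duplicate name: {fn.__name__}")
    REGISTRY[fn.__name__] = fn
    return fn


def naive_factory(select: str) -> Callable[[], str]:
    # With the @ syntax the function is decorated immediately,
    # so every call registers the same name, "compute".
    @register
    def compute() -> str:
        return f"dbt run --select {select}"

    return compute


def named_factory(select: str, name: str) -> Callable[[], str]:
    # Define the function first, rename it, and only then apply the decorator.
    def compute() -> str:
        return f"dbt run --select {select}"

    compute.__name__ = name
    return register(compute)


if __name__ == "__main__":
    named_factory("staging", "staging_dbt_assets")
    named_factory("marts", "marts_dbt_assets")
    print(sorted(REGISTRY))
```

Calling naive_factory twice raises the duplicate-name error in this sketch, while named_factory can be called once per selection as long as each call gets a distinct name.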
Quentin Gaborit
08/14/2023, 1:46 PM
dbt_assets() it couldn't work
Quentin Gaborit
08/14/2023, 1:46 PM
rex
08/14/2023, 1:50 PM
@dbt_assets decorator API docstring for future reference
Quentin Gaborit
08/14/2023, 1:51 PM
/resources