Hi! I have a little feature request for the dagste...
# ask-community
y
Hi! I have a couple of small feature requests for the dagster_dbt.asset_defs.load_assets_from_dbt_project function.
1. It would be great to allow passing manifest_json and cli_output as optional arguments. We have multiple repos for different environments (dev, qa, staging, prod, etc.), and all of them use the same dbt project (with different targets). With the current implementation, dbt ls is called once per dagster repo, which is redundant. To save resources, we run dbt ls in the jobs Dockerfile and save the artifacts to JSON files:
...
RUN mkdir -p /dbt_artifacts \
&& dbt --log-format json ls --project-dir /path/my_dbt_project --profiles-dir /path/my_dbt_project/config --select tag:daily --resource-type model --output json > /dbt_artifacts/my_dbt_project_daily_cli_output.json \
&& cp my_dbt_project/target/manifest.json /dbt_artifacts/my_dbt_project_daily_manifest.json
...
Then, at dagster launch time, we read the manifest_json and cli_output data from these files and pass them to a slightly modified version of load_assets_from_dbt_project. It would be much better if load_assets_from_dbt_project supported this out of the box.
2. Allow passing a custom function to build a dbt asset's output metadata. The final implementation might look something like this:
def _dbt_nodes_to_assets(
    dbt_nodes: Mapping[str, Any],
    select: str,
    selected_unique_ids: AbstractSet[str],
    runtime_metadata_fn: Optional[
        Callable[[SolidExecutionContext, Mapping[str, Any]], Mapping[str, RawMetadataValue]]
    ] = None,
    io_manager_key: Optional[str] = None,
    node_info_to_asset_key: Callable[[Mapping[str, Any]], AssetKey] = _get_node_asset_key,
    use_build_command: bool = False,
    # Proposed: custom builder for each output's metadata.
    outs_metadata_fn: Callable[[Dict[str, Any]], Optional[Mapping[str, Any]]] = _node_info_to_metadata,
) -> AssetsDefinition:
    ...
    outs[node_name] = Out(
        asset_key=node_info_to_asset_key(node_info),
        description=description,
        io_manager_key=io_manager_key,
        metadata=outs_metadata_fn(node_info),
    )
    ...
def load_assets_from_dbt_project(
    project_dir: str,
    profiles_dir: Optional[str] = None,
    target_dir: Optional[str] = None,
    select: Optional[str] = None,
    runtime_metadata_fn: Optional[
        Callable[[SolidExecutionContext, Mapping[str, Any]], Mapping[str, Any]]
    ] = None,
    io_manager_key: Optional[str] = None,
    node_info_to_asset_key: Callable[[Mapping[str, Any]], AssetKey] = _get_node_asset_key,
    use_build_command: bool = False,
    outs_metadata_fn: Callable[[Dict[str, Any]], Optional[Mapping[str, Any]]] = _node_info_to_metadata,
    manifest_json: Optional[Dict[str, Any]] = None,
    cli_output: Optional[DbtCliOutput] = None,
) -> Sequence[AssetsDefinition]:
    ...
    # Proposed: only run dbt ls when the artifacts were not passed in.
    if not manifest_json or not cli_output:
        check.str_param(project_dir, "project_dir")
        profiles_dir = check.opt_str_param(
            profiles_dir, "profiles_dir", os.path.join(project_dir, "config")
        )
        target_dir = check.opt_str_param(target_dir, "target_dir", os.path.join(project_dir, "target"))
        manifest_json, cli_output = _load_manifest_for_project(
            project_dir, profiles_dir, target_dir, select or "*"
        )
    selected_unique_ids: Set[str] = set(
        filter(None, (line.get("unique_id") for line in cli_output.logs))
    )
    dbt_nodes = {**manifest_json["nodes"], **manifest_json["sources"]}
    return [
        _dbt_nodes_to_assets(
            dbt_nodes,
            select=select or "*",
            selected_unique_ids=selected_unique_ids,
            runtime_metadata_fn=runtime_metadata_fn,
            io_manager_key=io_manager_key,
            node_info_to_asset_key=node_info_to_asset_key,
            use_build_command=use_build_command,
            outs_metadata_fn=outs_metadata_fn,
        ),
    ]
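To make the second request concrete, here is a rough sketch of what a user-supplied outs_metadata_fn might look like under the proposed signature. The function name my_outs_metadata_fn is hypothetical, and the fields it reads (tags, config.materialized, original_file_path) are assumed to be present on a node entry in manifest.json; missing fields are simply skipped.

```python
from typing import Any, Dict, Mapping, Optional


def my_outs_metadata_fn(node_info: Dict[str, Any]) -> Optional[Mapping[str, Any]]:
    """Hypothetical outs_metadata_fn: expose a few manifest fields as asset metadata."""
    metadata: Dict[str, Any] = {}

    # Assumed manifest field: a list of dbt tags on the node.
    if node_info.get("tags"):
        metadata["tags"] = ", ".join(node_info["tags"])

    # Assumed manifest field: config.materialized (e.g. "table" or "view").
    materialized = (node_info.get("config") or {}).get("materialized")
    if materialized:
        metadata["materialized"] = materialized

    # Assumed manifest field: path of the model file inside the dbt project.
    if node_info.get("original_file_path"):
        metadata["path"] = node_info["original_file_path"]

    # Return None when nothing was found, which the proposed Optional return type allows.
    return metadata or None
```

With the proposed parameter in place, this would be passed as outs_metadata_fn=my_outs_metadata_fn to load_assets_from_dbt_project.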
dagster bot resolve to issue 1
s
cc @owen
o
hi @Yevhen Samoilenko, thanks for the suggestions! For the first request, my understanding is that you're not able to use the load_assets_from_dbt_manifest function because of the selection parameter (which this function does not currently support). I believe that you could actually use the selected_unique_ids parameter to load_assets_from_dbt_manifest to do this selection, but we're also going to be adding an experimental feature in this week's release to allow you to use normal dbt select syntax with that function (it's a bit experimental, but I'm pretty confident it will work for your scenario).
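As a rough sketch of the selected_unique_ids route, the saved artifacts could be read back at launch time along these lines. The helper name read_dbt_artifacts is hypothetical, and the code assumes that each line of the saved dbt ls output is a JSON object that may carry a unique_id key, mirroring the line.get("unique_id") lookup in the snippet above; lines that are not JSON are skipped.

```python
import json
from pathlib import Path
from typing import Any, Dict, Set, Tuple


def read_dbt_artifacts(
    manifest_path: str, ls_output_path: str
) -> Tuple[Dict[str, Any], Set[str]]:
    """Load a saved manifest and collect unique_ids from saved `dbt ls` output."""
    manifest_json = json.loads(Path(manifest_path).read_text())

    selected_unique_ids: Set[str] = set()
    for raw_line in Path(ls_output_path).read_text().splitlines():
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            record = json.loads(raw_line)
        except json.JSONDecodeError:
            continue  # assumption: non-JSON lines carry no selection info
        # Keep only records that actually name a dbt node.
        if isinstance(record, dict) and record.get("unique_id"):
            selected_unique_ids.add(record["unique_id"])

    return manifest_json, selected_unique_ids
```

The two return values would then be passed to load_assets_from_dbt_manifest as the manifest and its selected_unique_ids parameter, using the file paths written in the Dockerfile step.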
for the second point, this definitely makes sense! in general, we've received a few requests for more flexibility in how the dbt assets get built, so this is something we're looking into. In the meantime, you could use the runtime_metadata_fn to at least get some of the extra info you're interested in associated w/ each asset (but of course this is not exactly what you want)
s
@Dagster Bot issue Custom function to build dbt asset output metadata
d