Yevhen Samoilenko
05/23/2022, 7:33 AM...
RUN mkdir -p /dbt_artifacts \
&& dbt --log-format json ls --project-dir /path/my_dbt_project --profiles-dir /path/my_dbt_project/config --select tag:daily --resource-type model --output json > /dbt_artifacts/my_dbt_project_daily_cli_output.json \
&& cp my_dbt_project/target/manifest.json /dbt_artifacts/my_dbt_project_daily_manifest.json
...
Then, at Dagster launch time, we read the manifest_json and cli_output data from these files and pass them to a slightly modified version of load_assets_from_dbt_project. It would be much better if load_assets_from_dbt_project supported this out of the box.
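As a rough sketch of that launch-time read, using only the standard library: the manifest is a single JSON document, while the redirected `dbt ls` output is assumed here to contain one JSON object per line (anything that does not parse is skipped). The helper names `load_manifest`, `load_cli_log_lines`, and `selected_unique_ids` are hypothetical and not part of dagster-dbt.

```python
import json
from pathlib import Path
from typing import Any, Dict, List, Set


def load_manifest(path: str) -> Dict[str, Any]:
    """Read a manifest.json that was copied into the image at build time."""
    return json.loads(Path(path).read_text())


def load_cli_log_lines(path: str) -> List[Dict[str, Any]]:
    """Parse the redirected `dbt ls` output (assumed: one JSON object per line)."""
    lines: List[Dict[str, Any]] = []
    for raw in Path(path).read_text().splitlines():
        raw = raw.strip()
        if not raw:
            continue
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            continue  # skip any plain-text line that slipped into the file
        if isinstance(parsed, dict):
            lines.append(parsed)
    return lines


def selected_unique_ids(log_lines: List[Dict[str, Any]]) -> Set[str]:
    """Collect the unique_id of every selected node, mirroring the filter in the proposal."""
    return {line["unique_id"] for line in log_lines if line.get("unique_id")}
```

The resulting manifest dictionary and set of IDs are the two inputs the modified loader needs, so no dbt invocation is required when the code location starts.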
2. Allow passing a custom function to build a dbt asset's output metadata.
The final implementation might look something like this:
def _dbt_nodes_to_assets(
    dbt_nodes: Mapping[str, Any],
    select: str,
    selected_unique_ids: AbstractSet[str],
    runtime_metadata_fn: Optional[
        Callable[[SolidExecutionContext, Mapping[str, Any]], Mapping[str, RawMetadataValue]]
    ] = None,
    io_manager_key: Optional[str] = None,
    node_info_to_asset_key: Callable[[Mapping[str, Any]], AssetKey] = _get_node_asset_key,
    use_build_command: bool = False,
    outs_metadata_fn: Callable[[Dict[str, Any]], Optional[Mapping[str, Any]]] = _node_info_to_metadata,
) -> AssetsDefinition:
    ...
    outs[node_name] = Out(
        asset_key=node_info_to_asset_key(node_info),
        description=description,
        io_manager_key=io_manager_key,
        # Use the caller-supplied function rather than the hard-coded default.
        metadata=outs_metadata_fn(node_info),
    )
    ...
def load_assets_from_dbt_project(
    project_dir: str,
    profiles_dir: Optional[str] = None,
    target_dir: Optional[str] = None,
    select: Optional[str] = None,
    runtime_metadata_fn: Optional[
        Callable[[SolidExecutionContext, Mapping[str, Any]], Mapping[str, Any]]
    ] = None,
    io_manager_key: Optional[str] = None,
    node_info_to_asset_key: Callable[[Mapping[str, Any]], AssetKey] = _get_node_asset_key,
    use_build_command: bool = False,
    outs_metadata_fn: Callable[[Dict[str, Any]], Optional[Mapping[str, Any]]] = _node_info_to_metadata,
    manifest_json: Optional[Dict[str, Any]] = None,
    cli_output: Optional[DbtCliOutput] = None,
) -> Sequence[AssetsDefinition]:
    ...
    # Only invoke dbt when the caller has not supplied prebuilt artifacts.
    if not manifest_json or not cli_output:
        check.str_param(project_dir, "project_dir")
        profiles_dir = check.opt_str_param(
            profiles_dir, "profiles_dir", os.path.join(project_dir, "config")
        )
        target_dir = check.opt_str_param(target_dir, "target_dir", os.path.join(project_dir, "target"))
        manifest_json, cli_output = _load_manifest_for_project(
            project_dir, profiles_dir, target_dir, select or "*"
        )
    selected_unique_ids: Set[str] = set(
        filter(None, (line.get("unique_id") for line in cli_output.logs))
    )
    dbt_nodes = {**manifest_json["nodes"], **manifest_json["sources"]}
    return [
        _dbt_nodes_to_assets(
            dbt_nodes,
            select=select or "*",
            selected_unique_ids=selected_unique_ids,
            runtime_metadata_fn=runtime_metadata_fn,
            io_manager_key=io_manager_key,
            node_info_to_asset_key=node_info_to_asset_key,
            use_build_command=use_build_command,
            outs_metadata_fn=outs_metadata_fn,
        ),
    ]
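For illustration, a custom function matching the proposed outs_metadata_fn signature might look like the sketch below. The function name `tags_and_owner_metadata` and the output keys `dbt_tags` and `owner` are made up for this example, and it assumes the manifest node carries `tags` and a `meta.owner` entry, which will not be true for every project.

```python
from typing import Any, Dict, Mapping, Optional


def tags_and_owner_metadata(node_info: Dict[str, Any]) -> Optional[Mapping[str, Any]]:
    """Hypothetical outs_metadata_fn: expose a node's dbt tags and meta.owner."""
    metadata: Dict[str, Any] = {}

    # Join the tags into one sorted, comma-separated string.
    tags = node_info.get("tags") or []
    if tags:
        metadata["dbt_tags"] = ", ".join(sorted(tags))

    # "owner" under "meta" is a project convention assumed for this example.
    owner = (node_info.get("meta") or {}).get("owner")
    if owner:
        metadata["owner"] = str(owner)

    # Return None when there is nothing to attach.
    return metadata or None
```

With the proposed parameter in place, this would be passed as outs_metadata_fn=tags_and_owner_metadata. That parameter is part of the suggestion above, not something the current function accepts.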
sean
05/25/2022, 1:20 PM
owen
05/25/2022, 4:20 PM
...load_assets_from_dbt_manifest function because of the selection parameter (which this function does not currently support). I believe that you could actually use the selected_unique_ids parameter to load_assets_from_dbt_manifest to do this selection, but we're also going to be adding an experimental feature in this week's release to allow you to use normal dbt select syntax with that function (it's a bit experimental, but I'm pretty confident it will work for your scenario).
sean
05/31/2022, 12:28 PM
Dagster Bot
05/31/2022, 12:28 PM