geoHeil
05/23/2023, 4:57 PM
`runtime_metadata_fn` function to retrieve the number of created rows if dbt was materializing a table? The `results[].adapter_response.rows_affected` field in `run_results.json` seems to contain the desired value. How is this different from `node_info_to_definition_metadata_fn`?

owen
05/23/2023, 5:29 PM
`runtime_metadata_fn` takes two arguments: the context and the node info. So one option here would be to have your `runtime_metadata_fn` use the dbt resource available on the context to fetch the `run_results.json` object. This would be something like:
def my_runtime_metadata_fn(context, node_info):
    run_results = context.resources.dbt.get_run_results_json()
    unique_id = node_info["unique_id"]
    # ... parse out rows_affected for unique_id from the adapter response in run_results["results"]
    return {"rows_affected": rows_affected}
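If you would rather read the artifact straight from disk than through the dbt resource, a minimal sketch follows. It assumes dbt's default `target/` directory under the project dir (the same `target/run_results.json` path that appears in the error later in this thread); `load_run_results` and `rows_affected_for` are hypothetical helper names, not Dagster or dbt APIs.

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional


def load_run_results(project_dir: str, target_path: str = "target") -> Optional[Dict[str, Any]]:
    """Hypothetical helper: read dbt's run_results.json artifact, or return None if it is missing."""
    path = Path(project_dir) / target_path / "run_results.json"
    if not path.exists():
        # The file is absent until a dbt command that writes it has run in this project
        return None
    with path.open() as f:
        return json.load(f)


def rows_affected_for(run_results: Dict[str, Any], unique_id: str) -> Optional[int]:
    """Return adapter_response.rows_affected for one dbt node, if dbt reported it."""
    for result in run_results.get("results", []):
        if result.get("unique_id") == unique_id:
            # Not every adapter/materialization reports rows_affected, so use .get()
            return result.get("adapter_response", {}).get("rows_affected")
    return None
```

Returning `None` instead of raising keeps the metadata function from failing a run just because the artifact or the count is unavailable.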

geoHeil
05/23/2023, 5:39 PM

owen
05/23/2023, 5:39 PM

geoHeil
05/23/2023, 5:46 PM
{
  "metadata": {
    "dbt_schema_version": "https://schemas.getdbt.com/dbt/run-results/v4.json",
    "dbt_version": "1.4.6",
    "generated_at": "2023-05-23T15:54:40.621481Z",
    "invocation_id": "e9e2e65a-1307-476c-b68c-ddf8f7c55128",
    "env": {}
  },
  "results": [
    {
      "status": "success",
      "timing": [
        {
          "name": "compile",
          "started_at": "2023-05-23T15:54:20.016828Z",
          "completed_at": "2023-05-23T15:54:20.021289Z"
        },
        {
          "name": "execute",
          "started_at": "2023-05-23T15:54:20.021870Z",
          "completed_at": "2023-05-23T15:54:40.500335Z"
        }
      ],
      "thread_id": "Thread-1 (worker)",
      "execution_time": 20.486584186553955,
      "adapter_response": {
        "_message": "SELECT 3196779",
        "code": "SELECT",
        "rows_affected": 3196779
      },
      "message": "SELECT 3196779",
      "failures": null,
      "unique_id": "<<<my_unique_id>>"
    }
  ],
  "elapsed_time": 21.433478832244873,
  "args": {
    "write_json": true,
    "use_colors": true,
    "printer_width": 80,
    "version_check": true,
    "partial_parse": true,
    "static_parser": true,
    "profiles_dir": "xxxx",
    "send_anonymous_usage_stats": true,
    "quiet": false,
    "no_print": false,
    "cache_selected_only": false,
    "target": "<<<<",
    "select": [
      "kkkk"
    ],
    "which": "run",
    "rpc_method": "run",
    "indirect_selection": "eager"
  }
}
Would Dagster automatically map the right asset key to each individual element of the results?
If yes, how does Dagster use the asset key to pick the right element out of the results object?
I would like to get the `adapter_response.rows_affected` for each result (or at least for each result where the materialization is of type table).
I have to admit, it is still unclear to me what the right filter/assignment between asset key and unique ID would look like.
Assuming asset key and unique ID are identical, would I need to search the array manually?
However, most likely they are not identical, as the asset key is constructed by feeding additional parameters (`key_prefix`, `source_key_prefix`) to the load-assets-from-dbt function.
So would I somehow need to unpack these as well?

owen
05/23/2023, 5:49 PM
The `node_info` argument to `runtime_metadata_fn` is the specific node info associated with a given asset key (so technically the asset key is not even an input to this function). To expand on the pseudocode, it'd look like:
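def my_runtime_metadata_fn(context, node_info):
    # Fetch the run_results.json produced by the latest dbt CLI invocation
    run_results = context.resources.dbt.get_run_results_json()
    if not run_results:
        return {}
    # node_info is already scoped to a single asset, so match on its unique_id
    unique_id = node_info["unique_id"]
    for result_dict in run_results["results"]:
        if result_dict["unique_id"] == unique_id:
            rows_affected = result_dict.get("adapter_response", {}).get("rows_affected")
            return {"rows_affected": rows_affected} if rows_affected is not None else {}
    return {}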
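Scanning the whole `results` list once per asset works, but to keep only table materializations (as asked above) the lookup can be split into two small steps: index the results by `unique_id`, then filter on the node's materialization. This is a minimal sketch that assumes `node_info` carries the manifest node's `config.materialized` value, as dbt manifest nodes do; `build_rows_affected_index` and `table_rows_metadata` are hypothetical names.

```python
from typing import Any, Dict, Mapping, Optional


def build_rows_affected_index(run_results: Mapping[str, Any]) -> Dict[str, Optional[int]]:
    """Map each dbt unique_id in run_results.json to its adapter_response.rows_affected (or None)."""
    return {
        result["unique_id"]: result.get("adapter_response", {}).get("rows_affected")
        for result in run_results.get("results", [])
    }


def table_rows_metadata(
    node_info: Mapping[str, Any], rows_by_id: Mapping[str, Optional[int]]
) -> Dict[str, int]:
    """Return {"rows_affected": n} only for table materializations that reported a row count."""
    materialized = node_info.get("config", {}).get("materialized")
    if materialized != "table":
        return {}
    rows = rows_by_id.get(node_info["unique_id"])
    # Keep 0 as a valid count; only drop the key when dbt reported nothing
    return {"rows_affected": rows} if rows is not None else {}
```

Inside `my_runtime_metadata_fn` this would be called as `table_rows_metadata(node_info, build_rows_affected_index(run_results))`. The runtime function is still invoked once per asset, so the index is rebuilt on every call unless you cache it yourself.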

geoHeil
05/23/2023, 5:50 PM

owen
05/23/2023, 5:51 PM

geoHeil
05/23/2023, 5:52 PM
DagsterDbtCliOutputsNotFoundError: Expected to find file at path target/run_results.json'
Executing command: dbt --no-use-colors --log-format json run --project-dir resources/../../dbt --profiles-dir resources/../../dbt/config --target prod --select <<mymodel>>
And then the failure.

owen
05/23/2023, 6:52 PM

geoHeil
05/23/2023, 6:54 PM

owen
05/23/2023, 6:55 PM
`load_assets_from_dbt_manifest`. However, in this week's release we're adding a new `dbt_assets` decorator that provides much more flexibility in how to handle events. This is still a work in progress / experimental, so things might shift around a bit, but here's a tentative example of how adding custom metadata might work: https://github.com/dagster-io/dagster/pull/14301/files#diff-acd3f930fb0f90b373feafce2bdaea26a238014bcb6d1dc757b1a578ced14cdbR10
`event` that's available at runtime, so it would be possible to parse it out of there.

geoHeil
05/23/2023, 6:58 PM

owen
05/23/2023, 7:05 PM

geoHeil
05/26/2023, 9:13 AM

rex
05/26/2023, 10:33 AM
from typing import Optional

from dagster import OpExecutionContext, Output
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.asset_utils import output_name_fn
from dagster_dbt.cli import DbtCli, DbtManifest

from . import MANIFEST_PATH

manifest = DbtManifest.read(path=MANIFEST_PATH)


@dbt_assets(manifest=manifest)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCli):
    dbt_cli_task = dbt.cli(["build"], manifest=manifest, context=context)

    # Run the task, but don't yield events yet.
    events = list(dbt_cli_task.stream())

    # Get the run results after the task has completed.
    run_results = dbt_cli_task.get_artifact("run_results.json")
    results_by_output_name = {
        output_name_fn({"unique_id": result["unique_id"]}): result
        for result in run_results["results"]
    }

    # Then, we can use the run results to add metadata to the outputs.
    for dagster_event in events:
        if isinstance(dagster_event, Output):
            output_name = dagster_event.output_name
            # .get() avoids a KeyError if an output has no matching run result
            result = results_by_output_name.get(output_name, {})
            rows_affected: Optional[int] = result.get("adapter_response", {}).get("rows_affected")
            # Keep a count of 0; only skip the metadata when dbt reported no count
            rows_affected_metadata = {"rows_affected": rows_affected} if rows_affected is not None else {}
            context.add_output_metadata(
                metadata={
                    **rows_affected_metadata,
                },
                output_name=output_name,
            )
        yield dagster_event
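rex's example buffers all events, reads `run_results.json` once the command has finished, and then joins each `Output` to its run result by output name. The join step can be tried on its own in plain Python. In this sketch `to_output_name` is a hypothetical stand-in that only mimics the idea of turning a `unique_id` into an identifier-safe output name; it is not dagster-dbt's actual `output_name_fn`, and `join_rows_affected` is likewise a hypothetical name.

```python
import re
from typing import Any, Dict, Iterable, List, Mapping, Tuple


def to_output_name(unique_id: str) -> str:
    """Hypothetical stand-in for output_name_fn: replace non-identifier characters with '_'."""
    return re.sub(r"[^A-Za-z0-9_]", "_", unique_id)


def join_rows_affected(
    output_names: Iterable[str], run_results: Mapping[str, Any]
) -> List[Tuple[str, Dict[str, int]]]:
    """Pair each output name with rows_affected metadata taken from run_results.json."""
    # Build the lookup once, keyed the same way the outputs are named
    results_by_output_name = {
        to_output_name(r["unique_id"]): r for r in run_results.get("results", [])
    }
    joined = []
    for name in output_names:
        # .get() avoids a KeyError for outputs that have no matching run result
        result = results_by_output_name.get(name, {})
        rows = result.get("adapter_response", {}).get("rows_affected")
        joined.append((name, {"rows_affected": rows} if rows is not None else {}))
    return joined
```

The design choice mirrors the example above: because the run results only exist after dbt finishes, the events have to be collected first and enriched afterwards, at the cost of not streaming outputs while dbt is still running.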

geoHeil
05/27/2023, 2:19 AM
`io_manager_key` specified in the new approach?
`dbt parse` good enough to get the desired manifest? 2) If not, how can I get this manifest file without connecting from CI to the prod warehouse instance? Also, in the past the target was branch-aware. Will this still be true, or do I have to take care of that explicitly myself when generating the manifest file?