g

geoHeil

05/23/2023, 4:57 PM
How can I use the runtime_metadata_fn function to retrieve the number of created rows when dbt materializes a table? The run_results.json seems to contain the desired value in results.adapter_response.rows_affected. How is this different from node_info_to_definition_metadata_fn? It looks like both functions only operate on manifest.json and not on run_results.json.
How can I surface the record count, which only seems to be in run_results.json, as metadata?
o

owen

05/23/2023, 5:29 PM
hi @geoHeil! runtime_metadata_fn takes two arguments, one being the context and the other being the node info. so, one option here would be to have your runtime_metadata_fn use the dbt resource available on the context to fetch the run_results.json object. This would be something like:
def my_runtime_metadata_fn(context, node_info):
    run_results = context.resources.dbt.get_run_results_json()
    unique_id = node_info["unique_id"]
    # ... parse out rows_affected from adapter response
    return {"rows_affected": rows_affected}
we're working on making it easier to customize the behavior of dbt assets, and this sort of custom metadata use case is definitely something to improve on (as it's pretty unergonomic at the moment)
g

geoHeil

05/23/2023, 5:39 PM
How would the assignment of the metadata to an asset key work? I guess a dbt run returns metadata for many assets?
o

owen

05/23/2023, 5:39 PM
runtime_metadata_fn is invoked once for each output asset
g

geoHeil

05/23/2023, 5:46 PM
So for an example like this:
{
  "metadata": {
    "dbt_schema_version": "<https://schemas.getdbt.com/dbt/run-results/v4.json>",
    "dbt_version": "1.4.6",
    "generated_at": "2023-05-23T15:54:40.621481Z",
    "invocation_id": "e9e2e65a-1307-476c-b68c-ddf8f7c55128",
    "env": {}
  },
  "results": [
    {
      "status": "success",
      "timing": [
        {
          "name": "compile",
          "started_at": "2023-05-23T15:54:20.016828Z",
          "completed_at": "2023-05-23T15:54:20.021289Z"
        },
        {
          "name": "execute",
          "started_at": "2023-05-23T15:54:20.021870Z",
          "completed_at": "2023-05-23T15:54:40.500335Z"
        }
      ],
      "thread_id": "Thread-1 (worker)",
      "execution_time": 20.486584186553955,
      "adapter_response": {
        "_message": "SELECT 3196779",
        "code": "SELECT",
        "rows_affected": 3196779
      },
      "message": "SELECT 3196779",
      "failures": null,
      "unique_id": "<<<my_unique_id>>"
    }
  ],
  "elapsed_time": 21.433478832244873,
  "args": {
    "write_json": true,
    "use_colors": true,
    "printer_width": 80,
    "version_check": true,
    "partial_parse": true,
    "static_parser": true,
    "profiles_dir": "xxxx",
    "send_anonymous_usage_stats": true,
    "quiet": false,
    "no_print": false,
    "cache_selected_only": false,
    "target": "<<<<",
    "select": [
      "kkkk"
    ],
    "which": "run",
    "rpc_method": "run",
    "indirect_selection": "eager"
  }
}
Would dagster automatically match the right asset key to each individual element of results? If yes, how does dagster's asset key filtering pick the right element in the results object? I would like to get adapter_response.rows_affected for each result (or at least for each result where the materialization is of type table). I have to admit, it is still unclear to me what the right filter/assignment between asset key and unique ID would look like. Assuming asset key and unique ID are identical, would I need to (manually) search the array? However, most likely they are not identical, as the asset key is only constructed by feeding additional parameters (key_prefix, source_key_prefix) to the function that loads assets from dbt. So I would somehow need to unpack/unparse these as well?
o

owen

05/23/2023, 5:49 PM
the node_info argument to runtime_metadata_fn is the specific node info associated with a given asset key (so technically the asset key is not even an input to this function). To expand on the pseudocode, it'd look like:
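def my_runtime_metadata_fn(context, node_info):
    # Fetch the parsed run_results.json through the dbt resource on the context.
    run_results = context.resources.dbt.get_run_results_json()
    unique_id = node_info["unique_id"]

    # Find the result entry that belongs to this node.
    for result_dict in run_results["results"]:
        if result_dict["unique_id"] == unique_id:
            rows_affected = result_dict["adapter_response"].get("rows_affected")
            if rows_affected is not None:
                return {"rows_affected": rows_affected}
    return {}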
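As a standalone illustration of the lookup described above (a sketch only, not code from the thread), the matching between a node's unique_id and its entry in run_results.json can be tried without Dagster on a trimmed-down sample. The unique_id values and the helper names index_rows_affected and rows_affected_metadata are hypothetical; only the results / unique_id / adapter_response.rows_affected layout is taken from the run_results.json quoted earlier.

```python
from typing import Any, Dict, Optional

# Trimmed-down sample shaped like the run_results.json quoted in the thread.
# The unique_id values are hypothetical placeholders.
SAMPLE_RUN_RESULTS: Dict[str, Any] = {
    "results": [
        {
            "status": "success",
            "adapter_response": {
                "_message": "SELECT 3196779",
                "code": "SELECT",
                "rows_affected": 3196779,
            },
            "unique_id": "model.my_project.my_table",
        },
        {
            "status": "success",
            "adapter_response": {},  # assumption: no row count reported here
            "unique_id": "model.my_project.my_view",
        },
    ]
}


def index_rows_affected(run_results: Dict[str, Any]) -> Dict[str, Optional[int]]:
    """Map each result's unique_id to its rows_affected value (None if absent)."""
    return {
        result["unique_id"]: (result.get("adapter_response") or {}).get("rows_affected")
        for result in run_results.get("results", [])
    }


def rows_affected_metadata(run_results: Dict[str, Any], unique_id: str) -> Dict[str, int]:
    """Build the metadata dict for one node, or {} when no row count is known."""
    rows = index_rows_affected(run_results).get(unique_id)
    return {} if rows is None else {"rows_affected": rows}
```

Building the index once avoids scanning the results array for every asset; for a handful of models the linear scan is just as good.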
g

geoHeil

05/23/2023, 5:50 PM
so you/dagster are already remapping the asset key to the unique ID?
o

owen

05/23/2023, 5:51 PM
yep exactly
g

geoHeil

05/23/2023, 5:52 PM
cool - let me try it and give it a shot
When trying this for real, I either run into a race condition or am using it wrong:
DagsterDbtCliOutputsNotFoundError: Expected to find file at path target/run_results.json'
In the logs I can observe:
Executing command: dbt --no-use-colors --log-format json run --project-dir resources/../../dbt --profiles-dir resources/../../dbt/config --target prod --select <<mymodel>>
And then the failure.
o

owen

05/23/2023, 6:52 PM
ah I think I might know what's happening here -- are you on dbt-core version >= 1.4?
g

geoHeil

05/23/2023, 6:54 PM
should be 1.4.6
o

owen

05/23/2023, 6:55 PM
for those versions, we emit the outputs as the dbt command is running, rather than waiting for the command to complete, and so the run_results.json will not be available at that time (forgot about that quirk). if that's the case, then there's not a great existing solution with load_assets_from_dbt_manifest. However, in this week's release we're adding a new dbt_assets decorator that provides much more flexibility in how to handle events. This is still a work in progress / experimental, so things might shift around a bit, but here's a tentative example of how adding custom metadata might work: https://github.com/dagster-io/dagster/pull/14301/files#diff-acd3f930fb0f90b373feafce2bdaea26a238014bcb6d1dc757b1a578ced14cdbR10
in this case, the adapter response should be present in the event that's available at runtime, so it would be possible to parse it out of there
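To make the timing issue concrete, here is a minimal sketch of a defensive reader that returns None while run_results.json has not been written yet instead of raising. This is plain Python and not part of dagster-dbt; the function name read_run_results_if_present and the target_dir parameter are hypothetical. It only avoids the missing-file error, it does not make the row counts available any earlier.

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional


def read_run_results_if_present(target_dir: str) -> Optional[Dict[str, Any]]:
    """Return the parsed run_results.json, or None if dbt has not written it yet."""
    path = Path(target_dir) / "run_results.json"
    if not path.is_file():
        # The dbt command is still running (or never ran): nothing to read.
        return None
    with path.open() as f:
        return json.load(f)
```

A caller would treat None as "no runtime metadata available" and return an empty metadata dict.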
g

geoHeil

05/23/2023, 6:58 PM
would it be more practical if I send the manifest file?
but the run-results would still only be available at the end
Do you think you could extend the tutorial/example to include how the affected row count can be extracted? It would be fine for me to wait until the release
o

owen

05/23/2023, 7:05 PM
cc @rex
g

geoHeil

05/26/2023, 9:13 AM
@owen @rex https://dagster.slack.com/archives/C04CW71AGBW/p1685051912387029 the new version seems to be released now. Can you show me with an example how this issue would be solved with the new version?
r

rex

05/26/2023, 10:33 AM
We gave some examples in the repository. You can compose them to accomplish your use case.
• This example, which fetches the artifact after the run is done
• This example, which allows you to add metadata to an output
For example, this works in the jaffle shop repository:
from typing import Optional

from dagster import OpExecutionContext, Output
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.asset_utils import output_name_fn
from dagster_dbt.cli import DbtCli, DbtManifest

from . import MANIFEST_PATH

manifest = DbtManifest.read(path=MANIFEST_PATH)


@dbt_assets(manifest=manifest)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCli):
    dbt_cli_task = dbt.cli(["build"], manifest=manifest, context=context)

    # Run the task, but don't yield events.
    events = list(dbt_cli_task.stream())

    # Get the run results after the task has completed.
    run_results = dbt_cli_task.get_artifact("run_results.json")
    results_by_output_name = {
        output_name_fn({"unique_id": result["unique_id"]}): result
        for result in run_results["results"]
    }

    # Then, we can use the run results to add metadata to the outputs.
    for dagster_event in events:
        if isinstance(dagster_event, Output):
            output_name = dagster_event.output_name
            result = results_by_output_name[output_name]

            rows_affected: Optional[int] = result["adapter_response"].get("rows_affected")
            # Compare against None so that a legitimate count of 0 rows is still reported.
            rows_affected_metadata = {"rows_affected": rows_affected} if rows_affected is not None else {}

            context.add_output_metadata(
                metadata={
                    **rows_affected_metadata,
                },
                output_name=output_name,
            )

        yield dagster_event
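The earlier question about reporting rows only for table materializations can be sketched separately from Dagster. The following is an illustration under assumptions, not part of the example above: it relies on manifest.json listing each node under nodes[unique_id] with its materialization in config.materialized, and the helper name table_rows_affected and all sample identifiers are hypothetical.

```python
from typing import Any, Dict

# Hypothetical, trimmed-down manifest.json and run_results.json contents.
SAMPLE_MANIFEST: Dict[str, Any] = {
    "nodes": {
        "model.my_project.orders": {"config": {"materialized": "table"}},
        "model.my_project.orders_view": {"config": {"materialized": "view"}},
    }
}
SAMPLE_RUN_RESULTS: Dict[str, Any] = {
    "results": [
        {"unique_id": "model.my_project.orders", "adapter_response": {"rows_affected": 42}},
        {"unique_id": "model.my_project.orders_view", "adapter_response": {"rows_affected": 7}},
    ]
}


def table_rows_affected(
    manifest: Dict[str, Any], run_results: Dict[str, Any]
) -> Dict[str, int]:
    """Map unique_id to rows_affected, only for nodes materialized as tables."""
    nodes = manifest.get("nodes", {})
    rows_by_id: Dict[str, int] = {}
    for result in run_results.get("results", []):
        unique_id = result["unique_id"]
        materialized = nodes.get(unique_id, {}).get("config", {}).get("materialized")
        if materialized != "table":
            continue  # skip views, tests, seeds and anything not in the manifest
        rows = (result.get("adapter_response") or {}).get("rows_affected")
        if rows is not None:
            rows_by_id[unique_id] = rows
    return rows_by_id
```

If incremental models should be included as well, the comparison would need to accept "incremental" in addition to "table".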
g

geoHeil

05/27/2023, 2:19 AM
The @dbt_assets decorator requires the manifest. Does this mean that in the future you will no longer support dynamic generation of the manifest, as with load_assets_from_dbt_project? Where is the io_manager_key specified in the new approach?
In case you want pre-created manifests: 1) is dbt parse good enough to get the desired manifest? 2) if not, how can I get this manifest file without connecting from CI to the prod warehouse instance? Also, in the past the target was branch-aware. Will this still be true, or do I explicitly have to take care of it myself when generating the manifest file?