# integration-dbt
s
Made an issue around the move from logging to the event log stream vs raw compute logs. As far as I can tell, on our Dagster Cloud / EKS deployments, the logs don't get streamed back to Dagster Cloud until the completion of the run, which really impedes our ability to monitor job progress.
a
There’s some discussion in this thread you may find useful. It’s been a bit since I looked at it, but I believe you could implement this yourself quite easily from the interface
👍 2
s
Oh, very nice! So, per these lines, it looks like I can do:
```python
for raw_event in dbt.cli(["run"]).stream_raw_events():
    context.log.info(raw_event)
    for dagster_event in raw_event.to_default_asset_events(manifest=DBT_MANIFEST):
        if isinstance(dagster_event, Output):
            # manually log asset materialization stuff
            ...
```
That unblocks us for now 💪 Still, would love for the `stream` to be really all-inclusive:
```python
yield from dbt.cli(["run"]).stream(yield_asset_materializations=True, log_raw_events=True)
```
g
Interestingly, I had to block the stream as I needed to fiddle with the generated manifest using other tools. It would be nice if I did not have to block the stream but could handle the manifest files afterwards (and parse the desired metadata)
s
Are you handling the files after the cli invocation? We are parsing the run results and sending Slack messages to team channels at the conclusion of the run. (although actually -- I wonder if we could do this during the stream too 👀 )
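For reference, a minimal sketch of that post-run step under stated assumptions: parse `run_results.json` and post a summary to a Slack incoming webhook. `SLACK_WEBHOOK_URL` is a placeholder, and the exact status strings depend on your dbt version's run_results schema.
```python
import json
import os
import urllib.request


def send_run_results_to_slack(run_results_path: str) -> None:
    """Summarize a dbt run_results.json file and post it to a Slack incoming webhook."""
    with open(run_results_path) as f:
        run_results = json.load(f)

    results = run_results.get("results", [])
    failed = [r.get("unique_id") for r in results if r.get("status") == "error"]

    text = f"dbt run finished: {len(results)} nodes, {len(failed)} failed"
    if failed:
        text += "\nFailed: " + ", ".join(failed)

    # SLACK_WEBHOOK_URL is a placeholder for however you deliver messages to Slack.
    request = urllib.request.Request(
        os.environ["SLACK_WEBHOOK_URL"],
        data=json.dumps({"text": text}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(request)
```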
g
After. Exactly
t
@Stephen Bailey, I asked Dagster's support the same question, and there is also the possibility to ask your agent to push logs to Dagster Cloud during task execution. Here is a snippet of our exchange:
You can configure an upload interval in your Helm chart or dagster.yaml.
Helm chart values example:
```yaml
computeLogs:
  custom:
    module: dagster_cloud.storage.compute_logs
    class: CloudComputeLogManager
    config:
      upload_interval: 30
```
dagster.yaml values example:
```yaml
compute_logs:
  module: dagster_cloud.storage.compute_logs
  class: CloudComputeLogManager
  config:
    upload_interval: 30
```
There are some considerations around egress costs when setting a lower interval; on each interval, Dagster will re-upload the entire compute log.
In the future we might roll out a low-egress, streaming-by-default solution, but that has not yet shipped.
👍🏽 1
👍 1
r
re: parsing run results during the stream: since you have access to the raw streaming logs from dbt (from `.stream_raw_events()`), you could process them as you wish. dbt's structure for these logs is given in https://docs.getdbt.com/reference/events-logging. You could use the `.data.info.node_info.node_status` field to determine whether a model has failed during the stream, and fire events accordingly (Slack events, processing, etc.).
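A minimal sketch of what that could look like inside a `@dbt_assets` function, assuming the node_info nesting described above (its exact location in the raw event can vary by dbt version) and a placeholder manifest path; the Slack call itself is left as a log statement to swap out.
```python
from pathlib import Path
from typing import Iterator

from dagster import OpExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

# Placeholder: point this at your dbt project's compiled manifest.
DBT_MANIFEST = Path("target/manifest.json")


@dbt_assets(manifest=DBT_MANIFEST)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource) -> Iterator:
    for raw_event in dbt.cli(["run"], manifest=DBT_MANIFEST, context=context).stream_raw_events():
        # The exact nesting of node_info in the raw event may differ across dbt versions.
        node_info = raw_event.raw_event.get("data", {}).get("node_info", {})
        if node_info.get("node_status") == "error":
            # Swap this log call for a Slack notification (or any other side effect).
            context.log.error(f"dbt node failed: {node_info.get('unique_id', 'unknown node')}")

        yield from raw_event.to_default_asset_events(manifest=DBT_MANIFEST)
```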
t
> Since you have access to the raw streaming logs from dbt (from `.stream_raw_events()`), you could process them as you wish. dbt's structure for these logs is given in https://docs.getdbt.com/reference/events-logging. You could use the `.data.info.node_info.node_status` field to determine whether a model has failed during the stream, and fire events accordingly (Slack events, processing, etc.).

@geoHeil you have an example here. We are processing the event stream and accessing dbt artifacts at the end of the run.
g
@Timothee Vandeput I had to use
```python
events = list(dbt_cli_task.stream_raw_events())
```
as @rex explained to me, to explicitly block the computation and then access the files later. The referenced example code is here: https://georgheiler.com/2023/06/13/unlocking-advanced-metadata-extraction-with-the-new-dbt-api-in-dagster/. Is this blocking no longer necessary?
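For completeness, a sketch of that blocking pattern inside a `@dbt_assets` body, reusing the `dbt`, `context`, and `DBT_MANIFEST` names from the snippet above: exhaust the stream first, then read the artifacts. The `get_artifact` call is assumed to be available in your dagster-dbt version; otherwise the JSON files under `target/` can be opened directly.
```python
dbt_cli_invocation = dbt.cli(["run"], manifest=DBT_MANIFEST, context=context)

# list() forces the raw event stream to be fully consumed, i.e. blocks until dbt exits.
events = list(dbt_cli_invocation.stream_raw_events())

# Only now are run_results.json / manifest.json guaranteed to be complete on disk.
run_results = dbt_cli_invocation.get_artifact("run_results.json")
for result in run_results.get("results", []):
    context.log.info(f"{result.get('unique_id')}: {result.get('status')}")

# The buffered events can still be turned into Dagster events afterwards.
for raw_event in events:
    yield from raw_event.to_default_asset_events(manifest=DBT_MANIFEST)
```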
t
@geoHeil I understand ... you are using some metadata from dbt (i.e. adapter_response) that is not streamed by dbt but only accessible at the end of the run. So if you want to add that metadata, you have to block until the end of the run and then generate the outputs. I was also looking into getting the _rows_affected_; it's a pity it's not available during the run. In our case, we prefer to stream the events during the run and not have that specific metadata. @rex Is there a way to attach additional metadata after the output has been yielded?
r
You could emit a separate `AssetMaterialization` event, but it would be a separate entry in your asset catalog history. After logging the initial asset materialization, it can't be retroactively edited with newer information (e.g. `rows_affected` from the metadata at the end of the run). So either:
• You log two `AssetMaterialization` events
• You wait until all the metadata that you need to construct the `AssetMaterialization` is present, and then you yield it with said metadata
👍 1
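A sketch of the second option, continuing from the blocking snippet above (the same `dbt`, `context`, and `DBT_MANIFEST` names are assumptions, as is `get_artifact`): index `adapter_response` by unique_id once run_results.json exists, then re-yield each `Output` with the late metadata merged in. Whether `rows_affected` is populated at all depends on your adapter.
```python
from dagster import Output

dbt_cli_invocation = dbt.cli(["run"], manifest=DBT_MANIFEST, context=context)
# Block until dbt has finished, so run_results.json (and adapter_response) exists.
raw_events = list(dbt_cli_invocation.stream_raw_events())

run_results = dbt_cli_invocation.get_artifact("run_results.json")
adapter_responses = {
    result["unique_id"]: result.get("adapter_response", {})
    for result in run_results.get("results", [])
}

for raw_event in raw_events:
    # unique_id lives in node_info for node-level events; nesting may vary by dbt version.
    unique_id = raw_event.raw_event.get("data", {}).get("node_info", {}).get("unique_id")
    extra = adapter_responses.get(unique_id, {})
    for dagster_event in raw_event.to_default_asset_events(manifest=DBT_MANIFEST):
        if isinstance(dagster_event, Output) and "rows_affected" in extra:
            # Re-create the Output so the late metadata rides along with the materialization.
            yield Output(
                value=dagster_event.value,
                output_name=dagster_event.output_name,
                metadata={**dagster_event.metadata, "rows_affected": extra["rows_affected"]},
            )
        else:
            yield dagster_event
```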
a
I ended up wanting to use the event logs instead of the compute logs (as I commented earlier in the RFC), and using the code Stephen provided as a base, I got something that I'm quite happy with:
```python
from typing import Any, Iterator, Mapping

from dagster import OpExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliEventMessage


def parse_dbt_raw_events(
    context: OpExecutionContext,
    raw_events: Iterator[DbtCliEventMessage],
    manifest: Mapping[str, Any],
    dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(),
) -> Iterator:
    """Parse dbt cli raw events into dagster events.

    How to use it:

        yield from parse_dbt_raw_events(context, raw_events, ...)
    """
    for event in raw_events:
        msg = event.raw_event.get("info").get("msg")  # type: ignore
        level = event.raw_event.get("info").get("level")  # type: ignore
        if level == "info":
            context.log.info(msg)
        elif level == "warn":
            context.log.warning(msg)
        elif level == "error":
            context.log.error(msg)

        yield from event.to_default_asset_events(
            manifest=manifest, dagster_dbt_translator=dagster_dbt_translator
        )
```
Then, inside your assets function (decorated with `@dbt_assets`), just use it as:
```python
@dbt_assets(manifest=my_dbt_manifest)
def my_assets(context: OpExecutionContext, config: MyDbtConfig, dbt: DbtCliResource):
    raw_events = dbt.cli(
        config.dbt_cli_args, manifest=my_dbt_manifest, context=context
    ).stream_raw_events()

    yield from parse_dbt_raw_events(
        context=context,
        raw_events=raw_events,
        manifest=my_dbt_manifest,
    )
```
t
@André Augusto You could use `logger.log(log_level, log_str)` instead of mapping each level. This is our snippet of code that also removes color characters from the log. We decided to override the `stream` method.
```python
def stream(
    self, yield_asset_materialization: bool = False, log_to_structured_events: bool = True
) -> Iterator[Union[Output, AssetMaterialization, AssetObservation]]:
    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    for event in self.stream_raw_events():
        if log_to_structured_events:
            log_level = getLevelName(event.raw_event['info']['level'].upper())
            log_str = ansi_escape.sub('', event.raw_event['info']['msg'])
            logger.log(log_level, log_str)

        yield from event.to_default_asset_events(
            manifest=self.manifest,
            dagster_dbt_translator=self.dagster_dbt_translator,
            yield_asset_materialization=yield_asset_materialization,
        )
```
a
nice, didn’t know about that. For the color characters, I ended up using the `--no-use-colors` flag when configuring the `DbtCliResource` to be used in the job:
```python
DBT_RESOURCE = DbtCliResource(project_dir=DBT_DIR, global_config_flags=["--no-write-json", "--no-use-colors"])
```
👌 1