Stephen Bailey
07/24/2023, 1:40 AM
Adam Bloom
07/24/2023, 2:05 AM
Stephen Bailey
07/24/2023, 2:20 AM
```
for raw_event in dbt.cli(["run"]).stream_raw_events():
    context.log.info(raw_event)
    for dagster_event in raw_event.to_default_asset_events(manifest=DBT_MANIFEST):
        if isinstance(dagster_event, Output):
            # manually log asset materialization stuff
            ...
```
That unblocks us for now 💪 Still, would love for the stream
to be really all-inclusive.
yield from dbt.cli(["run"]).stream(yield_asset_materializations=True, log_raw_events=True)
geoHeil
07/24/2023, 10:27 AM
Stephen Bailey
07/24/2023, 11:42 AM
geoHeil
07/24/2023, 11:45 AM
Timothee Vandeput
07/24/2023, 1:42 PM
You can configure an upload interval in your helm chart or dagster.yaml.
helm chart values example:
```
computeLogs:
  custom:
    module: dagster_cloud.storage.compute_logs
    class: CloudComputeLogManager
    config:
      upload_interval: 30
```
dagster.yaml values example:
```
compute_logs:
  module: dagster_cloud.storage.compute_logs
  class: CloudComputeLogManager
  config:
    upload_interval: 30
```
There are egress-cost considerations when setting a lower interval: on each interval, Dagster re-uploads the entire compute log.
In the future we might roll out a low-egress, streaming-by-default solution, but that has not shipped yet.
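Because the entire compute log is re-uploaded on each interval, total egress grows roughly quadratically with run length, and halving the interval roughly doubles the bytes sent. A back-of-the-envelope sketch (the numbers below are hypothetical, assuming the log grows at a steady rate):

```python
def total_egress_bytes(run_seconds: int, upload_interval: int, bytes_per_second: int) -> int:
    """Total bytes uploaded when the whole log file (growing at a steady
    rate) is re-uploaded from scratch on every upload interval."""
    total = 0
    elapsed = upload_interval
    while elapsed <= run_seconds:
        total += elapsed * bytes_per_second  # size of the full log so far
        elapsed += upload_interval
    return total

# One-hour run, log growing at 1 kB/s:
hourly_30s = total_egress_bytes(3600, 30, 1000)  # ~218 MB uploaded in total
hourly_60s = total_egress_bytes(3600, 60, 1000)  # ~110 MB uploaded in total
```

So a 30-second interval gives fresher logs at roughly twice the egress of a 60-second interval for the same run.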
rex
07/24/2023, 1:45 PM
Since you have access to the raw streaming logs from dbt (from .stream_raw_events()), you could process them as you wish. dbt's structure for these logs is given in https://docs.getdbt.com/reference/events-logging. You could use the .data.info.node_info.node_status field to determine whether a model has failed during the stream, and fire events accordingly (Slack alerts, processing, etc.).
Timothee Vandeput
07/24/2023, 1:47 PM
@geoHeil you have an example here. We are processing the event (stream) and accessing dbt artifacts at the end of the run.
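The node_status check rex describes could look something like the sketch below. The event dicts are hand-written stand-ins shaped after dbt's structured-log docs linked above; the exact nesting of `node_info` can vary across dbt versions, so treat the field paths as an assumption to verify against your own raw events:

```python
from typing import Any, Dict, Iterable, List

def failed_nodes(raw_events: Iterable[Dict[str, Any]]) -> List[str]:
    """Collect unique_ids of nodes whose node_status indicates failure,
    so alerts (Slack, etc.) can fire while the stream is still running."""
    failures = []
    for event in raw_events:
        node_info = event.get("data", {}).get("node_info") or {}
        if node_info.get("node_status") in ("fail", "error"):
            failures.append(node_info.get("unique_id"))
    return failures

events = [
    {"data": {"node_info": {"unique_id": "model.jaffle.orders", "node_status": "success"}}},
    {"data": {"node_info": {"unique_id": "model.jaffle.payments", "node_status": "fail"}}},
    {"data": {}},  # run-level events carry no node_info and are skipped
]
# failed_nodes(events) -> ["model.jaffle.payments"]
```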
geoHeil
07/24/2023, 3:14 PM
```
events = list(dbt_cli_task.stream_raw_events())
```
as @rex explained to me, to explicitly block the computation and then access the files later. The referenced example code is here: https://georgheiler.com/2023/06/13/unlocking-advanced-metadata-extraction-with-the-new-dbt-api-in-dagster/. Is this blocking no longer necessary?
Timothee Vandeput
07/24/2023, 3:21 PM
rex
07/24/2023, 7:14 PM
You could yield an AssetMaterialization event, but it would be a separate entry in your asset catalog history. After logging the initial asset materialization, it can't be retroactively edited with newer information (e.g. rows_affected from the metadata at the end of the run). So either:
• You log two AssetMaterialization events, or
• You wait until all the metadata that you need to construct the AssetMaterialization is present, and then you yield it with said metadata.
André Augusto
07/27/2023, 5:34 PM
```
def parse_dbt_raw_events(
    context: OpExecutionContext,
    raw_events: Iterator[DbtCliEventMessage],
    manifest: Mapping[str, Any],
    dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(),
) -> Iterator:
    """Parse dbt cli raw events into dagster events.

    How to use it:
        yield from parse_dbt_raw_events(context, raw_events, ...)
    """
    for event in raw_events:
        info = event.raw_event.get("info", {})
        msg = info.get("msg")
        level = info.get("level")
        if level == "info":
            context.log.info(msg)
        elif level == "warn":
            context.log.warning(msg)
        elif level == "error":
            context.log.error(msg)
        yield from event.to_default_asset_events(
            manifest=manifest, dagster_dbt_translator=dagster_dbt_translator
        )
```
Then, inside your assets function (decorated with @dbt_assets), just use it as
```
def my_assets(context, config: MyDbtConfig, dbt: DbtCliResource):
    raw_events = dbt.cli(
        config.dbt_cli_args, manifest=my_dbt_manifest, context=context
    ).stream_raw_events()
    yield from parse_dbt_raw_events(
        context=context,
        raw_events=raw_events,
        manifest=my_dbt_manifest,
    )
```
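The level dispatch in parse_dbt_raw_events can be checked in isolation with a stub context. StubLog and StubContext below are plain-Python test doubles I'm introducing for illustration, not dagster APIs; log_raw_event reproduces only the logging branch of the helper, without the dagster-event translation:

```python
class StubLog:
    """Records (level, message) pairs instead of writing real log output."""
    def __init__(self):
        self.records = []
    def info(self, msg): self.records.append(("info", msg))
    def warning(self, msg): self.records.append(("warning", msg))
    def error(self, msg): self.records.append(("error", msg))

class StubContext:
    def __init__(self):
        self.log = StubLog()

def log_raw_event(context, raw_event):
    """Same level dispatch as parse_dbt_raw_events, logging branch only."""
    info = raw_event.get("info", {})
    msg, level = info.get("msg"), info.get("level")
    if level == "info":
        context.log.info(msg)
    elif level == "warn":
        context.log.warning(msg)
    elif level == "error":
        context.log.error(msg)

ctx = StubContext()
log_raw_event(ctx, {"info": {"level": "warn", "msg": "1 model skipped"}})
# ctx.log.records -> [("warning", "1 model skipped")]
```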
Timothee Vandeput
07/28/2023, 4:09 PM
You can use logger.log(log_level, log_str) instead of mapping each level. This is our snippet of code, which also removes color characters from the log. We decided to override the stream method.
```
def stream(
    self, yield_asset_materialization: bool = False, log_to_structured_events: bool = True
) -> Iterator[Union[Output, AssetMaterialization, AssetObservation]]:
    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    for event in self.stream_raw_events():
        if log_to_structured_events:
            log_level = getLevelName(event.raw_event['info']['level'].upper())
            log_str = ansi_escape.sub('', event.raw_event['info']['msg'])
            logger.log(log_level, log_str)
        yield from event.to_default_asset_events(
            manifest=self.manifest,
            dagster_dbt_translator=self.dagster_dbt_translator,
            yield_asset_materialization=yield_asset_materialization,
        )
```
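The ANSI-stripping regex and the getLevelName lookup in the snippet above can be exercised standalone with the stdlib (the colored log line is a made-up example). Upper-casing works here because the stdlib logging module registers WARN as an alias for WARNING:

```python
import re
from logging import getLevelName

ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

colored = "\x1b[32m12:00:00  1 of 3 OK created table model orders\x1b[0m"
plain = ansi_escape.sub("", colored)
# plain -> "12:00:00  1 of 3 OK created table model orders"

# dbt's lowercase levels map onto stdlib numeric levels once upper-cased:
levels = {name: getLevelName(name.upper()) for name in ("debug", "info", "warn", "error")}
# levels -> {"debug": 10, "info": 20, "warn": 30, "error": 40}
```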
André Augusto
07/28/2023, 4:17 PM
You can also pass --no-use-colors when configuring the DbtCliResource to be used in the job:
```
DBT_RESOURCE = DbtCliResource(project_dir=DBT_DIR, global_config_flags=["--no-write-json", "--no-use-colors"])
```