Ian Macleod
03/06/2024, 11:27 PM

Scout
03/06/2024, 11:27 PM
You can use Python's standard `logging` library to define a custom logger with handlers for both Dagster and Datadog. For Datadog, you can build a handler on top of the `datadog` Python package. Here's an example of how you might define a custom logger that writes logs to both Dagster and Datadog. Please note that this example assumes you have the `datadog` package installed and configured with your Datadog API key.
```python
import logging

from datadog import initialize, ThreadStats
from dagster import logger

# Initialize the Datadog API
options = {
    'api_key': 'your_api_key',
    'app_key': 'your_app_key',
}
initialize(**options)

# Create a custom handler that forwards log records to Datadog as events
class DatadogHandler(logging.Handler):
    def __init__(self):
        logging.Handler.__init__(self)
        self.stats = ThreadStats()
        self.stats.start()

    def emit(self, record):
        try:
            self.stats.event(
                title=record.getMessage(),
                text=record.getMessage(),
                alert_type='info',
            )
        except Exception:
            self.handleError(record)

# Define the custom logger for Dagster
@logger
def custom_logger(init_context):
    logger_ = logging.getLogger('dagster_datadog_logger')
    logger_.setLevel(logging.INFO)

    # Add a console handler for Dagster's logs
    console_handler = logging.StreamHandler()
    console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(console_formatter)
    logger_.addHandler(console_handler)

    # Add the Datadog handler
    datadog_handler = DatadogHandler()
    logger_.addHandler(datadog_handler)

    return logger_
```
In this example, we define a `DatadogHandler` that sends logs to Datadog as events. We then create a custom logger using the `@logger` decorator provided by Dagster. This custom logger adds both a console handler (for Dagster's logs) and the `DatadogHandler`. Please ensure that you replace `'your_api_key'` and `'your_app_key'` with your actual Datadog API and application keys. You may also need to adjust the log level and formatter to your specific requirements. Remember to attach this custom logger to your Dagster jobs to ensure that logs are captured and sent to both Dagster and Datadog.
Ian Macleod
03/07/2024, 12:27 AM
```python
def make_json_logger(name: str, log_level: int = logging.INFO) -> logging.Logger:
    """Create a JSON logger. This allows us to pass arbitrary key/value data in log messages.

    It also puts stack traces in a single log message instead of spreading them across
    multiple log messages.
    """
    if name is None or not isinstance(name, str) or len(name) == 0:
        raise ValueError("Name must be a non-empty string.")
    logger = logging.getLogger(name)
    if any(isinstance(handler, logging.StreamHandler) for handler in logger.handlers):
        # Logger already initialized
        return logger
    stream_handler = logging.StreamHandler()
    in_kubernetes = os.getenv("KUBERNETES_SERVICE_HOST")
    if in_kubernetes:
        stream_handler.setFormatter(CustomJSONFormatter())
    else:
        # Reading JSON logs in your terminal is hard, and you can't make use of the
        # structured-data benefits there anyway, so fall back to the standard log format.
        stream_handler.setFormatter(logging.Formatter(LOG_FORMAT))
    logger.addHandler(stream_handler)
    logger.setLevel(log_level)
    # Something upstream (e.g. logging.basicConfig()) may attach an extra handler to
    # the root logger; stop propagation so each message is emitted only once.
    logger.propagate = False

    # Make sure that unhandled exceptions get logged using the JSON logger. Otherwise,
    # users have to remember to wrap their main functions with:
    #
    #   try:
    #       main()
    #   except Exception:
    #       logger.exception("blah")
    #
    # See: https://stackoverflow.com/a/16993115/1729558
    def handle_exception(exc_type, exc_value, exc_traceback):
        if issubclass(exc_type, KeyboardInterrupt):
            sys.__excepthook__(exc_type, exc_value, exc_traceback)
            return
        logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))

    sys.excepthook = handle_exception
    return logger
```
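The `sys.excepthook` pattern in the snippet above can be exercised in isolation; a standalone, Dagster-free sketch using only the standard library:

```python
import logging
import sys

logger = logging.getLogger("excepthook_demo")
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)


def handle_exception(exc_type, exc_value, exc_traceback):
    # Let Ctrl-C terminate the process normally instead of being logged.
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return
    # exc_info folds the traceback into this single log record.
    logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))


sys.excepthook = handle_exception

# Simulate what the interpreter does when an exception goes uncaught:
try:
    raise ValueError("boom")
except ValueError:
    sys.excepthook(*sys.exc_info())
```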
Scout
03/07/2024, 12:28 AM
There are two ways to integrate a custom Python logger with Dagster:
1. Add your logger's name to the `managed_python_loggers` list in your `dagster.yaml` configuration file. This allows Dagster to capture logs from your custom logger and treat them the same as `context.log` calls.
2. Use the `get_dagster_logger` utility function to get a logger that is automatically captured by Dagster. This is useful when logging from inside nested functions, or in other cases where it would be inconvenient to thread the context parameter through just to enable calls to `context.log`.
Here's how you can modify your existing custom logging code to make it work with Dagster:
```python
import logging
import os
import sys

from dagster import get_dagster_logger

# Your existing CustomJSONFormatter and LOG_FORMAT definitions should be here


def make_json_logger(name: str, log_level: int = logging.INFO) -> logging.Logger:
    """Create a JSON logger. This allows us to pass arbitrary key/value data in log messages.

    It also puts stack traces in a single log message instead of spreading them across
    multiple log messages.
    """
    if name is None or not isinstance(name, str) or len(name) == 0:
        raise ValueError("Name must be a non-empty string.")
    # Use Dagster's get_dagster_logger utility to get a logger that is captured by Dagster
    logger = get_dagster_logger(name)
    if any(isinstance(handler, logging.StreamHandler) for handler in logger.handlers):
        # Logger already initialized
        return logger
    stream_handler = logging.StreamHandler()
    in_kubernetes = os.getenv("KUBERNETES_SERVICE_HOST")
    if in_kubernetes:
        stream_handler.setFormatter(CustomJSONFormatter())
    else:
        # Reading JSON logs in your terminal is hard, and you can't make use of the
        # structured-data benefits there anyway, so fall back to the standard log format.
        stream_handler.setFormatter(logging.Formatter(LOG_FORMAT))
    logger.addHandler(stream_handler)
    logger.setLevel(log_level)
    logger.propagate = False

    def handle_exception(exc_type, exc_value, exc_traceback):
        if issubclass(exc_type, KeyboardInterrupt):
            sys.__excepthook__(exc_type, exc_value, exc_traceback)
            return
        logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))

    sys.excepthook = handle_exception
    return logger
```
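Incidentally, the early-return handler check in `make_json_logger` is what keeps repeated calls from duplicating output, because the `logging` module caches loggers by name. A stdlib-only sketch of that behavior:

```python
import logging


def get_logger_once(name: str) -> logging.Logger:
    lg = logging.getLogger(name)
    # logging.getLogger caches loggers by name, so without this guard every
    # call would stack another handler and each message would print twice,
    # three times, and so on.
    if not any(isinstance(h, logging.StreamHandler) for h in lg.handlers):
        lg.addHandler(logging.StreamHandler())
    return lg


a = get_logger_once("handler_guard_demo")
b = get_logger_once("handler_guard_demo")
```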
In your `dagster.yaml` configuration file, you would add the following to manage your custom logger:
```yaml
python_logs:
  managed_python_loggers:
    - my_custom_json_logger  # Replace with the name you use when calling make_json_logger
```
By using `get_dagster_logger` and configuring `dagster.yaml` appropriately, you ensure that your custom logger's output is captured by Dagster's logging system and shows up in the Dagster UI. Remember to replace `my_custom_json_logger` with the actual name you use when calling `make_json_logger`.
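For completeness, the same `python_logs` section of `dagster.yaml` can also set the level at which Python logs are captured; a sketch, assuming a reasonably recent Dagster version (check the docs for yours):

```yaml
python_logs:
  python_log_level: INFO
  managed_python_loggers:
    - my_custom_json_logger
```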