#ask-ai

Ian Macleod

03/06/2024, 11:27 PM
@Scout can you give me a python logging class using the logger library that is compatible with both dagster and datadog? I need all of my logs to be written to both dagster and datadog

Scout

03/06/2024, 11:27 PM
We can create a custom logger in Dagster that is compatible with both Dagster's logging system and Datadog. To achieve this, you would typically use Python's `logging` library to define a custom logger that includes handlers for both Dagster and Datadog. For Datadog, you can write a small custom handler on top of the `datadog` Python package. Here's an example of how you might define a custom logger that writes logs to both Dagster and Datadog. Please note that this example assumes you have the `datadog` package installed and configured with your Datadog API key.
Copy code
python
import logging

from datadog import initialize, ThreadStats
from dagster import logger

# Initialize Datadog API
options = {
    'api_key': 'your_api_key',
    'app_key': 'your_app_key'
}
initialize(**options)


# Create a custom handler for Datadog
class DatadogHandler(logging.Handler):
    def __init__(self):
        logging.Handler.__init__(self)
        self.stats = ThreadStats()
        self.stats.start()

    def emit(self, record):
        try:
            # Use getMessage(); record.message is only set after a formatter has run
            message = record.getMessage()
            self.stats.event(title=message, text=message, alert_type='info')
        except Exception:
            self.handleError(record)


# Define the custom logger for Dagster
@logger
def custom_logger(init_context):
    logger_ = logging.getLogger('dagster_datadog_logger')
    logger_.setLevel(logging.INFO)

    # Add Dagster's console handler
    console_handler = logging.StreamHandler()
    console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(console_formatter)
    logger_.addHandler(console_handler)

    # Add Datadog handler
    datadog_handler = DatadogHandler()
    logger_.addHandler(datadog_handler)

    return logger_
In this example, we define a `DatadogHandler` that sends logs to Datadog as events. We then create a custom logger using the `@logger` decorator provided by Dagster. This custom logger adds both a console handler (for Dagster's logs) and the `DatadogHandler`. Please ensure that you replace `'your_api_key'` and `'your_app_key'` with your actual Datadog API and application keys. Additionally, you may need to adjust the log level and formatter according to your specific requirements. Remember to attach this custom logger to your Dagster jobs or pipelines to ensure that logs are captured and sent to both Dagster and Datadog.
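A minimal, standard-library-only sketch of the two-handler pattern described above, for trying it out without Datadog credentials. `ListSinkHandler`, `build_logger`, and the `events` list are hypothetical stand-ins for the Datadog side and are not part of Dagster or the `datadog` package. It also illustrates why `record.getMessage()` is the safer call inside `emit`: `record.message` only exists once a formatter has processed the record.

```python
import logging


class ListSinkHandler(logging.Handler):
    """Hypothetical stand-in for a Datadog handler: collects events in a list."""

    def __init__(self, sink):
        super().__init__()
        self.sink = sink

    def emit(self, record):
        try:
            # getMessage() merges msg and args and works even if no formatter ran.
            self.sink.append({"level": record.levelname, "text": record.getMessage()})
        except Exception:
            self.handleError(record)


def build_logger(name, sink):
    """Return a logger that writes to the console and to the sink."""
    logger_ = logging.getLogger(name)
    logger_.setLevel(logging.INFO)
    if not logger_.handlers:  # avoid stacking handlers on repeated calls
        console = logging.StreamHandler()
        console.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        logger_.addHandler(console)
        logger_.addHandler(ListSinkHandler(sink))
    return logger_


events = []
log = build_logger("dual_sink_demo", events)
log.info("loaded %d rows", 3)
log.debug("not recorded at INFO level")
```

Swapping `ListSinkHandler` for a real Datadog-backed handler should leave the rest of the setup unchanged, since the logger only depends on the `logging.Handler` interface.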

Ian Macleod

03/07/2024, 12:27 AM
@Scout Here's some custom logging code I already have, how can I make it work with dagster to emit logs to dagster as well?
Copy code
def make_json_logger(name: str, log_level: int = logging.INFO) -> logging.Logger:
    """Create a JSON logger. This allows us to pass arbitrary key/value data in log messages.
    It also puts stack traces in a single log message instead of spreading them across multiple log messages.
    """
    if name is None or not isinstance(name, str) or len(name) == 0:
        raise ValueError("Name must be a non-empty string.")

    logger = logging.getLogger(name)
    if any(isinstance(handler, logging.StreamHandler) for handler in logger.handlers):
        # logger already initialized
        return logger

    stream_handler = logging.StreamHandler()
    in_kubernetes = os.getenv("KUBERNETES_SERVICE_HOST")
    if in_kubernetes:
        stream_handler.setFormatter(CustomJSONFormatter())
    else:
        # Reading JSON logs in your terminal is kinda hard, and you can't make use of the structured data
        # benefits in your terminal anyway. So just fall back to the standard log format.
        stream_handler.setFormatter(logging.Formatter(LOG_FORMAT))

    logger.addHandler(stream_handler)
    logger.setLevel(log_level)

    # Something is creating an extra handler
    logger.propagate = (
        False  # Don't need to set to False as long as you don't also call logging.basicConfig()
    )

    # Want to make sure that unhandled exceptions get logged using the JSON logger. Otherwise,
    # users will have to remember to wrap their main functions with:
    #
    # try:
    #     main()
    # except Exception:
    #     logger.exception("blah")
    #
    # See: https://stackoverflow.com/a/16993115/1729558
    def handle_exception(exc_type, exc_value, exc_traceback):
        if issubclass(exc_type, KeyboardInterrupt):
            sys.__excepthook__(exc_type, exc_value, exc_traceback)
            return

        logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))

    sys.excepthook = handle_exception
    return logger

Scout

03/07/2024, 12:28 AM
To integrate your custom JSON logger with Dagster and ensure that logs are emitted to Dagster as well, you can use these two mechanisms:
1. Configure Dagster to manage your custom logger by adding its name to the `managed_python_loggers` list in your `dagster.yaml` configuration file. This allows Dagster to capture logs from your custom logger and treat them the same as `context.log` calls.
2. Use the `get_dagster_logger` utility function to get a logger that is automatically captured by Dagster. This is useful when logging from inside nested functions, or in other cases where it would be inconvenient to thread the context parameter through to enable calls to `context.log`.
Here's how you can modify your existing custom logging code to make it work with Dagster:
Copy code
python
import logging
import os
import sys
from dagster import get_dagster_logger

# Your existing CustomJSONFormatter and LOG_FORMAT definitions should be here

def make_json_logger(name: str, log_level: int = logging.INFO) -> logging.Logger:
    """Create a JSON logger. This allows us to pass arbitrary key/value data in log messages.
    It also puts stack traces in a single log message instead of spreading them across multiple log messages.
    """
    if name is None or not isinstance(name, str) or len(name) == 0:
        raise ValueError("Name must be a non-empty string.")

    # Use Dagster's get_dagster_logger utility to get a logger that is captured by Dagster.
    # Note: `name` is validated above but no longer selects which logger is returned.
    logger = get_dagster_logger()

    if any(isinstance(handler, logging.StreamHandler) for handler in logger.handlers):
        # logger already initialized
        return logger

    stream_handler = logging.StreamHandler()
    in_kubernetes = os.getenv("KUBERNETES_SERVICE_HOST")
    if in_kubernetes:
        stream_handler.setFormatter(CustomJSONFormatter())
    else:
        # Reading JSON logs in your terminal is kinda hard, and you can't make use of the structured data
        # benefits in your terminal anyway. So just fall back to the standard log format.
        stream_handler.setFormatter(logging.Formatter(LOG_FORMAT))

    logger.addHandler(stream_handler)
    logger.setLevel(log_level)

    logger.propagate = False

    def handle_exception(exc_type, exc_value, exc_traceback):
        if issubclass(exc_type, KeyboardInterrupt):
            sys.__excepthook__(exc_type, exc_value, exc_traceback)
            return

        logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))

    sys.excepthook = handle_exception
    return logger
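The code above expects `CustomJSONFormatter` and `LOG_FORMAT` to be defined elsewhere. For readers who don't have them, here is a minimal sketch of what they might look like. Both definitions are assumptions for illustration and not the original author's implementation; the field names in the JSON payload and the `format_example` helper are hypothetical as well.

```python
import json
import logging
import sys

# Hypothetical plain-text format used outside Kubernetes.
LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s: %(message)s"


class CustomJSONFormatter(logging.Formatter):
    """Hypothetical formatter: one JSON object per record, traceback included."""

    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            # Keep the whole stack trace inside the same log message.
            payload["exc_info"] = self.formatException(record.exc_info)
        # json.dumps escapes newlines, so each record stays on a single line.
        return json.dumps(payload)


def format_example():
    """Format one record that carries an exception and return the parsed JSON."""
    try:
        raise ValueError("boom")
    except ValueError:
        record = logging.LogRecord(
            "demo", logging.ERROR, "demo.py", 1, "failed on %s", ("step-1",), sys.exc_info()
        )
    return json.loads(CustomJSONFormatter().format(record))


parsed = format_example()
```

Because the traceback is serialized into the `exc_info` field, a log aggregator sees one event per exception instead of one event per traceback line.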
In your `dagster.yaml` configuration file, you would add the following to manage your custom logger:
Copy code
yaml
python_logs:
  managed_python_loggers:
    - my_custom_json_logger  # Replace with the name you use when calling make_json_logger
By using `get_dagster_logger` and configuring `dagster.yaml` appropriately, you ensure that your custom logger's output is captured by Dagster's logging system and can be seen in the Dagster UI. Remember to replace `my_custom_json_logger` with the actual name you use when calling `make_json_logger`.
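One detail worth checking is `logger.propagate = False`. The standard-library-only sketch below shows that a handler attached directly to the named logger still receives records when propagation is off, while a handler on the root logger does not. `CaptureHandler` is a hypothetical stand-in for whatever handler a framework attaches. The assumption here, which is not confirmed in this thread, is that Dagster attaches its capture handler directly to each logger listed in `managed_python_loggers`, so it is worth verifying this against your Dagster version.

```python
import logging


class CaptureHandler(logging.Handler):
    """Hypothetical stand-in for a handler that a framework attaches to a logger."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


# Same shape as make_json_logger: own stream handler, no propagation to root.
app_logger = logging.getLogger("my_custom_json_logger")
app_logger.setLevel(logging.INFO)
app_logger.addHandler(logging.StreamHandler())
app_logger.propagate = False

# A handler on the root logger does not see the records...
root_capture = CaptureHandler()
logging.getLogger().addHandler(root_capture)

# ...but one attached directly to the named logger does.
direct_capture = CaptureHandler()
app_logger.addHandler(direct_capture)

app_logger.info("hello from the custom logger")
```

If capture relies on propagation to a parent logger instead, setting `propagate = False` would hide the records from it, which is the case the root handler in this sketch illustrates.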