# dagster-support

Gowtham Manne

09/12/2022, 3:05 PM
Hi All, I am trying to format my logs using
```python
from dagster import op, job, repository, In, Out, OpExecutionContext, logger
from pydantic import Field
import logging
import json


@logger(
    {
        "log_level": Field(str, is_required=False, default_value="INFO"),
        "name": Field(str, is_required=False, default_value="dagster"),
    },
    description="A JSON-formatted console logger",
)
def json_console_logger(context : OpExecutionContext):
    level = context.logger_config["log_level"]
    name = context.logger_config["name"]

    klass = logging.getLoggerClass()
    logger_ = klass(name, level=level)

    handler = logging.StreamHandler()

    class JsonFormatter(logging.Formatter):
        def format(self, record):
            return json.dumps(record.__dict__)

    handler.setFormatter(JsonFormatter())
    logger_.addHandler(handler)

    return logger_


@op(
    name="MyCOperationInput",
    out={"result_str":Out(dagster_type=str)},
    config_schema={"name": str}
)
def my_C_op_input(context:OpExecutionContext):
    name = context.op_config["name"]
    context.log.info(f"My name is {name} in MyCOperationInput")
    return name


@op(
    name="MyCOperation",
    ins={"result_str":In()},
    out={"result":Out(dagster_type=str)}
)
def my_C_op(context:OpExecutionContext,result_str : str):
    print(context)
    print(result_str)
    context.log.info(f"My name is {result_str} in MyCOperation")
    return result_str

@job
def my_C_job():
    print("job started")
    my_C_op(my_C_op_input())

@repository(default_logger_defs={"json_logger": json_console_logger})
def my_C_repo():
    return [my_C_job]
```
but I am getting this error:
dagster.core.errors.DagsterInvalidConfigDefinitionError: Error defining config. Original value passed: {'log_level': FieldInfo(default=<class 'str'>, extra={'is_required': False, 'default_value': 'INFO'}), 'name': FieldInfo(default=<class 'str'>, extra={'is_required': False, 'default_value': 'dagster'})}. Error at stack path :log_level. FieldInfo(default=<class 'str'>, extra={'is_required': False, 'default_value': 'INFO'}) cannot be resolved. This value can be a: - Field - Python primitive types that resolve to dagster config types - int, float, bool, str, list. - A dagster config type: Int, Float, Bool, Array, Optional, Selector, Shape, Permissive, Map - A bare python dictionary, which is wrapped in Field(Shape(...)). Any values in the dictionary get resolved by the same rules, recursively. - A python list with a single entry that can resolve to a type, e.g. [int]

Zach

09/12/2022, 3:09 PM
it looks like you're using pydantic `Field` classes instead of Dagster's `Field` class, that could be an issue

Gowtham Manne

09/12/2022, 3:10 PM
My bad, it's a typo. I'm also using FastAPI, and somehow the pydantic import came in.
@Zach Is there a way I can save these logs to a database? Is there native functionality for this in Dagster?

Zach P

09/12/2022, 3:21 PM
You can create your own logger for Dagster to use. It should be relatively easy to set up if you can find a log handler for your DB.

sean

09/12/2022, 5:19 PM
Gowtham I understand your original issue here was a code typo, and Zach directed you to the right docs for a logger-- anything else we can help you with?

Gowtham Manne

09/12/2022, 7:08 PM
@sean I want to write logs to MongoDB. Just wanted to check if there is a native method/function in Dagster to do so?

sean

09/12/2022, 8:05 PM
there is no native method for this-- you’ll need to define a custom logger
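
A custom logger for this boils down to a standard `logging.Handler` that writes each record to the database. Below is a minimal sketch, not an official Dagster or MongoDB integration. `CollectionLogHandler`, `InMemoryCollection`, and `build_logger` are hypothetical names introduced for illustration; the only assumption about the target is that it exposes `insert_one(document)`, as a pymongo `Collection` does.

```python
import logging
from datetime import datetime, timezone


class CollectionLogHandler(logging.Handler):
    """Hypothetical handler that stores each log record as one document."""

    def __init__(self, collection, level=logging.NOTSET):
        super().__init__(level=level)
        # Any object with an insert_one(dict) method, e.g. a pymongo Collection.
        self.collection = collection

    def emit(self, record):
        try:
            self.collection.insert_one(
                {
                    "logger": record.name,
                    "level": record.levelname,
                    "message": record.getMessage(),
                    "created": datetime.fromtimestamp(record.created, tz=timezone.utc),
                }
            )
        except Exception:
            # Stdlib convention: a failing handler must not crash the caller.
            self.handleError(record)


class InMemoryCollection:
    """Stand-in for a real collection so the sketch runs without a database."""

    def __init__(self):
        self.documents = []

    def insert_one(self, document):
        self.documents.append(document)


def build_logger(collection, name="dagster", level="INFO"):
    # Same construction as the JSON console logger, with a different handler.
    logger_ = logging.getLoggerClass()(name, level=level)
    logger_.addHandler(CollectionLogHandler(collection))
    return logger_
```

Inside a function decorated with Dagster's `@logger`, the body could return `build_logger(...)` with a real collection in place of `InMemoryCollection`, taking the level and name from `init_context.logger_config`. How the collection is obtained (connection string, database and collection names) is left open here.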

Gowtham Manne

09/13/2022, 3:24 AM
@sean Is the example below, from the docs, a working example? I tried to run the job and did not see the custom logs.
```python
import json
import logging

from dagster import Field, job, logger, op


@logger(
    {
        "log_level": Field(str, is_required=False, default_value="INFO"),
        "name": Field(str, is_required=False, default_value="dagster"),
    },
    description="A JSON-formatted console logger",
)
def json_console_logger(init_context):
    # Logger settings come from the run config for this logger.
    level = init_context.logger_config["log_level"]
    name = init_context.logger_config["name"]

    klass = logging.getLoggerClass()
    logger_ = klass(name, level=level)

    handler = logging.StreamHandler()

    class JsonFormatter(logging.Formatter):
        def format(self, record):
            # Serialize the whole log record as one JSON object.
            return json.dumps(record.__dict__)

    handler.setFormatter(JsonFormatter())
    logger_.addHandler(handler)

    return logger_


@op
def hello_logs(context):
    context.log.info("Hello, world!")


@job(logger_defs={"my_json_logger": json_console_logger})
def demo_job():
    hello_logs()
```
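
One likely reason the custom logs do not show up: as far as I know, Dagster only instantiates a logger attached through `logger_defs` when the run config names it under `loggers`, and otherwise falls back to its default console logger. A minimal sketch of that run config as a Python dict, assuming the key `my_json_logger` from the job above:

```python
# Run config that selects the custom logger by the key used in logger_defs.
# Assumption: without a matching entry under "loggers", the default console
# logger is used instead of the custom one.
run_config = {
    "loggers": {
        "my_json_logger": {
            "config": {"log_level": "INFO"},
        }
    }
}

# Hypothetical usage with the job defined above:
# demo_job.execute_in_process(run_config=run_config)
```

Note that `logging.StreamHandler()` with no arguments writes to `sys.stderr`, so the JSON lines should appear in the console output of the process running the job rather than in the structured event log in the UI.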