Gowtham Manne
09/12/2022, 3:05 PMfrom dagster import op, job, repository, In,Out, OpExecutionContext, logger
from pydantic import Field
import logging
import json
@logger(
{
"log_level": Field(str, is_required=False, default_value="INFO"),
"name": Field(str, is_required=False, default_value="dagster"),
},
description="A JSON-formatted console logger",
)
def json_console_logger(context : OpExecutionContext):
level = context.logger_config["log_level"]
name = context.logger_config["name"]
klass = logging.getLoggerClass()
logger_ = klass(name, level=level)
handler = logging.StreamHandler()
class JsonFormatter(logging.Formatter):
def format(self, record):
return json.dumps(record.__dict__)
handler.setFormatter(JsonFormatter())
logger_.addHandler(handler)
return logger_
@op(
name="MyCOperationInput",
out={"result_str":Out(dagster_type=str)},
config_schema={"name": str}
)
def my_C_op_input(context:OpExecutionContext):
name = context.op_config["name"]
<http://context.log.info|context.log.info>(f"My name is {name} in MyCOperationInput")
return name
@op(
name="MyCOperation",
ins={"result_str":In()},
out={"result":Out(dagster_type=str)}
)
def my_C_op(context:OpExecutionContext,result_str : str):
print(context)
print(result_str)
<http://context.log.info|context.log.info>(f"My name is {result_str} in MyCOperation")
return result_str
@job
def my_C_job():
print("job started")
my_C_op(my_C_op_input())
@repository(default_logger_defs={"json_logger": json_console_logger})
def my_C_repo():
return [my_C_job]
but getting be error dagster.core.errors.DagsterInvalidConfigDefinitionError: Error defining config. Original value passed: {'log_level': FieldInfo(default=<class 'str'>, extra={'is_required': False, 'default_value': 'INFO'}), 'name': FieldInfo(default=<class 'str'>, extra={'is_required': False, 'default_value': 'dagster'})}. Error at stack path :log_level. FieldInfo(default=<class 'str'>, extra={'is_required': False, 'default_value': 'INFO'}) cannot be resolved. This value can be a: - Field - Python primitive types that resolve to dagster config types - int, float, bool, str, list. - A dagster config type: Int, Float, Bool, Array, Optional, Selector, Shape, Permissive, Map - A bare python dictionary, which is wrapped in Field(Shape(...)). Any values in the dictionary get resolved by the same rules, recursively. - A python list with a single entry that can resolve to a type, e.g. [int]
Zach
09/12/2022, 3:09 PMpydantic
Field
classes instead of Dagster's Field
class, that could be an issueGowtham Manne
09/12/2022, 3:10 PMZach P
09/12/2022, 3:21 PMsean
09/12/2022, 5:19 PMGowtham Manne
09/12/2022, 7:08 PMsean
09/12/2022, 8:05 PMGowtham Manne
09/13/2022, 3:24 AM@logger(
{
"log_level": Field(str, is_required=False, default_value="INFO"),
"name": Field(str, is_required=False, default_value="dagster"),
},
description="A JSON-formatted console logger",
)
def json_console_logger(init_context):
level = init_context.logger_config["log_level"]
name = init_context.logger_config["name"]
klass = logging.getLoggerClass()
logger_ = klass(name, level=level)
handler = logging.StreamHandler()
class JsonFormatter(logging.Formatter):
def format(self, record):
return json.dumps(record.__dict__)
handler.setFormatter(JsonFormatter())
logger_.addHandler(handler)
return logger_
@op
def hello_logs(context):
<http://context.log.info|context.log.info>("Hello, world!")
@job(logger_defs={"my_json_logger": json_console_logger})
def demo_job():
hello_logs()