#ask-community

Tyler Eason

03/06/2023, 5:43 PM
Hello, I have a question about get_dagster_logger. Does it only work directly in an op? I can see my messages when I use such a logger there, but if I try to use the same utility in a custom IOManager they don't appear.

owen

03/07/2023, 7:23 PM
hi @Tyler Eason! get_dagster_logger should work within the body of a custom IOManager -- mind sharing your code?

Tyler Eason

03/08/2023, 1:34 PM
thanks @owen, gladly! There are some shenanigans going on from other classes that are hard to share, but overall the access pattern is pretty straightforward. The main load_input method calls the private _load_input method implemented by this class (and overwritten by its children), and the context gets pushed down. I know the code is getting executed because my asset materializes successfully, and if I get the pod logs I can see the print statements. Version is 1.1.20, deployed using helm to an autopilot GKE cluster.
from dagster import IOManager, io_manager, get_dagster_logger

# read_write is a wrapper around some SQLAlchemy stuff
# AlembicUtils is another custom class that has tools for ensuring the target DB is ready
# I have commented out for convenience
from my_project.db_utils import AlembicUtils, read_write
from my_project.models.base import Base

logger = get_dagster_logger()


class PostgresIOManager(IOManager):
    # The layout of this class is so that its children can overwrite the private
    # methods and the Alembic checking function is still called
    def _handle_output(self, context, obj) -> None:
        logger.debug(obj)
        read_write.write_new(obj)

    def handle_output(self, context, obj) -> None:
        # AlembicUtils.check_remote_version()
        self._handle_output(context, obj)

    def _load_input(self, context) -> object:
        logger.info(Base.get_model_from_name(context.name))
        logger.debug(f"Fetching model: {Base.get_model_from_name(context.name)} for partition {context.asset_partition_key}")
        print(f"Fetching model: {Base.get_model_from_name(context.name)} for partition {context.asset_partition_key}")
        return read_write.read_partitioned(
            Base.get_model_from_name(context.name),
            context.asset_partition_key
        )

    def load_input(self, context) -> object:
        # AlembicUtils.check_remote_version()
        return self._load_input(context)


@io_manager
def piom():
    return PostgresIOManager()

owen

03/08/2023, 9:28 PM
hm interesting -- I can look into this a bit more, but I think replacing your logger.info calls with context.log.info should solve your immediate problem (get_dagster_logger() is often just used as a convenience method for cases where you don't have the context object easily available)
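
For illustration, the suggested change might look roughly like the sketch below. It is not the code from this thread: PostgresIOManagerSketch, make_fake_context, and ListHandler are hypothetical names, the class is a plain Python class rather than a dagster.IOManager subclass, and the context is a stand-in built on the standard logging module so that the snippet runs without Dagster installed. In a real deployment Dagster supplies the context, and context.log is its own logger.

```python
import logging
from types import SimpleNamespace


class ListHandler(logging.Handler):
    """Collects log messages in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


class PostgresIOManagerSketch:
    # Assumption: in a real project this would subclass dagster.IOManager.
    # It is a plain class here so the sketch has no Dagster dependency.
    def _load_input(self, context):
        # Log through the context that was pushed down from load_input,
        # instead of through a module-level get_dagster_logger() instance.
        context.log.info(
            f"Fetching model for {context.name}, "
            f"partition {context.asset_partition_key}"
        )
        # Placeholder for the real database read.
        return (context.name, context.asset_partition_key)

    def load_input(self, context):
        return self._load_input(context)


def make_fake_context(name, partition_key):
    # Hypothetical stand-in for Dagster's input context: it only provides
    # the three attributes that the sketch above touches.
    log = logging.getLogger("fake_dagster_context")
    log.setLevel(logging.INFO)
    log.propagate = False
    handler = ListHandler()
    log.addHandler(handler)
    context = SimpleNamespace(
        name=name, asset_partition_key=partition_key, log=log
    )
    return context, handler


context, handler = make_fake_context("orders", "2023-03-06")
result = PostgresIOManagerSketch().load_input(context)
```

The point of the sketch is only the call site: the private method receives the context and logs via context.log, so no module-level logger is needed.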

Tyler Eason

04/02/2023, 3:37 PM
Sorry for the late reply Owen, using context.log.info did work for me, thank you!