
geoHeil

10/05/2021, 4:04 PM
How can I add logging to the IO manager? https://github.com/dagster-io/dagster/blob/master/examples/basic_pyspark_crag/repo.py#L11 Neither
`context.log.info(os.path.join(context.run_id, context.step_key, context.name))`
nor
`yield EventMetadataEntry.string(self._get_path(context), label="xxxxx")`
seems to show up in the dagster logs.
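For reference, the pattern being attempted looks roughly like this; a minimal sketch against the 0.12-era API, with the class renamed to mark it as illustrative:

```python
import os

from dagster import IOManager
from pyspark.sql import SparkSession


class LoggingParquetStore(IOManager):
    def _get_path(self, context):
        return os.path.join(context.run_id, context.step_key, context.name)

    def handle_output(self, context, obj):
        # context.log routes through Dagster's log manager, so this message
        # should appear in the run logs alongside the step events
        context.log.info(f"writing output to {self._get_path(context)}")
        obj.write.parquet(self._get_path(context))

    def load_input(self, context):
        path = self._get_path(context.upstream_output)
        context.log.info(f"loading input from {path}")
        return SparkSession.builder.getOrCreate().read.parquet(path)
```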

claire

10/05/2021, 5:42 PM
Hi @geoHeil, can you please provide your code containing the IO manager so I can try to reproduce this?

geoHeil

10/05/2021, 5:45 PM
sure:
```python
class LocalParquetStore(IOManager):
    def _get_path(self, context):
        context.log.error('****')
        context.log.info(os.path.join(context.run_id, context.step_key, context.name))

        return os.path.join(context.run_id, context.step_key, context.name)

    def handle_output(self, context, obj):
        obj.write.parquet(self._get_path(context))
        yield EventMetadataEntry.string(self._get_path(context), label="xxxxx")
```
I only edited/added the small log statements above. Here is the full file:
```python
# start_repo_marker_0
import os

from dagster import EventMetadataEntry, IOManager, ModeDefinition, io_manager, pipeline, repository, solid
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType


class LocalParquetStore(IOManager):
    def _get_path(self, context):
        context.log.error('****')
        context.log.info(os.path.join(context.run_id, context.step_key, context.name))

        return os.path.join(context.run_id, context.step_key, context.name)

    def handle_output(self, context, obj):
        obj.write.parquet(self._get_path(context))
        yield EventMetadataEntry.string(self._get_path(context), label="xxxxx")

    def load_input(self, context):
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._get_path(context.upstream_output))


@io_manager
def local_parquet_store(_):
    return LocalParquetStore()


@solid
def make_people():
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(rows, schema)


@solid
def filter_over_50(people):
    return people.filter(people["age"] > 50)


@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": local_parquet_store})])
def my_pipeline():
    filter_over_50(make_people())


# end_repo_marker_0


@repository
def basic_pyspark_repo():
    return [my_pipeline]
```
My dagster version is 0.12.13 (the current latest release).
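One way to verify where the log lines go, not shown in the thread: execute the pipeline in-process, so that `context.log` output from the IO manager is echoed to the console. A sketch, assuming the file above is saved as `repo.py`:

```python
from dagster import execute_pipeline

# assumes the full file above is saved as repo.py next to this script
from repo import my_pipeline

result = execute_pipeline(my_pipeline)
assert result.success
```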

claire

10/05/2021, 5:52 PM
When I execute the same code, the `context.log.error` log populates. Can you confirm there are no other errors occurring in execution prior to the log call?
geoHeil

Yes, I see no error messages.

claire

10/05/2021, 9:22 PM
Hey @geoHeil, is it possible that the modified code is not being executed? As a sanity check, try throwing an `assert False` in `handle_output`.
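That sanity check would look like the following; the assert is temporary and should be removed once you confirm this is the file actually executing:

```python
def handle_output(self, context, obj):
    # temporary sanity check: if this file is really the one being run,
    # every output will now fail loudly at this line
    assert False, "LocalParquetStore.handle_output was executed"
    obj.write.parquet(self._get_path(context))
    yield EventMetadataEntry.string(self._get_path(context), label="xxxxx")
```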

geoHeil

10/06/2021, 3:59 AM
Indeed, thanks! I was accidentally modifying the file from `basic_pyspark` but executing `basic_pyspark_crag`.
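A quick way to catch this kind of mixup in the future, not from the thread: log the source file of the IO manager that is actually running, e.g. via the module-level `__file__`:

```python
def handle_output(self, context, obj):
    # log which file this class was loaded from, so a wrong-example mixup
    # (basic_pyspark vs. basic_pyspark_crag) shows up in the run logs
    context.log.info(f"handle_output defined in {__file__}")
    obj.write.parquet(self._get_path(context))
```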