geoHeil
10/05/2021, 4:04 PM
Neither `context.log.info(os.path.join(context.run_id, context.step_key, context.name))`
nor `yield EventMetadataEntry.string(self._get_path(context), label="xxxxx")`
seems to show up in the Dagster logs.

claire
10/05/2021, 5:42 PM

geoHeil
10/05/2021, 5:45 PM
python
class LocalParquetStore(IOManager):
    """IO manager that stores each output as parquet under ``run_id/step_key/name``."""

    def _get_path(self, context):
        """Return ``os.path.join(run_id, step_key, name)`` for *context*.

        No base directory is prepended, so the result is relative unless
        ``run_id`` itself is absolute. Also emits two debug log lines.
        """
        # Debug probe added while investigating missing log output.
        context.log.error('****')
        path = os.path.join(context.run_id, context.step_key, context.name)
        context.log.info(path)
        return path

    def handle_output(self, context, obj):
        """Write *obj* as parquet to this output's path and yield the path as metadata.

        *obj* is expected to provide ``.write.parquet`` (e.g. a
        ``pyspark.sql.DataFrame``).
        """
        # This method contains ``yield``, so it is a generator: nothing in this
        # body (not even the write) runs until the caller iterates it.
        # Compute the path once so the write and the metadata agree and the
        # debug logs in _get_path are emitted a single time.
        path = self._get_path(context)
        obj.write.parquet(path)
        yield EventMetadataEntry.string(path, label="xxxxx")
I only edited/added the small log statements above.
Here is the full file:
python
# start_repo_marker_0
import os

from dagster import (
    EventMetadataEntry,
    IOManager,
    ModeDefinition,
    io_manager,
    pipeline,
    repository,
    solid,
)
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
class LocalParquetStore(IOManager):
    """IO manager that stores each output as parquet under ``run_id/step_key/name``."""

    def _get_path(self, context):
        """Return ``os.path.join(run_id, step_key, name)`` for *context*.

        No base directory is prepended, so the result is relative unless
        ``run_id`` itself is absolute. Also emits two debug log lines.
        """
        # Debug probe added while investigating missing log output.
        context.log.error('****')
        path = os.path.join(context.run_id, context.step_key, context.name)
        context.log.info(path)
        return path

    def handle_output(self, context, obj):
        """Write *obj* as parquet to this output's path and yield the path as metadata.

        *obj* is expected to provide ``.write.parquet`` (e.g. a
        ``pyspark.sql.DataFrame``).
        """
        # This method contains ``yield``, so it is a generator: nothing in this
        # body (not even the write) runs until the caller iterates it.
        # Compute the path once so the write and the metadata agree and the
        # debug logs in _get_path are emitted a single time.
        path = self._get_path(context)
        obj.write.parquet(path)
        yield EventMetadataEntry.string(path, label="xxxxx")

    def load_input(self, context):
        """Read back, as a Spark DataFrame, the parquet written for the upstream output."""
        spark = SparkSession.builder.getOrCreate()
        # Derive the path from the upstream output's context so it matches the
        # location handle_output wrote to.
        # NOTE(review): _get_path logs via upstream_output.log; confirm that
        # context carries a usable logger in this Dagster version.
        upstream_path = self._get_path(context.upstream_output)
        return spark.read.parquet(upstream_path)
@io_manager
def local_parquet_store(_):
    """Resource factory returning a fresh ``LocalParquetStore``; the init context is unused."""
    manager = LocalParquetStore()
    return manager
@solid
def make_people():
    """Build a three-row Spark DataFrame with a string ``name`` and an integer ``age`` column."""
    fields = [
        StructField("name", StringType()),
        StructField("age", IntegerType()),
    ]
    people_schema = StructType(fields)
    people_rows = [
        Row(name="Thom", age=51),
        Row(name="Jonny", age=48),
        Row(name="Nigel", age=49),
    ]
    session = SparkSession.builder.getOrCreate()
    return session.createDataFrame(people_rows, people_schema)
@solid
def filter_over_50(people):
    """Keep only the rows of *people* whose ``age`` is strictly greater than 50."""
    age_column = people["age"]
    is_over_50 = age_column > 50
    return people.filter(is_over_50)
@pipeline(
    mode_defs=[
        ModeDefinition(resource_defs={"io_manager": local_parquet_store}),
    ]
)
def my_pipeline():
    """Create people, then keep those over 50; outputs go through the parquet IO manager."""
    people = make_people()
    filter_over_50(people)
# end_repo_marker_0
@repository
def basic_pyspark_repo():
    """Repository containing the single pipeline ``my_pipeline``."""
    pipelines = [my_pipeline]
    return pipelines
claire
10/05/2021, 5:52 PM
The `context.log.error` log populates. Can you confirm there are no other errors occurring in execution prior to the log call?

geoHeil
10/05/2021, 5:53 PM

claire
10/05/2021, 9:22 PM
`assert False` in `handle_output`

geoHeil
10/06/2021, 3:59 AM