#announcements

Krittipong Kanchanapiboon

10/29/2020, 4:34 PM
Hi, I know that Dagster can pass data between solids directly (to get benefits such as data validation, type checking, etc.). Does that mean all of the data will be stored in RAM? If so, how do we deal with a large dataset (larger than RAM)?

alex

10/29/2020, 4:37 PM
You change the dagster type to something that acts as a “pointer” to the actual data. You can still do data validation and type checking, but now those methods receive the “pointer” instead of the whole copy.
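A minimal sketch of that “pointer” idea (the ParquetPointer type, the path value, and the filter_people solid below are illustrative names, not from this thread):

import os

from dagster import DagsterType, InputDefinition, OutputDefinition, solid

# The value passed between solids is just a path string, so only the pointer
# lives in RAM; the type check validates the pointer, not the full dataset.
ParquetPointer = DagsterType(
    name="ParquetPointer",
    type_check_fn=lambda _, path: isinstance(path, str) and path.endswith(".parquet"),
)


@solid(
    input_defs=[InputDefinition("people_path", ParquetPointer)],
    output_defs=[OutputDefinition(ParquetPointer)],
)
def filter_people(_, people_path):
    # Read, process, and write with whatever engine fits (pandas chunks, Spark, SQL),
    # then hand downstream solids only the new path.
    output_path = os.path.join("output", "filtered.parquet")
    # ... do the actual out-of-core processing here ...
    return output_path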

sandy

10/29/2020, 4:38 PM
hey @Krittipong Kanchanapiboon - how are you planning to do your data processing? e.g. pandas? pyspark? sql?

Krittipong Kanchanapiboon

10/30/2020, 5:08 AM
@alex Do you have any example code to do that?
Hi @sandy, I'm working with CSV files (read with pandas) and loading them into SQL. But if in the future I work with big data using pyspark, I know the data processing can be done on the Spark cluster, but how would data validation work?

sandy

10/30/2020, 9:49 PM
Here's an example that uses Spark, does some type-checking, and doesn't buffer anything in memory. If you want to do a quality check on the contents of your dataframe, then the dataframe will need to be staged somewhere - either via cache() to distribute it in memory across the Spark executors, or stored somewhere else. If you have specific ideas of how you want to do that, I'm happy to help figure out how they'd work.
import os

from pyspark.sql import Row, SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from dagster import (
    DagsterType,
    ModeDefinition,
    OutputDefinition,
    pipeline,
    repository,
    resource,
    solid,
)
from dagster.core.storage.asset_store import AssetStore


class LocalParquetStore(AssetStore):
    """Asset store that persists each solid output as Parquet files on local disk."""

    def _get_path(self, context, step_output_handle):
        return os.path.join(
            context.run_id, step_output_handle.step_key, step_output_handle.output_name,
        )

    def set_asset(self, context, step_output_handle, obj, _):
        # Write the Spark DataFrame to disk instead of keeping it in memory
        obj.write.parquet(self._get_path(context, step_output_handle))

    def get_asset(self, context, step_output_handle, _):
        # Read the DataFrame back lazily from the Parquet files written by set_asset
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._get_path(context, step_output_handle))


@resource
def local_parquet_store(_):
    return LocalParquetStore()


# The type check function receives (context, value); it inspects the schema
# without materializing any rows
PeopleSparkDataFrame = DagsterType(
    name="PeopleSparkDataFrame",
    type_check_fn=lambda _, df: df.schema.names == ["name", "age"],
)


@solid(
    output_defs=[
        OutputDefinition(asset_store_key="spark_asset_store", dagster_type=PeopleSparkDataFrame)
    ]
)
def make_people(context):
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(rows, schema)


@solid(output_defs=[OutputDefinition(asset_store_key="spark_asset_store")])
def filter_over_50(_, people):
    return people.filter(people["age"] > 50)


# count() returns a plain int, which can't be written as Parquet, so this output
# uses the default output handling rather than the Parquet asset store
@solid
def count_people(_, people) -> int:
    return people.count()


@pipeline(mode_defs=[ModeDefinition(resource_defs={"spark_asset_store": local_parquet_store})])
def my_pipeline():
    count_people(filter_over_50(make_people()))

@repository
def basic_pyspark_repo():
    return [my_pipeline]
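To try the example out locally, something like the following should work with the dagster APIs of that era (execute_pipeline; this assumes a local Spark installation is available):

from dagster import execute_pipeline

if __name__ == "__main__":
    # Each solid output is written to / read from local Parquet files by the
    # asset store, so the dataframes are never passed between solids through RAM.
    result = execute_pipeline(my_pipeline)
    assert result.success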
🙏 1