# announcements
Hi, I know that Dagster can pass data between solids directly (to get benefits such as data validation, type checking, etc.). Does that mean all of the data will be stored in RAM? If so, how do we deal with a large dataset (larger than RAM)?
you change the dagster type to something that acts as a "pointer" to the actual data. You can still do data validation and type checking, but now those methods receive the "pointer" instead of the whole copy
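To illustrate the "pointer" idea outside of any Dagster API, here is a minimal plain-Python sketch (the function name `check_people_csv_header` is hypothetical): the check receives a file path rather than the data itself, and validates the schema by reading only the header row, so memory use stays constant regardless of file size.

```python
import csv
import os
import tempfile

def check_people_csv_header(path):
    """Type check that receives a 'pointer' (a file path) instead of the data.

    Reads only the header row, never the full file, so an arbitrarily
    large CSV can be validated without loading it into RAM.
    """
    with open(path, newline="") as f:
        header = next(csv.reader(f), [])
    return header == ["name", "age"]

# usage: write a tiny CSV and validate it via its path
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as f:
    f.write("name,age\nThom,51\n")
    path = f.name

print(check_people_csv_header(path))  # True
os.remove(path)
```

The same shape carries over to a Dagster type check: the `type_check_fn` inspects the pointer (or cheap metadata like a schema) rather than a full in-memory copy.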
hey @Krittipong Kanchanapiboon - how are you planning to do your data processing? e.g. pandas? pyspark? sql?
@alex Do you have any example code to do that?
Hi @sandy I'm working with CSV files (read with pandas) and loading them into SQL. But in the future, if I work with big data using pyspark, I know the data processing can be done on the Spark cluster, but if I want to do data validation, how would that work?
Here's an example that uses Spark, does some type-checking, and doesn't buffer anything in memory. If you want to do a quality check on the contents of your dataframe, then the dataframe will need to be staged somewhere: either cached to distribute it in memory across the Spark executors, or stored somewhere. If you have specific ideas of how you want to do that, I'm happy to help figure out how they'd work.
import os

from pyspark.sql import Row, SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from dagster import (
    DagsterType,
    ModeDefinition,
    OutputDefinition,
    pipeline,
    repository,
    resource,
    solid,
)
from dagster.core.storage.asset_store import AssetStore


class LocalParquetStore(AssetStore):
    def _get_path(self, context, step_output_handle):
        return os.path.join(
            context.run_id, step_output_handle.step_key, step_output_handle.output_name,
        )

    def set_asset(self, context, step_output_handle, obj, _):
        # Persist the DataFrame as Parquet instead of holding it in memory
        obj.write.parquet(self._get_path(context, step_output_handle))

    def get_asset(self, context, step_output_handle, _):
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._get_path(context, step_output_handle))


@resource
def local_parquet_store(_):
    return LocalParquetStore()


# The type check inspects only the DataFrame's schema, not its contents,
# so nothing gets buffered in memory.
PeopleSparkDataFrame = DagsterType(
    name="PeopleSparkDataFrame",
    type_check_fn=lambda _, df: df.schema.names == ["name", "age"],
)


@solid(
    output_defs=[
        OutputDefinition(asset_store_key="spark_asset_store", dagster_type=PeopleSparkDataFrame)
    ]
)
def make_people(context):
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(rows, schema)


@solid
def filter_over_50(_, people):
    return people.filter(people["age"] > 50)


@solid
def count_people(_, people) -> int:
    return people.count()


@pipeline(mode_defs=[ModeDefinition(resource_defs={"spark_asset_store": local_parquet_store})])
def my_pipeline():
    count_people(filter_over_50(make_people()))


@repository
def basic_pyspark_repo():
    return [my_pipeline]
🙏 1