Le Yang
02/29/2024, 11:21 PM
Scout
02/29/2024, 11:21 PM
`partition_expr` metadata on the asset to tell the Delta Lake I/O manager which column contains the partition data. The I/O manager needs this to construct the correct queries to select or replace data in the Delta table. Here's a simplified example, based on the documentation, of how to store static partitioned assets:

```python
from dagster import asset


@asset(
    # ... other parameters, e.g. partitions_def ...
    metadata={"partition_expr": "my_partition_column"},
)
def my_asset():
    # ... asset computation ...
    ...
```
In this example, `my_partition_column` is the column in your DataFrame that holds the partition values. The I/O manager uses it to determine which rows belong to each partition.

### Filtering Partitions on Load

When loading partitioned assets, the Delta Lake I/O manager can automatically filter the loaded partitions. However, if you're using `MultiPartitionsDefinition`, you are responsible for filtering the partitions in the downstream asset, because doing so is non-trivial for the I/O manager.

To select specific columns when loading an asset, supply `columns` metadata on the corresponding input of the downstream asset:

```python
import pandas as pd
from dagster import AssetIn, asset


@asset(
    ins={
        "iris_sepal": AssetIn(
            key="iris_dataset",
            metadata={"columns": ["sepal_length_cm", "sepal_width_cm"]},
        )
    }
)
def sepal_data(iris_sepal: pd.DataFrame) -> pd.DataFrame:
    # ... asset computation ...
    return iris_sepal
```
In this example, only the `sepal_length_cm` and `sepal_width_cm` columns are fetched from the `iris_dataset` asset. For assets that use `MultiPartitionsDefinition`, apply the partition filtering inside your downstream asset's body, since the I/O manager does not filter those partitions for you.

Please note that these examples are based on the documentation, and you may need to adjust the code to fit your specific use case and data schema. If you have a more complex partitioning scheme or need dynamic filtering, you may need to implement custom logic within your asset definitions or use additional metadata configuration.
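As a rough illustration of filtering inside the downstream asset, here is a minimal pandas sketch. It assumes each partition dimension is stored in its own column and that those columns contain the partition key strings verbatim. The function `filter_to_partition`, the dimension names, and the column names are all hypothetical. Inside a Dagster asset that is itself multi-partitioned, the per-dimension keys would typically come from `context.partition_key.keys_by_dimension`, but check this against your Dagster version.

```python
import pandas as pd


def filter_to_partition(
    df: pd.DataFrame,
    keys_by_dimension: dict[str, str],
    partition_columns: dict[str, str],
) -> pd.DataFrame:
    """Keep only the rows that match every dimension of a multi-partition key.

    Hypothetical helper. Assumes each dimension's column stores the partition
    key as-is (compared as strings).
    keys_by_dimension: dimension name -> partition key for this run
    partition_columns: dimension name -> DataFrame column holding that dimension
    """
    # Start with every row selected, then narrow the mask one dimension at a time.
    mask = pd.Series(True, index=df.index)
    for dimension, key in keys_by_dimension.items():
        column = partition_columns[dimension]
        mask &= df[column].astype(str) == key
    # Copy so later column assignments don't touch the original frame.
    return df[mask].copy()


# Example usage with made-up data:
df = pd.DataFrame(
    {
        "event_date": ["2024-02-28", "2024-02-29", "2024-02-29"],
        "color": ["red", "red", "blue"],
        "value": [1, 2, 3],
    }
)
subset = filter_to_partition(
    df,
    keys_by_dimension={"date": "2024-02-29", "color": "red"},
    partition_columns={"date": "event_date", "color": "color"},
)
print(subset["value"].tolist())  # [2]
```

This uses exact equality, which suits static dimensions. If a time-window dimension is stored as timestamps rather than key strings, you would instead compare that column against the partition's start and end times.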