#ask-community

Andrew

02/10/2023, 7:12 PM
Hi - I'm getting an error I can't figure out when attempting to materialize a downstream asset that uses a Parquet IO manager.
dagster._check.CheckError: Failure condition: No end time window found
This is a skeleton showing how I am doing this. I read a .xlsx file into pandas, then select the DataFrame I want from the resulting dictionary of DataFrames. I intend for the selected DataFrame to be stored as Parquet.
from datetime import datetime

from dagster import MonthlyPartitionsDefinition, asset

monthly_partition_def = MonthlyPartitionsDefinition(
    start_date=datetime(2023, 2, 1), timezone="America/New_York"
)

@asset(partitions_def=monthly_partition_def)
def missed_payments():
    # get_df() reads the .xlsx file and returns a dict of DataFrames
    df = get_df()
    return df

@asset(io_manager_key='parquet_io_manager', partitions_def=monthly_partition_def)
def missed_payments_national(missed_payments):
    df = missed_payments.get('National')
    return df
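One thing that may be worth checking in the skeleton above (an assumption, not a confirmed diagnosis of the error): with the default `end_offset=0`, Dagster's time-window partitions only include windows that have already ended. If this was posted on 10 February 2023, a monthly definition starting on 2023-02-01 would not yet contain any partition. The sketch below illustrates that arithmetic with the standard library only. It is not Dagster's implementation; `complete_monthly_windows` and `next_month_start` are hypothetical helper names, and time zones are ignored for simplicity.

```python
from datetime import datetime
from typing import List, Tuple


def next_month_start(dt: datetime) -> datetime:
    """Return midnight on the first day of the month following dt."""
    year, month = (dt.year + 1, 1) if dt.month == 12 else (dt.year, dt.month + 1)
    return datetime(year, month, 1)


def complete_monthly_windows(
    start: datetime, now: datetime, end_offset: int = 0
) -> List[Tuple[datetime, datetime]]:
    """List (start, end) monthly windows that have fully elapsed by `now`.

    `end_offset` appends that many extra windows past the last complete one,
    mimicking (as an assumption) the idea behind Dagster's `end_offset`.
    `start` is assumed to be midnight on the first day of a month.
    """
    windows = []
    window_start = start
    # Collect every window whose end is not in the future.
    while next_month_start(window_start) <= now:
        window_end = next_month_start(window_start)
        windows.append((window_start, window_end))
        window_start = window_end
    # Optionally extend the set with windows that are still in progress or upcoming.
    for _ in range(end_offset):
        window_end = next_month_start(window_start)
        windows.append((window_start, window_end))
        window_start = window_end
    return windows
```

Under these assumptions, `complete_monthly_windows(datetime(2023, 2, 1), datetime(2023, 2, 10))` returns an empty list, while passing `end_offset=1` yields the single February window. In Dagster terms, that would correspond to either moving `start_date` earlier or setting `end_offset=1` on `MonthlyPartitionsDefinition`, if the missing window is in fact the cause.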
Below is the io_manager I am using (which worked fine for multi-partitions):
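import pandas as pd
from dagster import (
    Field,
    InitResourceContext,
    InputContext,
    OutputContext,
    UPathIOManager,
    io_manager,
)
from upath import UPath


class PandasParquetIOManager(UPathIOManager):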
    extension: str = ".parquet"

    def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath):
        with path.open("wb") as file:
            obj.to_parquet(file)

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        with path.open("rb") as file:
            return pd.read_parquet(file)
        
@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_pandas_parquet_io_manager(
    init_context: InitResourceContext,
) -> PandasParquetIOManager:
    assert init_context.instance is not None  # to please mypy
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return PandasParquetIOManager(base_path=base_path)
How do I get the IO manager to work as intended (with static definitions, time windows, and multi-partitions)? The sample projects and the documentation are not pointing me in the right direction.