Andrew
02/10/2023, 7:12 PM
dagster._check.CheckError: Failure condition: No end time window found
This is a skeleton showing how I am doing this. I read a .xlsx file into pandas, then select from a dictionary of dataframes to get the one I want. I intend for this to become parquet.
# Monthly partitions (America/New_York) shared by the missed-payments assets.
#
# NOTE(review): a time-window partitions definition only contains windows
# that have already ended. With start_date=2023-02-01 and a run during
# February 2023, the first monthly window is still open, so the definition
# holds no partitions at all. That is presumably what raises
# "CheckError: Failure condition: No end time window found" (confirm).
# end_offset=1 extends the partition set by one window so the in-progress
# month can be materialized. The alternative is to move start_date back to a
# month that has already closed.
monthly_partition_def = MonthlyPartitionsDefinition(
    start_date=datetime(2023, 2, 1),
    timezone="America/New_York",
    end_offset=1,
)
@asset(partitions_def=monthly_partition_def)
def missed_payments():
    """Produce the raw missed-payments data for a monthly partition.

    Returns whatever ``get_df()`` yields, unchanged.
    NOTE(review): presumably a mapping of sheet name to DataFrame, since the
    downstream asset looks entries up with ``.get`` -- confirm against
    ``get_df``.
    """
    return get_df()
@asset(io_manager_key='parquet_io_manager', partitions_def=monthly_partition_def)
def missed_payments_national(missed_payments):
    """Select the 'National' entry from the upstream ``missed_payments`` output.

    The returned value is stored through the ``parquet_io_manager`` resource.
    ``.get`` returns ``None`` when no 'National' key is present.
    """
    return missed_payments.get('National')
Below is the io_manager I am using (which worked fine for multi-partitions):
class PandasParquetIOManager(UPathIOManager):
    """IO manager that writes pandas DataFrames to, and reads them from, Parquet files."""

    # Suffix for stored files; presumably consumed by the UPathIOManager base
    # when it builds paths -- confirm.
    extension: str = ".parquet"

    def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath):
        """Write ``obj`` as Parquet to ``path``, opened in binary write mode."""
        with path.open("wb") as sink:
            obj.to_parquet(sink)

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        """Read the Parquet file at ``path`` and return it as a DataFrame."""
        with path.open("rb") as source:
            frame = pd.read_parquet(source)
        return frame
@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_pandas_parquet_io_manager(
    init_context: InitResourceContext,
) -> PandasParquetIOManager:
    """Create a ``PandasParquetIOManager`` rooted at the configured base path.

    The optional ``base_path`` resource config value is used when supplied;
    otherwise the Dagster instance's storage directory is the root.
    """
    assert init_context.instance is not None  # narrows the Optional for mypy
    settings = init_context.resource_config
    # The fallback is evaluated eagerly, exactly as a ``dict.get`` default is.
    fallback_dir = init_context.instance.storage_directory()
    root = UPath(settings.get("base_path", fallback_dir))
    return PandasParquetIOManager(base_path=root)
How do I get the IO Manager to work as intended (with static definitions, TimeWindows, and multi-partitions)? The sample projects and the documentation are not pointing me in the right direction.