James Hale
10/28/2022, 8:20 PM
Daniel Gafni
10/29/2022, 5:46 PM
James Hale
11/04/2022, 3:20 AM
Daniel Gafni
11/04/2022, 3:18 PM
1. … the `UPathIOManager` (which can do it for filesystem data; it's easy to subclass it)
2. You have to define a custom `PartitionMapping` class
3. You need to define an upstream asset which would be daily partitioned
4. You need to define a downstream asset which would be daily partitioned and use your PartitionMapping class to load 3 partitions from the upstream asset.
Here is a piece of my old code. You should make something similar.
from dagster.core.definitions.partition_key_range import PartitionKeyRange
from dagster.core.definitions.partition_mapping import PartitionMapping
class NDaysPartitionMapping(PartitionMapping):
    """Map a downstream daily partition range onto a trailing window of upstream days.

    Partition keys are parsed and formatted as ``%Y-%m-%d`` strings.

    Args:
        days: Number of days by which the start of the upstream range is moved
            back from the start of the downstream range.
        offset: Additional number of days by which both ends of the upstream
            range are moved back. Defaults to 0.
    """

    # Single source of truth for parsing and formatting partition keys.
    _DATE_FORMAT = "%Y-%m-%d"

    def __init__(self, days: int, offset: int = 0):
        self.days = days
        self.offset = offset

    @classmethod
    def _shift_key(cls, partition_key: str, days_back: int) -> str:
        """Return ``partition_key`` moved ``days_back`` days into the past."""
        shifted = datetime.strptime(partition_key, cls._DATE_FORMAT) - timedelta(days=days_back)
        return shifted.strftime(cls._DATE_FORMAT)

    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def: PartitionsDefinition,
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        """Return the upstream range unchanged as the downstream range.

        Raises:
            AssertionError: If ``upstream_partitions_def`` is not a
                ``DailyPartitionsDefinition``.
        """
        assert isinstance(upstream_partitions_def, DailyPartitionsDefinition)
        return upstream_partition_key_range

    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def: PartitionsDefinition,  # pylint: disable=unused-argument
        upstream_partitions_def: PartitionsDefinition,  # pylint: disable=unused-argument
    ) -> PartitionKeyRange:
        """Return the upstream key range needed for ``downstream_partition_key_range``.

        The start is moved back by ``days + offset`` days and the end by
        ``offset`` days.
        """
        # NOTE(review): if PartitionKeyRange is inclusive at both ends, a
        # single-day downstream range maps to ``days + 1`` upstream keys
        # (e.g. days=3 selects four days) -- confirm this is intended.
        return PartitionKeyRange(
            start=self._shift_key(
                downstream_partition_key_range.start, self.days + self.offset
            ),
            end=self._shift_key(downstream_partition_key_range.end, self.offset),
        )
# Daily partitions with "%Y-%m-%d" keys. `raw_start_date` and `timezone` are
# not defined in this snippet and must be provided by the surrounding module.
raw_partitions = DailyPartitionsDefinition(
    start_date=str(raw_start_date),
    fmt="%Y-%m-%d",
    timezone=timezone,
)
@asset(
    io_manager_key="parquet_io_manager",
    required_resource_keys={"parquet_io_manager"},
    partitions_def=raw_partitions,
)
def upstream(context: OpExecutionContext) -> pd.DataFrame:
    """Build the DataFrame for one partition of the upstream asset."""
    # `your_data` is a placeholder from the original example; it is not
    # defined in this snippet and must be replaced with the real payload.
    frame = pd.DataFrame(your_data)
    return frame
@asset(
    io_manager_key="parquet_io_manager",
    required_resource_keys={"parquet_io_manager"},
    partitions_def=raw_partitions,
    partition_mappings={"upstream": NDaysPartitionMapping(days=3)},
)
def downstream(context: OpExecutionContext, upstream: List[pd.DataFrame]) -> pd.DataFrame:
    """Concatenate the upstream frames supplied for this partition into one DataFrame."""
    combined = pd.concat(upstream)
    return combined
… `json` with the `UPathIOManager` (it's a little different from my code above because it outputs a dict of partitions, not a list):
from dagster._core.storage.upath_io_manager import UPathIOManager
class JSONIOManager(UPathIOManager):
    """UPathIOManager subclass that stores objects as JSON text files."""

    extension: str = ".json"

    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):
        """Serialize ``obj`` as JSON into the file at ``path``."""
        # The file is opened in text mode, so the JSON text is written
        # directly; a binary-mode handle would need the encoded bytes of
        # json.dumps(obj) instead.
        with path.open("w") as sink:
            json.dump(obj, sink)

    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        """Read the file at ``path`` and return the decoded JSON value."""
        with path.open("r") as source:
            payload = json.loads(source.read())
        return payload
nickvazz
12/09/2022, 1:02 AM
Daniel Gafni
12/09/2022, 7:35 AM
nickvazz
12/09/2022, 4:55 PM
Daniel Gafni
12/09/2022, 5:16 PM
nickvazz
12/09/2022, 5:19 PM
… `build_context` etc. to generate/debug still, will report back!