Daniel Gafni
05/11/2023, 3:55 PM
A and B share the same daily partitions; C is also daily partitioned, but has different partitions (starting 1 day later). C is mapped to the same partition from A and to the 2 most recent partitions from B (the matching day and the day before), which is why its partitions start 1 day later. TimeWindowPartitionMapping is used to map it to those 2 partitions.
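The partition mapping described above can be modelled in plain Python. This is a minimal sketch, not Dagster's implementation. It assumes daily partition keys in the "%Y-%m-%d" format and treats start_offset/end_offset as shifting the first and last day of the upstream window relative to the downstream day, which matches the documented example for TimeWindowPartitionMapping (start_offset=-1 maps "2022-07-04" to "2022-07-03" and "2022-07-04"). The helper upstream_daily_keys is hypothetical. B's first partition is assumed to be 2021-08-23, the start_date used in the code later in the thread.

```python
from datetime import datetime, timedelta
from typing import List

KEY_FORMAT = "%Y-%m-%d"  # assumed format of daily partition keys


def upstream_daily_keys(
    downstream_key: str, start_offset: int = 0, end_offset: int = 0
) -> List[str]:
    """Hypothetical model of a daily-to-daily time-window mapping.

    Returns every daily key from (day + start_offset) through (day + end_offset).
    """
    day = datetime.strptime(downstream_key, KEY_FORMAT).date()
    current = day + timedelta(days=start_offset)
    last = day + timedelta(days=end_offset)
    keys = []
    while current <= last:
        keys.append(current.strftime(KEY_FORMAT))
        current += timedelta(days=1)
    return keys


# A -> C uses no offsets: one upstream partition per C partition.
print(upstream_daily_keys("2021-08-24"))                   # ['2021-08-24']
# B -> C uses start_offset=-1: the matching day plus the day before.
print(upstream_daily_keys("2021-08-24", start_offset=-1))  # ['2021-08-23', '2021-08-24']
# A C partition on B's first day would need 2021-08-22, which B does not have.
print(upstream_daily_keys("2021-08-23", start_offset=-1))  # ['2021-08-22', '2021-08-23']
```

The last call shows why C starts one day after B: its first partition needs a "day before" that exists in B.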
A and B belong to code location 1; C belongs to code location 2.
There is a problem: TimeWindowPartitionMapping fails with this error when launching a backfill for B:
dagster._core.errors.DagsterInvalidDefinitionError: Can't use the start_offset or end_offset parameters of TimeWindowPartitionMapping when the cron schedule of the upstream PartitionsDefinition is different than the cron schedule of the downstream one.
  File "/usr/local/lib/python3.8/site-packages/dagster_graphql/implementation/utils.py", line 126, in _fn
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/dagster_graphql/implementation/utils.py", line 57, in _fn
    result = fn(self, graphene_info, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/dagster_graphql/schema/roots/mutation.py", line 299, in mutate
    return create_and_launch_partition_backfill(graphene_info, backfillParams)
  File "/usr/local/lib/python3.8/site-packages/dagster_graphql/implementation/execution/backfill.py", line 155, in create_and_launch_partition_backfill
    backfill = PartitionBackfill.from_asset_partitions(
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/backfill.py", line 341, in from_asset_partitions
    serialized_asset_backfill_data=AssetBackfillData.from_asset_partitions(
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/asset_backfill.py", line 352, in from_asset_partitions
    target_subset |= asset_graph.bfs_filter_subsets(
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/asset_graph.py", line 469, in bfs_filter_subsets
    partition_mapping.get_downstream_partitions_for_partitions(
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/time_window_partition_mapping.py", line 133, in get_downstream_partitions_for_partitions
    return self._map_partitions(
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/time_window_partition_mapping.py", line 159, in _map_partitions
    raise DagsterInvalidDefinitionError(
I can't backfill the asset. I can run individual runs, though.
Clearly this check is in the wrong place, as I'm not selecting the C asset for the backfill, only B, so no PartitionsMapping should get executed.
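The traceback suggests why a mapping runs anyway: AssetBackfillData.from_asset_partitions calls asset_graph.bfs_filter_subsets, which calls get_downstream_partitions_for_partitions on a partition mapping. A plausible reading is that building the backfill walks downstream from the selected asset and evaluates each edge's mapping before deciding whether the downstream asset is part of the selection. The toy model below illustrates that reading only; the classes, the function build_targets and the validation rule (copied from the error text) are hypothetical simplifications, not Dagster's code, and the cron strings are assumed.

```python
from collections import deque
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple


class InvalidDefinition(Exception):
    """Toy stand-in for DagsterInvalidDefinitionError."""


@dataclass
class Edge:
    downstream: str
    start_offset: int = 0
    end_offset: int = 0


def check_mapping(upstream_cron: str, downstream_cron: str, edge: Edge) -> None:
    # Mirrors the error text: offsets are rejected when the crons differ.
    if (edge.start_offset or edge.end_offset) and upstream_cron != downstream_cron:
        raise InvalidDefinition(
            f"offsets used but crons differ: {upstream_cron!r} vs {downstream_cron!r}"
        )


def build_targets(
    roots: List[str],
    selection: Set[str],
    graph: Dict[str, List[Edge]],
    crons: Dict[str, str],
) -> Tuple[Set[str], List[Tuple[str, str]]]:
    """BFS downstream from the roots; returns (targets, edges whose mapping ran)."""
    targets = set(roots)
    evaluated: List[Tuple[str, str]] = []
    queue = deque(roots)
    while queue:
        asset = queue.popleft()
        for edge in graph.get(asset, []):
            # The mapping is evaluated before the selection filter is applied.
            check_mapping(crons[asset], crons[edge.downstream], edge)
            evaluated.append((asset, edge.downstream))
            if edge.downstream in selection and edge.downstream not in targets:
                targets.add(edge.downstream)
                queue.append(edge.downstream)
    return targets, evaluated


# Assumed cron strings for A, B and C (all daily at 05:01).
crons = {"A": "1 5 * * *", "B": "1 5 * * *", "C": "1 5 * * *"}
graph = {"A": [Edge("C")], "B": [Edge("C", start_offset=-1)]}
targets, evaluated = build_targets(["B"], {"B"}, graph, crons)
print(targets)    # {'B'}
print(evaluated)  # [('B', 'C')]
```

Under this model, C is never added to the backfill, yet the B -> C mapping is still consulted, so any edge leaving B whose crons differ would make the whole backfill fail.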
I previously had my own custom PartitionsMapping, which of course didn't have this check and worked without any problems.
@owen here is the post
owen
05/11/2023, 4:10 PM
Daniel Gafni
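05/11/2023, 4:18 PM
from datetime import datetime, timedelta

import polars as pl
from dagster import (
    AssetIn,
    DailyPartitionsDefinition,
    OpExecutionContext,
    TimeWindowPartitionMapping,
    asset,
)

# A_key, B_key and CConfig are defined elsewhere in the original code (not shown).

A_partitions = DailyPartitionsDefinition(
    start_date=datetime(2021, 8, 23), hour_offset=5, minute_offset=1
)


@asset(
    partitions_def=A_partitions,
    io_manager_key="polars_parquet_io_manager",
)
def A(context: OpExecutionContext) -> pl.DataFrame:
    ...


B_partitions = DailyPartitionsDefinition(
    start_date=datetime(2021, 8, 23), hour_offset=5, minute_offset=1
)


@asset(
    partitions_def=B_partitions,
    io_manager_key="polars_parquet_io_manager",
)
def B(context: OpExecutionContext) -> pl.DataFrame:
    ...


# C starts one day after B so that its first partition has a previous B partition to read.
@asset(
    partitions_def=DailyPartitionsDefinition(
        start_date=B_partitions.start + timedelta(days=1), hour_offset=5, minute_offset=1
    ),
    io_manager_key="polars_parquet_io_manager",
    ins={
        # start_offset=-1: each C partition loads the matching B partition and the one before it.
        "B_partitions": AssetIn(
            key=B_key,
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1),
        ),
        # No explicit mapping: C reads the matching partition of A.
        "A": AssetIn(key=A_key),
    },
)
def C(config: CConfig, A: pl.DataFrame, B_partitions: dict[str, pl.DataFrame]) -> pl.DataFrame:
    ...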
Daniel Gafni
05/11/2023, 4:20 PM
owen
05/11/2023, 4:31 PM
B works as expected, as does running all the assets in a single backfill)
this is definitely a very strange error (as of course the cron schedules should be identical here) -- when does this come up? is it at the moment you press "Launch N-run backfill"? or is it after the backfill is kicked off?
and are there other assets downstream of B? any non-daily-partitioned assets in general?
Daniel Gafni
05/11/2023, 4:33 PM
B -> D: D is daily partitioned and is also mapped with TimeWindowPartitionMapping (it also has different partitions).
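Whether D can trip the check depends on what "different partitions" means. As far as I know, a DailyPartitionsDefinition with hour_offset=h and minute_offset=m runs on the cron schedule "m h * * *", and the start date does not enter the cron string at all. Under that assumption, the sketch below (daily_cron and offsets_allowed are hypothetical helpers, not Dagster functions) shows that A, B and C share one schedule. D's actual definition is not shown in the thread, so the two D variants are made up purely for illustration.

```python
def daily_cron(hour_offset: int = 0, minute_offset: int = 0) -> str:
    """Hypothetical helper: cron string assumed for a daily partitions definition."""
    return f"{minute_offset} {hour_offset} * * *"


def offsets_allowed(upstream_cron: str, downstream_cron: str) -> bool:
    # Same rule as the error message: offsets need identical cron schedules.
    return upstream_cron == downstream_cron


b_cron = daily_cron(hour_offset=5, minute_offset=1)
c_cron = daily_cron(hour_offset=5, minute_offset=1)  # C differs from B only in start_date
print(b_cron)                           # 1 5 * * *
print(offsets_allowed(b_cron, c_cron))  # True

# Two made-up variants of D, since D's definition is not shown in the thread:
d_later_start = daily_cron(hour_offset=5, minute_offset=1)  # only start_date differs
d_other_offsets = daily_cron()                               # default offsets
print(offsets_allowed(b_cron, d_later_start))    # True
print(offsets_allowed(b_cron, d_other_offsets))  # False
```

If D differs from B only in its start date, the cron comparison should pass; if D uses other hour/minute offsets, the B -> D mapping with offsets would be rejected in exactly the way the error describes.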