Zeeshan Nehal
07/25/2023, 4:00 AM
`DynamicPartitionsDefinition` for assets. In my case, I am using a `MultiPartitionsDefinition` that consists of a `DynamicPartitionsDefinition`, which pulls a list of ids from the DB at run-time, and a `DailyPartitionsDefinition` to execute the asset daily based on the date partition. Strangely, with Dagster's sensor I can see the dynamic partition executing for the new ids pulled from the DB, but it does not remove the partitions for previous ids that were deleted. If I try to materialize manually from the UI, it lists all historical dynamic partition values (ids). Secondly, I noticed that I cannot use a freshness policy with `DynamicPartitionsDefinition` or `MultiPartitionsDefinition`.
sandy
07/25/2023, 3:52 PM
When you say "on run-time", what exactly do you mean? And how are you deleting the previous ids?
Zeeshan Nehal
07/26/2023, 12:45 AM
get_fp_ads_def = DynamicPartitionsDefinition(name="get_fp_ad_ids")
date_partition_def = HourlyPartitionsDefinition(start_date=datetime(2023, 7, 23), timezone="UTC")

@asset(
    name="asset_zee-test-partitions_multi_asset",
    partitions_def=MultiPartitionsDefinition(
        {
            'ad': get_fp_ads_def,
            'date': date_partition_def,
        }
    ),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
Zeeshan Nehal
07/26/2023, 12:50 AM
Zeeshan Nehal
07/26/2023, 6:42 AM
`MultiPartitionsDefinition` due to the `DynamicPartitionsDefinition` in it. I want to set the max concurrency limit so that only 2 partition jobs run concurrently?
Zeeshan Nehal
07/26/2023, 6:44 AM
`DynamicPartitionsDefinition` and `DailyPartitionsDefinition`:
@sensor ....
    date_for_partition = date_partition_def.start.strftime('%Y-%m-%d')
    context.log.info("new partition key: {}".format(f'xxyyxx|{date_for_partition}'))
    return SensorResult(
        # this supports two partition keys; otherwise we need to use multi-partition tags, and then the dynamic request becomes an issue
        # one run request per new ad id (the key uses the loop variable ad_id)
        run_requests=[RunRequest(partition_key=f'{ad_id}|{date_for_partition}') for ad_id in new_campaigns],
        dynamic_partitions_requests=[get_fp_ads_def.build_add_request(new_campaigns),
                                     get_fp_ads_def.build_delete_request(delete_campaigns)]
    )
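The sensor above relies on `new_campaigns` and `delete_campaigns`, but the thread does not show how they are computed. A minimal sketch of one way to derive them, assuming the ids currently in the DB are compared against the partition keys already registered (in Dagster these could presumably be read with `context.instance.get_dynamic_partitions("get_fp_ad_ids")`). The helper names `diff_partitions` and `multi_partition_keys` and the sample ids are hypothetical, not part of the original code:

```python
from datetime import datetime


def diff_partitions(db_ids, existing_keys):
    """Return (to_add, to_delete) so the registered keys end up mirroring db_ids."""
    current = set(db_ids)
    existing = set(existing_keys)
    to_add = sorted(current - existing)      # in the DB, not yet registered
    to_delete = sorted(existing - current)   # registered, but gone from the DB
    return to_add, to_delete


def multi_partition_keys(ad_ids, date_key):
    """Build one 'ad|date' key per ad id, the format used in the sensor above."""
    return [f"{ad_id}|{date_key}" for ad_id in ad_ids]


# Hypothetical sample data standing in for the DB query and the registered keys.
db_ids = ["ad_2", "ad_3"]
existing_keys = ["ad_1", "ad_2"]

new_campaigns, delete_campaigns = diff_partitions(db_ids, existing_keys)
date_for_partition = datetime(2023, 7, 23).strftime("%Y-%m-%d")
run_keys = multi_partition_keys(new_campaigns, date_for_partition)

print(new_campaigns, delete_campaigns, run_keys)
```

If `delete_campaigns` stays empty on every tick, no delete request has any effect and the old ids remain listed in the UI, which would match the behaviour described earlier in the thread. The date part of the key also has to match the time dimension: `%Y-%m-%d` fits a daily definition, while the hourly definition in the asset snippet would need the hour component as well.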