Chris Nogradi
05/12/2023, 10:02 PM
Chris Nogradi
05/22/2023, 1:22 PM
Drew You
05/22/2023, 1:45 PM
Drew You
05/22/2023, 1:45 PM
@asset
def groups(context: OpExecutionContext) -> SQL:
    # get the groups
    current_groups = {group["name"] for group in groups}
    partition_groups = set(
        context.instance.get_dynamic_partitions(groups_partition.name)
    )
    added_groups = current_groups - partition_groups
    removed_groups = partition_groups - current_groups
    context.log.info(
        f"partition {groups_partition.name}: adding {added_groups} and removing {removed_groups}"
    )
    context.instance.add_dynamic_partitions(groups_partition.name, list(added_groups))
    for group in removed_groups:
        context.instance.delete_dynamic_partition(groups_partition.name, group)
    return SQL("select * from $df", df=df)
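The reconciliation step in the asset above comes down to two set differences: names present upstream but not yet registered get added, and registered names that have disappeared upstream get deleted. A minimal sketch of that logic without Dagster follows. `InMemoryPartitionStore` and `reconcile` are hypothetical names, standing in for `context.instance` and its dynamic-partition methods; this is not the code from the thread.

```python
from typing import Iterable, List, Set, Tuple


class InMemoryPartitionStore:
    """Hypothetical stand-in for the instance's dynamic partition storage."""

    def __init__(self, keys: Iterable[str] = ()) -> None:
        self._keys: Set[str] = set(keys)

    def get(self) -> List[str]:
        # Return the registered partition keys in a stable order.
        return sorted(self._keys)

    def add(self, keys: Iterable[str]) -> None:
        self._keys.update(keys)

    def delete(self, key: str) -> None:
        # discard() does nothing if the key is already absent.
        self._keys.discard(key)


def reconcile(
    store: InMemoryPartitionStore, current_names: Iterable[str]
) -> Tuple[Set[str], Set[str]]:
    """Make the store match current_names; return (added, removed)."""
    current = set(current_names)
    registered = set(store.get())
    added = current - registered      # new upstream, not yet registered
    removed = registered - current    # registered, but gone upstream
    store.add(added)
    for key in removed:
        store.delete(key)
    return added, removed


store = InMemoryPartitionStore(["alpha", "beta"])
added, removed = reconcile(store, ["beta", "gamma"])
print(sorted(added), sorted(removed), store.get())
```

Running `reconcile` a second time with the same names adds and removes nothing, so the step is safe to repeat on every materialization.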
Drew You
05/22/2023, 1:46 PM
@asset(partitions_def=groups_partition)
def asset_for_group(context: OpExecutionContext) -> SQL:
Chris Nogradi
05/23/2023, 1:56 AM
Mycchaka Kleinbort
05/23/2023, 12:54 PM
dagster._core.errors.DagsterDefinitionChangedDeserializationError: Asset {{assetname}} had a PartitionsDefinition at storage-time, but no longer does
I'm pretty sure I've not changed anything... any ideas? I'm on dagster 1.3.5
Chris Nogradi
05/25/2023, 1:03 PM