Tristan Schrader
07/26/2023, 4:38 PM
Is there a PartitionMapping I can use to get all of the partitions from a specific SensorResult? I am using dynamic partitions.
sandy
07/26/2023, 10:37 PM
Tristan Schrader
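07/27/2023, 5:05 PM
from dagster import AssetSelection, DynamicPartitionsDefinition, RunRequest, SensorResult, asset, sensor

# clean_data, get_new_files, get_config and new_cursor are helpers defined elsewhere.
# DynamicPartitionsDefinition needs a name; "files" is a placeholder.
my_partition = DynamicPartitionsDefinition(name="files")

@asset(partitions_def=my_partition)
def my_asset(config):
    return clean_data(config)

@sensor(asset_selection=AssetSelection.keys(my_asset.key))
def my_sensor(context):
    cursor = context.cursor or None
    new_files = get_new_files(cursor)
    return SensorResult(
        # One run per new file, keyed by the file id.
        run_requests=[
            RunRequest(partition_key=file.id, run_config=get_config(file))
            for file in new_files
        ],
        cursor=new_cursor(),
        # Register the new file ids as dynamic partitions.
        dynamic_partitions_requests=[
            my_partition.build_add_request([file.id for file in new_files])
        ],
    )

# I don't really know what to do here to combine stuff ...
@asset
def load_into_database(my_asset):
    ...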
I have a sensor that creates a RunRequest for each new file in a remote folder, treating each file as a partition in a DynamicPartitionsDefinition. Each of these files will be used to populate a normalized data model in a database. The problem I’m facing is that because the data model has foreign keys everywhere, a file could refer to the same underlying data that another file already created, and I don’t want to duplicate anything. What happens when two new files are running in parallel?
I think I’m doing something fundamentally wrong here. I understood assets to help define our data declaratively, but I can’t treat any partition independently. My idea in asking the question was to prevent this concurrency issue of duplication when you have interdependent file partitions running in parallel. The thought was that a PartitionMapping could be used in this load_into_database asset to grab all of the new files a run was created for by a single sensor call.
Tristan Schrader
07/27/2023, 5:32 PM
SpecificPartitionsPartitionMapping, which I could use to query a pattern in the partition keys I guess.
Tristan Schrader
07/27/2023, 5:36 PM
Tristan Schrader
07/27/2023, 6:09 PM
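
For reference, the "pattern in the partition keys" idea could be sketched roughly as follows. This is only a plain-Python illustration, not Dagster API: it assumes each sensor call stamps its partition keys with a shared batch id, so everything added by one call can be found again by prefix. The names make_partition_key, keys_for_batch, group_by_batch, the "__" separator, and the tick/file ids are all hypothetical.

```python
from collections import defaultdict

# Hypothetical delimiter between batch id and file id (an assumption, not a
# Dagster convention). It must not occur inside either id.
SEPARATOR = "__"


def make_partition_key(batch_id: str, file_id: str) -> str:
    """Encode the sensor call (batch) and the file id into one partition key."""
    if SEPARATOR in batch_id or SEPARATOR in file_id:
        raise ValueError(f"ids must not contain {SEPARATOR!r}")
    return f"{batch_id}{SEPARATOR}{file_id}"


def keys_for_batch(all_keys, batch_id):
    """Return every partition key that carries the given batch id as prefix."""
    prefix = f"{batch_id}{SEPARATOR}"
    return [key for key in all_keys if key.startswith(prefix)]


def group_by_batch(all_keys):
    """Group partition keys by the batch id encoded before the separator."""
    groups = defaultdict(list)
    for key in all_keys:
        batch_id, _, _file_id = key.partition(SEPARATOR)
        groups[batch_id].append(key)
    return dict(groups)


# Two sensor calls, each discovering some files (ids are made up).
tick_1 = [make_partition_key("tick-0001", f) for f in ["a.csv", "b.csv"]]
tick_2 = [make_partition_key("tick-0002", f) for f in ["c.csv"]]
all_keys = tick_1 + tick_2
```

In the sensor, keys built this way would replace file.id in both the RunRequest and the build_add_request call, and a downstream step could filter the registered keys by batch. Note that this only groups files per sensor call; it does not by itself stop two parallel runs from writing the same underlying rows.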