Brian Pohl
06/02/2023, 10:34 PM
Tim Castillo
06/03/2023, 12:11 AM
customer_a and customer_b, and you're using the launch config to dictate that?
If so, have you thought about using dynamic partitions, with each partition being a destination? That'd be the cleanest way for Dagster to keep track of whether you've already produced an asset with that config.
You'll also still have some wiggle room and be able to make it multi-partitioned if you need to.
Brian Pohl
06/03/2023, 1:30 AM
Brian Pohl
06/05/2023, 9:21 PM
Brian Pohl
06/07/2023, 6:50 PM
Tim Castillo
06/07/2023, 6:59 PM
Brian Pohl
06/07/2023, 7:01 PM
Brian Pohl
06/07/2023, 7:04 PM
claire
06/07/2023, 9:26 PM
from dagster import asset

@asset
def my_asset(context):
    # partitions_def_name and list_of_partition_keys are placeholders
    context.instance.add_dynamic_partitions(partitions_def_name, list_of_partition_keys)
Brian Pohl
06/07/2023, 9:28 PM
josh gruenberg
06/07/2023, 9:30 PM
Brian Pohl
06/07/2023, 9:31 PM
Tim Castillo
06/07/2023, 9:34 PM
Brian Pohl
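06/07/2023, 9:41 PM
from dagster import AssetSelection, Definitions, DynamicPartitionsDefinition, asset, define_asset_job

foo_def = DynamicPartitionsDefinition(name="bar")

@asset(partitions_def=foo_def)
def my_asset(context):
    ...

# Job names may only use letters, digits and underscores; the selection
# names the asset (my_asset), not the partitions definition (bar).
foo_job = define_asset_job("foo_job", AssetSelection.keys("my_asset"), partitions_def=foo_def)

defs = Definitions(
    assets=[my_asset],
    jobs=[foo_job],
    resources={
        "a_resource": some_resource,  # defined elsewhere
    },
)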
Given this, where do you put the call context.instance.add_dynamic_partitions(partitions_def_name, list_of_partition_keys)? Does that have to be a separate op?
claire
06/07/2023, 10:11 PM
claire
06/07/2023, 10:11 PM
from dagster import (
    Config, DagsterEventType, DynamicPartitionsDefinition, EventLogEntry,
    EventRecordsFilter, Output, RunRequest, SensorEvaluationContext,
    asset, asset_sensor,
)

dynamic_partitions_def = DynamicPartitionsDefinition(name="my_partitions_def")

class MyConfig(Config):
    destination: str  # hypothetical field

@asset
def add_dynamic_partition_asset(context, config: MyConfig):
    # get_unique_key_from_config is a placeholder for your own key logic
    partition_key = get_unique_key_from_config(config)
    context.instance.add_dynamic_partitions(dynamic_partitions_def.name, [partition_key])
    return Output(None, metadata={"partition_key": partition_key})

# job_containing_assets: an asset job over one and two, defined elsewhere
@asset_sensor(asset_key=add_dynamic_partition_asset.key, job=job_containing_assets)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    # metadata values are MetadataValue objects; .value unwraps the string
    partition_key = asset_event.asset_materialization.metadata["partition_key"].value
    yield RunRequest(partition_key=partition_key)

@asset(partitions_def=dynamic_partitions_def)
def one(context):
    if context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=context.asset_key_for_output(),
            asset_partitions=[context.partition_key],
        ),
        limit=1,
    ):
        # materialization has already occurred for this partition
        pass
    else:
        # materialization has not occurred for this partition yet
        path = write_output_to_s3()  # placeholder helper
        return path

@asset(partitions_def=dynamic_partitions_def)
def two(one):
    ...
claire
06/07/2023, 10:16 PM
Brian Pohl
06/07/2023, 10:38 PM