王昊
05/11/2023, 11:42 AM
`define_asset_job` and `DynamicPartitionsDefinition`. In my understanding, `define_asset_job` is used to generate a job that materializes assets, while `DynamicPartitionsDefinition` allows partitions to be defined dynamically. After reading the documentation, the only use cases I found update a `DynamicPartitionsDefinition` in a sensor using `SensorResult(dynamic_partitions_requests=[])`. What should I do if I don’t want to use a sensor? For example, I want to define a job that, when executed, first redefines the partitions in a `DynamicPartitionsDefinition` and then materializes the partitioned asset using those partitions.
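Here is the use case I’ve seen:
```python
from dagster import AssetSelection, DynamicPartitionsDefinition, define_asset_job, sensor

fruits = DynamicPartitionsDefinition(name="fruits")

my_job = define_asset_job(
    "my_job", selection=AssetSelection.all(), partitions_def=fruits
)

@sensor(job=my_job)
def my_sensor(context):
    # Partition keys currently registered for "fruits" on the Dagster instance
    partitions = fruits.get_partition_keys(dynamic_partitions_store=context.instance)
    # ... (rest of the sensor not shown)
```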
In my actual use case, a MongoDB collection has 100 million records. I need to first read its count, then read the data in groups of 500,000 and save each group as a Parquet file, one partition of a partitioned asset. Is `DynamicPartitionsDefinition` the best practice here? What suggestions do you have?
Guy McCombe
05/11/2023, 12:00 PM
`build_add_request` method, but this little example should achieve what you’re looking for:
```python
from dagster import Definitions, job, op, asset, DynamicPartitionsDefinition, OpExecutionContext

dynamic_partitions = DynamicPartitionsDefinition(name="dynamic_partition")

@asset
def get_mongodb_data():
    # Stand-in for reading from MongoDB: returns ten string keys
    return [str(i) for i in range(10)]

@op
def add_partitions(context: OpExecutionContext, get_mongodb_data):
    # Keep only the keys that are not yet registered on the instance
    new_partitions = [
        str(i) for i in get_mongodb_data
        if not context.instance.has_dynamic_partition(
            dynamic_partitions.name, str(i)
        )
    ]
    context.instance.add_dynamic_partitions(dynamic_partitions.name, new_partitions)

@job
def add_mongo_partitions():
    # Compute the keys, then register them as dynamic partitions
    data = get_mongodb_data()
    add_partitions(data)

@asset(partitions_def=dynamic_partitions)
def foo():
    # Partitioned by whatever keys add_mongo_partitions has registered
    return 1

defs = Definitions(
    assets=[foo, get_mongodb_data],
    jobs=[add_mongo_partitions],
)
```
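To apply this to the MongoDB case from the question, `get_mongodb_data` could return one key per 500,000-record chunk instead of `range(10)`, and the partitioned asset could turn `context.partition_key` back into a record range. A minimal, dependency-free sketch, assuming each key encodes its chunk’s start offset zero-padded to a fixed width (`chunk_partition_keys`, `partition_bounds`, and `KEY_WIDTH` are hypothetical names, not Dagster APIs):

```python
from typing import List, Tuple

# Hypothetical fixed key width: keys sort lexically and existing keys stay
# unchanged if the record count grows later.
KEY_WIDTH = 12

def chunk_partition_keys(total_count: int, chunk_size: int = 500_000) -> List[str]:
    """One partition key per chunk: the chunk's zero-based start offset, zero-padded."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    num_chunks = (total_count + chunk_size - 1) // chunk_size  # integer ceiling division
    return [str(i * chunk_size).zfill(KEY_WIDTH) for i in range(num_chunks)]

def partition_bounds(partition_key: str, chunk_size: int, total_count: int) -> Tuple[int, int]:
    """Half-open [start, stop) record range covered by a partition key.

    Assumes the collection's order and size do not change while chunks are read.
    """
    start = int(partition_key)
    return start, min(start + chunk_size, total_count)
```

For 100,000,000 records this yields 200 keys, from `000000000000` to `000099500000`. Inside the partitioned asset, `partition_bounds(context.partition_key, 500_000, total)` gives the slice to read, for example with pymongo’s `collection.find().sort("_id", 1).skip(start).limit(stop - start)`; MongoDB still walks the skipped documents, so a range filter on an indexed field may scale better.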
Guy McCombe
05/11/2023, 12:01 PM
`str` twice but you should hopefully get the idea 😅
sean
05/11/2023, 1:00 PM
`context.instance.add_dynamic_partitions` to do this, as Guy illustrated.
王昊
05/12/2023, 1:54 AM
`add_dynamic_partitions` and `delete_dynamic_partition` methods are very useful. Perhaps it would be a good idea to mention these APIs on the https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions page.
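Combining the two methods, an op could reconcile the registered keys with the set it expects on each run, adding missing keys and removing stale ones. A sketch, assuming `instance` is a `DagsterInstance` (e.g. `context.instance`); `sync_dynamic_partitions` is a hypothetical helper that relies only on the instance’s `get_dynamic_partitions`, `add_dynamic_partitions`, and `delete_dynamic_partition` methods:

```python
from typing import Iterable, List, Tuple

def sync_dynamic_partitions(
    instance, partitions_def_name: str, desired_keys: Iterable[str]
) -> Tuple[List[str], List[str]]:
    """Make the dynamic partitions named `partitions_def_name` match `desired_keys`.

    Hypothetical helper; `instance` is assumed to be a DagsterInstance.
    Returns (added, deleted).
    """
    desired = list(dict.fromkeys(desired_keys))  # drop duplicates, keep order
    existing = instance.get_dynamic_partitions(partitions_def_name)  # one lookup
    existing_set, desired_set = set(existing), set(desired)

    to_add = [key for key in desired if key not in existing_set]
    to_delete = [key for key in existing if key not in desired_set]

    if to_add:
        # add_dynamic_partitions accepts a list of keys in a single call
        instance.add_dynamic_partitions(partitions_def_name, to_add)
    for key in to_delete:
        # delete_dynamic_partition removes one key per call
        instance.delete_dynamic_partition(partitions_def_name, key)
    return to_add, to_delete
```

Reading the existing keys once avoids one `has_dynamic_partition` call per key. Whether stale keys should be deleted at all is a judgment call for the pipeline.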