# ask-community
Horatio
I’m confused about how to use `define_asset_job` and `DynamicPartitionsDefinition`. As I understand it, `define_asset_job` generates a job that materializes assets, while `DynamicPartitionsDefinition` allows partitions to be defined dynamically. In the documentation, the only use cases I found update a `DynamicPartitionsDefinition` from a sensor using `SensorResult(dynamic_partitions_requests=[])`. What should I do if I don’t want to use a sensor? For example, I want to define a job that, when executed, first redefines the partitions in the `DynamicPartitionsDefinition` and then materializes the partitioned asset using those partitions. Here is the use case I’ve seen:
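```python
from dagster import AssetSelection, DynamicPartitionsDefinition, define_asset_job, sensor

# Partition set whose keys are stored on the Dagster instance at runtime
fruits = DynamicPartitionsDefinition(name="fruits")

my_job = define_asset_job(
    "my_job", selection=AssetSelection.all(), partitions_def=fruits
)

@sensor(job=my_job)
def my_sensor(context):
    # Read the partition keys currently registered on the instance
    partitions = fruits.get_partition_keys(dynamic_partitions_store=context.instance)
```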
In my actual use case, a MongoDB collection has 100 million records. I need to read its count first, then read the data in groups of 500,000 and save each group as a Parquet file, with each file being one partition of a partitioned asset. Is `DynamicPartitionsDefinition` the best practice here? What would you suggest?
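The chunking arithmetic in the question can be sketched in plain Python. This is a minimal sketch that assumes a hypothetical `chunk_NNNN` key format (one key per 500,000-record group) and skip/limit paging. Dagster requires neither of these. With 100,000,000 records, the sketch yields 200 partition keys.

```python
import math

# Group size from the question; the "chunk_NNNN" key format is a hypothetical choice
CHUNK_SIZE = 500_000


def chunk_partition_keys(total_count: int, chunk_size: int = CHUNK_SIZE) -> list[str]:
    """Return one zero-padded partition key per group of `chunk_size` records."""
    n_chunks = math.ceil(total_count / chunk_size)
    return [f"chunk_{i:04d}" for i in range(n_chunks)]


def chunk_bounds(partition_key: str, chunk_size: int = CHUNK_SIZE) -> tuple[int, int]:
    """Map a partition key back to (skip, limit) for a paged MongoDB query."""
    index = int(partition_key.removeprefix("chunk_"))
    return index * chunk_size, chunk_size


keys = chunk_partition_keys(100_000_000)
print(len(keys), keys[0], keys[-1])  # 200 chunk_0000 chunk_0199
print(chunk_bounds("chunk_0199"))    # (99500000, 500000)
```

Zero-padding the index keeps the keys in lexicographic order for up to 10,000 chunks. Skip/limit is the simplest mapping, but MongoDB still has to walk past the skipped documents. For very large offsets, a range query on an indexed field such as `_id` may scale better.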
Guy McCombe
Hey Horatio, you should be able to do this without any problem. I’m not sure whether there’s a better way using the `build_add_request` method, but this little example should do what you’re looking for:
```python
from dagster import Definitions, job, op, asset, DynamicPartitionsDefinition, OpExecutionContext

# Partition set whose keys are added at runtime instead of being declared up front
dynamic_partitions = DynamicPartitionsDefinition(name="dynamic_partition")


@asset
def get_mongodb_data():
    # Placeholder for reading MongoDB: returns the keys to register
    return [str(i) for i in range(10)]


@op
def add_partitions(context: OpExecutionContext, get_mongodb_data):
    # Keep only the keys the instance does not already know about
    new_partitions = [
        str(i) for i in get_mongodb_data
        if not context.instance.has_dynamic_partition(
            dynamic_partitions.name, str(i)
        )
    ]
    # Register the new keys on the Dagster instance
    context.instance.add_dynamic_partitions(dynamic_partitions.name, new_partitions)


@job
def add_mongo_partitions():
    data = get_mongodb_data()
    add_partitions(data)


@asset(partitions_def=dynamic_partitions)
def foo():
    return 1


defs = Definitions(
    assets=[foo, get_mongodb_data],
    jobs=[add_mongo_partitions],
)
```
You obviously don’t need to cast it to `str` twice, but hopefully you get the idea 😅
sean
Thanks @Guy McCombe for the example. Horatio, the key idea is that you can use an op and `context.instance.add_dynamic_partitions` to do this, as Guy illustrated.
Horatio
@Guy McCombe @sean Thank you for your ideas, they have been very helpful. The `add_dynamic_partitions` and `delete_dynamic_partition` methods are very useful. It might be a good idea to mention this API on the https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions page.