# ask-ai

BC A

03/13/2024, 1:31 PM
@Scout can you have a DynamicOut among the outs in @graph_multi_asset?

Scout

03/13/2024, 1:31 PM
Based on the documentation and issues from the GitHub repository, it is currently not supported to have dynamic outputs (`DynamicOut`) among the outputs (`outs`) of a `@graph_multi_asset`. There is an open issue that specifically mentions the error when any outputs of a `@graph_asset` are dynamic, indicating that this feature is not yet supported. The failure can manifest in different ways, but the core limitation is that dynamic outputs are not compatible with `@graph_asset` or `@graph_multi_asset` at this time. For reference, here is the relevant excerpt from the GitHub issue:

> ISSUE TITLE: error if any outputs of a `@graph_asset` are dynamic
> BODY: This is not yet supported, but the failure shows up in other ways.

This indicates that the use of dynamic outputs in this context is a known limitation and not a feature Dagster currently offers.
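For contrast, `DynamicOut` is supported on ops inside a plain `@job` (or a `@graph` wired into a job). Here is a minimal sketch of that supported setting; the op names are illustrative:

```python
from dagster import DynamicOut, DynamicOutput, job, op

@op(out=DynamicOut(str))
def fan_out():
    # One DynamicOutput per item; mapping_key must be unique per output.
    for key in ["a", "b", "c"]:
        yield DynamicOutput(key, mapping_key=key)

@op
def process(item: str) -> str:
    return item.upper()

@job
def dynamic_job():
    # .map runs process once per dynamic output.
    fan_out().map(process)
```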

BC A

03/13/2024, 1:38 PM
@Scout what about dynamic partitioning?

Scout

03/13/2024, 1:38 PM
Dynamic partitioning in Dagster allows you to define assets where the set of partitions can change over time. This is particularly useful for data sources that grow or change frequently, such as logs or event streams, where new partitions need to be processed as they become available. Dynamic partitioning is supported through the `DynamicPartitionsDefinition` class, which you can use to define how partitions are discovered for an asset. Here's an example from the discussions of how to register dynamic partitions from within an op:
```python
from dagster import DynamicPartitionsDefinition, op

dynamic_partitions_def = DynamicPartitionsDefinition(name="my_partitions_def")

@op
def add_dynamic_partition_op(context):
    partition_key = "2024-03-13"  # replace with your own key logic
    # Register the new key with the partitions definition on the instance
    context.instance.add_dynamic_partitions(dynamic_partitions_def.name, [partition_key])
```
In the context of assets, you can use dynamic partitioning as follows:
```python
from dagster import DynamicPartitionsDefinition, asset

dynamic_partitions_def = DynamicPartitionsDefinition(name="my_dynamic_partition")

@asset(partitions_def=dynamic_partitions_def)
def my_dynamic_asset(context):
    partition_key = context.partition_key
    # Logic to handle the data for the given partition_key
    ...
```
When using dynamic partitions, you can add and remove partitions dynamically via code or through the UI. This allows for a flexible, adaptable approach to partitioning that can accommodate changes in the underlying data. It's important to note that dynamic partitioning is distinct from dynamic outputs (`DynamicOut`): dynamic partitioning refers to handling a changing set of partitions for an asset, while `DynamicOut` lets the number of outputs of an op vary at runtime and, as discussed above, is not currently supported in `@graph_asset` or `@graph_multi_asset`.
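As a minimal sketch of that add/list/remove lifecycle in code (assuming a script running against your Dagster instance; inside an op or sensor you would use `context.instance` instead):

```python
from dagster import DagsterInstance, DynamicPartitionsDefinition

dynamic_partitions_def = DynamicPartitionsDefinition(name="my_dynamic_partition")

instance = DagsterInstance.get()

# Register new partition keys with the definition.
instance.add_dynamic_partitions(dynamic_partitions_def.name, ["2024-03-12", "2024-03-13"])

# List the keys currently registered.
print(instance.get_dynamic_partitions(dynamic_partitions_def.name))

# Remove a key that is no longer needed.
instance.delete_dynamic_partition(dynamic_partitions_def.name, "2024-03-12")
```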

BC A

03/13/2024, 2:25 PM
@Scout can we define a multi-partitioned asset and, within the partitions_def, use a DynamicPartitionsDefinition?

Scout

03/13/2024, 2:25 PM
Yes, you can use a `DynamicPartitionsDefinition` as one of the dimensions in a `MultiPartitionsDefinition` for an asset. This lets you define assets with multiple partition dimensions where at least one dimension is dynamic. Here's an example from the discussions that illustrates how to use `MultiPartitionsDefinition` and `DynamicPartitionsDefinition` together:
```python
from dagster import (
    DailyPartitionsDefinition,
    DynamicPartitionsDefinition,
    MultiPartitionsDefinition,
    asset,
)

dynamic_colors = DynamicPartitionsDefinition(name="dynamic_colors")

partitions_def = MultiPartitionsDefinition(
    {
        "time": DailyPartitionsDefinition(start_date="2022-01-01"),
        "color": dynamic_colors,
    }
)

@asset(partitions_def=partitions_def)
def multi_dynamic_asset(context):
    partition = context.partition_key.keys_by_dimension
    context.log.info(f"Partition: time: {partition['time']}, color: {partition['color']}")
```
In this example, the `multi_dynamic_asset` asset is defined with a `MultiPartitionsDefinition` that includes a daily time partition and a dynamic color partition. When you run this asset in the UI, you can add colors to the dynamic partition and materialize the asset for any (time, color) pair. This approach allows you to create complex partitioning schemes where some dimensions are fixed (e.g., daily partitions) and others are dynamic and can change over time, giving you flexibility in how you partition your assets.
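If you want to drive this from code rather than the UI, here is a hedged sketch of a sensor that registers a new color and requests a run for one cell of the (time, color) space; the job and sensor names are illustrative assumptions, and `dynamic_colors` is the definition from the example above:

```python
from dagster import (
    MultiPartitionKey,
    RunRequest,
    SensorResult,
    define_asset_job,
    sensor,
)

# Hypothetical job that materializes the asset defined above.
multi_dynamic_job = define_asset_job("multi_dynamic_job", selection=["multi_dynamic_asset"])

@sensor(job=multi_dynamic_job)
def color_sensor(context):
    new_color = "red"  # replace with your discovery logic
    return SensorResult(
        # Register the new key on the dynamic "color" dimension...
        dynamic_partitions_requests=[dynamic_colors.build_add_request([new_color])],
        # ...and request a run for one (time, color) cell.
        run_requests=[
            RunRequest(
                partition_key=MultiPartitionKey({"time": "2022-01-01", "color": new_color})
            )
        ],
    )
```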

BC A

03/13/2024, 6:27 PM
@Scout I have op_generate that generates dynamic partitions (how do I add them to my previously defined dynamic partitions definition?). How do I trigger op_filter on every dynamic partition op_generate has created? Write an example of code.

Scout

03/13/2024, 6:27 PM
In Dagster, you can use the `DynamicPartitionsDefinition` to define dynamic partitions for an asset, and you can programmatically add new partitions to that definition from an op. To trigger another op on every dynamic partition created by `op_generate`, you would typically use a sensor or a job that is aware of the new partitions. Here's an example of how you might set this up:
```python
from dagster import (
    DynamicPartitionsDefinition,
    In,
    Nothing,
    RunRequest,
    job,
    op,
    repository,
    sensor,
)

# Define your dynamic partitions
dynamic_partitions_def = DynamicPartitionsDefinition(name="my_dynamic_partitions")

# Define an op that generates new dynamic partitions
@op
def op_generate(context):
    # Logic to determine new partitions
    new_partitions = ["partition1", "partition2"]  # Replace with your logic
    # Add new partitions to the dynamic partitions definition
    context.instance.add_dynamic_partitions(dynamic_partitions_def.name, new_partitions)
    return new_partitions

# Define an op that should be triggered for each dynamic partition.
# The partition key arrives via run config (set by the sensor below), and the
# Nothing input only orders this op after op_generate without passing a value.
@op(ins={"start": In(Nothing)}, config_schema={"partition_key": str})
def op_filter(context):
    partition_key = context.op_config["partition_key"]
    context.log.info(f"Processing partition: {partition_key}")
    # Your filtering logic here

# Define a job that includes both ops
@job
def my_job():
    new_partitions = op_generate()
    # One alias per expected partition; each alias waits for op_generate
    # through the Nothing dependency and gets its key from run config.
    op_filter.alias("filter_partition1")(start=new_partitions)
    op_filter.alias("filter_partition2")(start=new_partitions)
    # You would need to create more aliases if you have more partitions

# Define a sensor that triggers the job for new dynamic partitions
@sensor(job=my_job)
def my_sensor(context):
    # Logic to check for new partitions
    # This could involve checking an external system or a time-based check
    new_partition_keys = ["partition1", "partition2"]  # Replace with your logic

    for partition_key in new_partition_keys:
        yield RunRequest(
            run_key=partition_key,
            run_config={
                "ops": {
                    "filter_partition1": {"config": {"partition_key": partition_key}},
                    "filter_partition2": {"config": {"partition_key": partition_key}},
                    # Add more op configurations if you have more partitions
                }
            },
        )

# Define a repository to hold the job and sensor
@repository
def my_repository():
    return [my_job, my_sensor]
```
In this example, `op_generate` generates new dynamic partitions and adds them to the `DynamicPartitionsDefinition`, and `op_filter` processes data for a specific partition, reading its partition key from run config. The `my_job` job includes both ops, with one `op_filter` alias per expected partition; the `Nothing` dependency makes each alias wait for `op_generate` without passing it a value. The `my_sensor` sensor is responsible for detecting new partitions and triggering `my_job` with the appropriate configuration: for each new partition it yields a `RunRequest` whose run config supplies the partition key to the aliased `op_filter` ops. Please note that this example is a simplified illustration. In practice, you would implement real logic for detecting new partitions and ensure that the run config matches the number of dynamic partitions you have. Additionally, the aliasing of `op_filter` is manual in this example; you would typically use a more dynamic approach to handle an arbitrary number of partitions.
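As one such more dynamic approach, here is a sketch under the assumption that the filtering step is rewritten as an asset (hypothetically named `filtered_partition`) partitioned by the same `DynamicPartitionsDefinition`, so the sensor can register partitions and request one run per new key without any manual aliasing:

```python
from dagster import (
    DynamicPartitionsDefinition,
    RunRequest,
    SensorResult,
    asset,
    define_asset_job,
    sensor,
)

dynamic_partitions_def = DynamicPartitionsDefinition(name="my_dynamic_partitions")

@asset(partitions_def=dynamic_partitions_def)
def filtered_partition(context):
    context.log.info(f"Processing partition: {context.partition_key}")

filter_job = define_asset_job("filter_job", selection=["filtered_partition"])

@sensor(job=filter_job)
def partition_sensor(context):
    new_keys = ["partition1", "partition2"]  # replace with your discovery logic
    return SensorResult(
        # Register the new keys...
        dynamic_partitions_requests=[dynamic_partitions_def.build_add_request(new_keys)],
        # ...and request one run per key; no aliases needed.
        run_requests=[RunRequest(partition_key=key) for key in new_keys],
    )
```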

BC A

03/13/2024, 6:35 PM
@Scout what if my op_generate looked more like this:

```python
@op
def op_generate(context):
    # Logic to determine new partitions
    new_partitions = ["partition1", "partition2"]  # Replace with your logic
    # Add new partitions to the dynamic partitions definition
    for partition in new_partitions:
        context.instance.add_dynamic_partitions(dynamic_partitions_def.name, new_partitions)
        yield new_partitions
```

Is there a way to trigger a run request for every yield?

Scout

03/13/2024, 6:35 PM
In Dagster, if you have an op that yields new dynamic partitions and you want to trigger a run request for each yield, you would typically use a sensor to monitor for these new partitions and then trigger the appropriate job runs. As of my knowledge cutoff in early 2023, there isn't a built-in mechanism to trigger a run request directly from each yield within an op. Instead, you would use a sensor to periodically check for new partitions and create run requests based on the new partitions found. Here's an example of how you might set up a sensor to trigger a run request for each new partition:
```python
from dagster import (
    DynamicOut,
    DynamicOutput,
    DynamicPartitionsDefinition,
    RunRequest,
    job,
    op,
    repository,
    sensor,
)

dynamic_partitions_def = DynamicPartitionsDefinition(name="my_dynamic_partitions")

@op(out=DynamicOut(str))
def op_generate(context):
    new_partitions = ["partition1", "partition2"]  # Replace with your logic
    for partition in new_partitions:
        context.instance.add_dynamic_partitions(dynamic_partitions_def.name, [partition])
        # An op cannot yield bare values; each yield must be wrapped in a
        # DynamicOutput with a unique mapping_key.
        yield DynamicOutput(partition, mapping_key=partition)

@op
def op_process_partition(context, partition_key: str):
    context.log.info(f"Processing partition: {partition_key}")
    # Your processing logic here

@job
def process_partition_job():
    # .map runs op_process_partition once for every DynamicOutput yielded
    # by op_generate within a single run.
    op_generate().map(op_process_partition)

@sensor(job=process_partition_job)
def new_partition_sensor(context):
    # Logic to check for new partitions; this could involve checking an
    # external system or a time-based check.
    new_partition_keys = get_new_partitions()  # Replace with your logic

    for partition_key in new_partition_keys:
        # The run_key deduplicates requests: a partition that was already
        # requested will not trigger a second run.
        yield RunRequest(run_key=partition_key)

def get_new_partitions():
    # Implement logic to retrieve new partitions
    # This could be from a database, a file, an API, etc.
    return []

@repository
def my_repository():
    return [process_partition_job, new_partition_sensor]
```
In this example, `op_generate` registers each new partition on the instance and emits it as a `DynamicOutput`, and `.map` fans `op_process_partition` out over every emitted partition within a single run, which is the closest in-run equivalent of triggering something per yield. Across runs, the `new_partition_sensor` checks for new partitions by calling `get_new_partitions`, which should contain the logic to identify partitions that need processing. For each new partition, the sensor yields a `RunRequest` with a unique `run_key`, so a partition that has already been requested does not trigger a duplicate run. Please note that `get_new_partitions` is a placeholder and should be implemented to suit your specific use case: it should return the list of new partition keys that have not yet been processed. Also note that each sensor-triggered run re-executes `op_generate`, which is redundant in practice but keeps the illustration simple. Keep in mind that the sensor will need to be able to determine which partitions are new and need processing. This might involve tracking which partitions have already been processed, or using timestamps or other indicators to identify new data.
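One way to implement that tracking, sketched here as an assumption rather than a documented pattern, is to persist the set of already-requested keys in the sensor cursor:

```python
import json

from dagster import RunRequest, sensor

@sensor(job=process_partition_job)
def cursor_based_sensor(context):
    # The cursor is an arbitrary string persisted between sensor ticks;
    # here it stores a JSON list of partition keys already requested.
    seen = set(json.loads(context.cursor)) if context.cursor else set()

    all_keys = get_new_partitions()  # same placeholder as above
    new_keys = [key for key in all_keys if key not in seen]

    for key in new_keys:
        yield RunRequest(run_key=key)

    if new_keys:
        context.update_cursor(json.dumps(sorted(seen | set(new_keys))))
```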