#ask-ai

Aman Gottumukkala

02/09/2024, 6:31 AM
I want to call an asset materialization with an array of different configs all at the same time. Is this possible? Here's my existing orchestrator:

```python
@job(tags={MAX_RUNTIME_SECONDS_TAG: 1200})
def yc_batch_orchestrator():
    yc_batch_company_links()


@schedule(job=yc_batch_orchestrator, cron_schedule="0 * * * *")
def yc_batch_schedule():
    return RunRequest(
        run_key=None,
        run_config={
            "ops": {"yc_batch_company_links": {"config": {"batches": ["W24"]}}}
        },
    )
```

Scout

02/09/2024, 6:31 AM
Based on the documentation, you can execute a job with a specific run config by passing `run_config` when defining a schedule. The `@schedule` decorator wraps a function that returns run config based on the provided `ScheduleEvaluationContext`, so you can generate a different run configuration on each scheduled tick. The schedule you posted, however, triggers the job with a static run configuration that specifies a single batch ("W24") for the `yc_batch_company_links` op.

To materialize an asset with an array of different configs at the same time, you need a separate run request for each configuration. A schedule most often triggers a single run with one configuration per tick, but the schedule function can also yield several `RunRequest` objects, one per configuration. Alternatively, you could create multiple schedules, each with its own configuration, or use a sensor that programmatically generates multiple `RunRequest` objects based on certain conditions or events.

For more complex scenarios, you might need to script the execution of runs through the Dagster API, or explore other programmatic approaches outside the standard scheduling mechanism. If your use case is not covered by the documentation, it may help to reach out to the Dagster community or check the GitHub issues and discussions for similar use cases and potential solutions.
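The "one run request per configuration" idea can be sketched in plain Python. This is a minimal illustration, not Dagster's API: `run_config_for_batch` and `build_run_configs` are hypothetical helper names, and the config shape is copied from the schedule in the question. In a real schedule or sensor, each dictionary would presumably be passed as the `run_config` of its own `RunRequest`.

```python
from typing import Dict, List


def run_config_for_batch(batch: str) -> Dict:
    """Build the run config for one batch (shape taken from the question)."""
    return {"ops": {"yc_batch_company_links": {"config": {"batches": [batch]}}}}


def build_run_configs(batches: List[str]) -> List[Dict]:
    """Return one run config per batch, preserving input order."""
    return [run_config_for_batch(batch) for batch in batches]


if __name__ == "__main__":
    # Each printed dict would back a separate run
    for config in build_run_configs(["W24", "W23", "W22"]):
        print(config)
```

Keeping the config builder separate from the scheduling code makes it easy to check the generated configs before any run is launched.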

Aman Gottumukkala

02/09/2024, 6:32 AM
Can partitions be used here?
@Scout can partitions be used for this?

Scout

02/09/2024, 6:32 AM
Yes, partitions can be used to run a job with different configurations. Partitions in Dagster let you define slices of your data or computation that can be processed independently. They are particularly useful for organizing and executing computations on subsets of your data, such as processing data day by day or by some other logical division.

When you define partitions for an asset or an op, you can associate a specific configuration with each partition. This lets you run the same job with a different configuration for each partition. For example, a daily partitioned asset has one partition per day, each with a configuration specific to that day.

To schedule runs with different configurations, you would typically define a partitioned config and then create a schedule or a sensor that generates a run request for each partition. The partition key determines the configuration for each run. Here's a simplified example, assuming a recent Dagster 1.x release:

```python
from dagster import RunRequest, job, schedule, static_partitioned_config

# Partition keys; replace these with whatever slices your computation needs
PARTITION_KEYS = ["a", "b", "c"]


# Generate the run config for a single partition
@static_partitioned_config(partition_keys=PARTITION_KEYS)
def run_config_for_partition(partition_key: str):
    return {"ops": {"your_op_name": {"config": {"parameter": partition_key}}}}


@job(config=run_config_for_partition)
def your_job():
    your_op_name()  # an op defined elsewhere


@schedule(cron_schedule="0 * * * *", job=your_job)
def your_partition_schedule():
    # Request one run per partition on every tick
    for key in PARTITION_KEYS:
        yield RunRequest(partition_key=key)
```

In this example, `your_partition_schedule` requests one run for each key in `PARTITION_KEYS`. Each run gets the configuration generated by `run_config_for_partition` for its partition key. The actual implementation will depend on your use case, including how you define your partitions and how you want to trigger runs for them. The documentation provides more detailed guidance on working with partitions, partitioned config, and schedules.
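Conceptually, a partitioned config is a function from a partition key to run config, defined over a fixed set of keys. The plain-Python sketch below models only that idea and does not import Dagster; `PARTITION_KEYS`, `config_for_partition`, the daily keys, and the `parameter` field are illustrative assumptions rather than library identifiers.

```python
from typing import Dict, Tuple

# Hypothetical daily partition keys; each key names one independent slice
PARTITION_KEYS: Tuple[str, ...] = ("2024-02-07", "2024-02-08", "2024-02-09")


def config_for_partition(partition_key: str) -> Dict:
    """Map a partition key to the run config for that slice."""
    if partition_key not in PARTITION_KEYS:
        raise KeyError(f"Unknown partition key: {partition_key!r}")
    return {"ops": {"your_op_name": {"config": {"parameter": partition_key}}}}


# One config per partition, keyed by partition key
configs_by_partition = {key: config_for_partition(key) for key in PARTITION_KEYS}
```

Rejecting unknown keys up front mirrors the point that runs are only meaningful for partitions that have actually been defined.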

Aman Gottumukkala

02/09/2024, 7:12 AM
@Scout here is an example of a single partition config:

```python
return RunRequest(
    run_key=None,
    run_config={
        "ops": {"yc_batch_company_links": {"config": {"batches": ["W24"]}}}
    },
)
```

I want to run it on batches W23, W22, W21, etc., all in different runs. Generate me the code to define the partition, orchestrator, etc.

Scout

02/09/2024, 7:12 AM
To run each batch (W24, W23, W22, W21, and so on) in its own run using partitions, first define a partitioned config that maps each batch to its run configuration. Then attach that config to the job. Finally, define a schedule or a sensor that requests one run per partition. Here's an example of how you might set this up, assuming a recent Dagster 1.x release:

1. Define the partitioned config:

```python
from dagster import static_partitioned_config

# One partition per batch; add more batches as needed
BATCH_PARTITIONS = ["W24", "W23", "W22", "W21"]


# Generate the run config for a single batch partition
@static_partitioned_config(partition_keys=BATCH_PARTITIONS)
def batch_partitioned_config(partition_key: str):
    return {
        "ops": {
            "yc_batch_company_links": {"config": {"batches": [partition_key]}}
        }
    }
```

2. Define the job that uses the partitioned config:

```python
from dagster import job


@job(config=batch_partitioned_config)
def yc_batch_orchestrator():
    yc_batch_company_links()
```

3. Define a schedule that triggers a run for each partition:

```python
from dagster import RunRequest, schedule


@schedule(
    cron_schedule="0 * * * *",  # Adjust the cron schedule as needed
    job=yc_batch_orchestrator,
)
def yc_batch_schedule():
    # Yield one run request per batch so each batch gets its own run
    for batch in BATCH_PARTITIONS:
        yield RunRequest(partition_key=batch)
```

In this example, `yc_batch_schedule` requests one run for each partition key in `BATCH_PARTITIONS` on every tick. The `batch_partitioned_config` function generates the run configuration for each partition, which includes the specific batch to process. Make sure the op name in the run config matches the op used in your job, and adjust the cron schedule to fit your desired execution frequency.

Keep in mind that this is a simplified example, and you may need to adjust the code to fit the specifics of your Dagster setup, such as the actual job definition and the way you define your partitions. The documentation provides more detailed information on working with partitions, partitioned config, and schedules.