Aman Gottumukkala
02/09/2024, 6:31 AM
Scout
02/09/2024, 6:31 AM
`run_config` parameter when defining a schedule. The `@schedule` decorator lets you decorate a function that returns run config based on the provided `ScheduleEvaluationContext`, so you can generate a different run configuration for each scheduled execution. However, the example you provided shows a schedule that triggers a job with a static run configuration, specifying a single batch (`"W24"`) for the `yc_batch_company_links` op.
If you want to materialize an asset with an array of different configs at the same time, you need one run request per configuration. A schedule is not limited to one run per tick: the decorated function can yield (or return a list of) several `RunRequest` objects, and Dagster launches one run for each. Give each `RunRequest` a distinct `run_key` so the runs can be told apart. Alternatively, you can create multiple schedules, each with its own static configuration, or use a sensor that programmatically generates multiple `RunRequest` objects based on certain conditions or events.
For more complex scenarios, you can also script run submission through the Dagster API, outside the standard scheduling mechanism. If your use case is not covered by the documentation, the Dagster community and the GitHub issues and discussions are good places to look for similar cases and potential solutions.
Aman Gottumukkala
02/09/2024, 6:32 AM
Aman Gottumukkala
02/09/2024, 6:32 AM
Scout
02/09/2024, 6:32 AM
```python
from dagster import RunRequest, job, op, schedule, static_partitioned_config

# Placeholder partition keys; replace them with the values your op should run for.
PARTITION_KEYS = ["value_a", "value_b", "value_c"]


# Maps each partition key to the run config for that partition.
@static_partitioned_config(partition_keys=PARTITION_KEYS)
def run_config_for_partition(partition_key: str):
    return {"ops": {"your_op_name": {"config": {"parameter": partition_key}}}}


@op(config_schema={"parameter": str})
def your_op_name(context):
    context.log.info(f"parameter = {context.op_config['parameter']}")


# Passing the partitioned config makes this a partitioned job.
@job(config=run_config_for_partition)
def your_job():
    your_op_name()


@schedule(cron_schedule="0 * * * *", job=your_job)
def your_partition_schedule():
    # Request one run per partition on every hourly tick.
    for partition_key in PARTITION_KEYS:
        yield RunRequest(run_key=partition_key, partition_key=partition_key)
```
In this example, `your_partition_schedule` is a schedule that, on every tick, requests one run for each partition key of `your_job`. The configuration for each run is generated by `run_config_for_partition`, so it is specific to the partition being processed. The actual implementation will depend on your use case, including how you define your partitions and how you want to trigger runs for them. The documentation provides more detailed guidance on partitions, partitioned config, and schedules.
Aman Gottumukkala
02/09/2024, 7:12 AM
Scout
02/09/2024, 7:12 AM
1. Define the batch partitions and the run config for each batch:
```python
from dagster import static_partitioned_config

# Batches to process; add more as needed.
batch_partitions = ["W24", "W23", "W22", "W21"]


# Returns the run config for one batch (partition key).
@static_partitioned_config(partition_keys=batch_partitions)
def run_config_for_batch_partition(partition_key: str):
    return {"ops": {"yc_batch_company_links": {"config": {"batches": [partition_key]}}}}
```
2. Define the job that uses the partitioned config:
```python
from dagster import job, op


# Stand-in for your existing op; keep your real implementation.
@op(config_schema={"batches": [str]})
def yc_batch_company_links(context):
    context.log.info(f"Processing batches: {context.op_config['batches']}")


@job(config=run_config_for_batch_partition)
def yc_batch_orchestrator():
    yc_batch_company_links()
```
3. Define a schedule that requests a run for each partition:
```python
from dagster import RunRequest, schedule


@schedule(cron_schedule="0 * * * *", job=yc_batch_orchestrator)  # Adjust the cron schedule as needed
def yc_batch_schedule():
    # Yield one RunRequest per batch, so all batches are launched on the same tick.
    for batch in batch_partitions:
        yield RunRequest(run_key=batch, partition_key=batch)
```
In this example, `yc_batch_schedule` is a schedule that triggers one run for each batch in `batch_partitions` on every tick. The `run_config_for_batch_partition` function generates the run configuration for each partition, which includes the specific batch to process. Make sure the op name in the run config (`yc_batch_company_links`) matches the op in your job, and adjust the cron schedule to fit your desired execution frequency. Keep in mind that this is a simplified example, and you may need to adapt it to your Dagster setup, such as your actual job definition and the way you define your partitions. The documentation provides more detailed information on partitions and schedules.