https://dagster.io/ logo
#ask-ai
Title
# ask-ai
t

Terry Lines

02/14/2024, 5:23 PM
How would I used partioned config when some configuration parameters are supplied at run time
s

Scout

02/14/2024, 5:23 PM
When using partitioned configuration in Dagster, you typically define a discrete set of partitions along with a function for generating config for each partition. However, if you need to supply some configuration parameters at runtime, you can still do so. The partitioned configuration will define the base configuration for each partition, and you can override or supplement this configuration at runtime when you launch a job. Here's a step-by-step approach based on the documentation: 1. Define your partitioned configuration using either
PartitionedConfig
or
dynamic_partitioned_config
. This will involve creating a function that generates the configuration for each partition. 2. When you launch the job, you can provide additional runtime configuration. This can be done through the Dagster UI Launchpad or programmatically using the
execute_in_process
method or similar methods. 3. The runtime configuration you provide will be merged with the partitioned configuration. If there are any overlapping configuration values, the runtime configuration will typically take precedence and override the partitioned configuration. 4. Ensure that your job is set up to accept runtime configuration. This might involve setting up config mappings or making sure that your ops are designed to handle the configuration appropriately. Here is an example from the documentation that illustrates how you might define a job with a hardcoded configuration, which can be overridden at runtime:
Copy code
python from dagster import Config, OpExecutionContext, RunConfig, job, op  class DoSomethingConfig(Config):     config_param: str  @op def do_something(context: OpExecutionContext, config: DoSomethingConfig):     <http://context.log.info|context.log.info>("config_param: " + config.config_param)  default_config = RunConfig(     ops={"do_something": DoSomethingConfig(config_param="stuff")} )  @job(config=default_config) def do_it_all_with_default_config():     do_something()  if __name__ == "__main__":     # Will log "config_param: stuff"     do_it_all_with_default_config.execute_in_process()
In this example, the
do_it_all_with_default_config
job has a hardcoded configuration that can be overridden when executing the job. For partitioned jobs, you would use a similar approach but with a
PartitionedConfig
instead of a
RunConfig
. Remember that the exact behavior of how runtime configuration interacts with partitioned configuration can depend on the specifics of your Dagster setup and the version you are using. Always refer to the latest documentation for the most accurate and detailed guidance.
t

Terry Lines

02/14/2024, 5:31 PM
@Scout can you give an example of a sensor with a RunRequest that uses a PartionedConfig
s

Scout

02/14/2024, 5:31 PM
Certainly! Below is an example of a sensor that uses a
RunRequest
to trigger a run for a specific partition of a partitioned job. The sensor will generate a
RunRequest
with the appropriate partition key, which will be used to select the partitioned config for the run.
Copy code
python from dagster import sensor, RunRequest, job, PartitionedConfig, StaticPartitionsDefinition  # Define a partitioned config def config_for_partition(partition_key: str):     return {         "solids": {             "my_solid": {                 "config": {                     "date": partition_key                 }             }         }     }  partitions_def = StaticPartitionsDefinition(     ["2021-01-01", "2021-01-02", "2021-01-03"] )  partitioned_config = PartitionedConfig(     run_config_for_partition_fn=config_for_partition,     partitions_def=partitions_def, )  # Define a job with the partitioned config @job(config=partitioned_config) def my_partitioned_job():     # define your ops and resources here     pass  # Define a sensor that triggers a run for a specific partition @sensor(job=my_partitioned_job) def my_sensor(context):     partition_key = "2021-01-01"  # Determine the partition key dynamically as needed     run_config = config_for_partition(partition_key)  # Get the run config for the partition     return RunRequest(         run_key=partition_key,  # Unique identifier for the run         run_config=run_config,  # Run config derived from the partitioned config     )
In this example,
my_partitioned_job
is a job that has a
PartitionedConfig
. The
config_for_partition
function generates the run configuration for a given partition key. The
my_sensor
sensor function creates a
RunRequest
for a specific partition key, which is hardcoded in this example but could be determined dynamically based on some external condition or event. When the sensor triggers, it will launch a run of
my_partitioned_job
using the configuration specific to the partition key provided in the
RunRequest
. This allows you to have a sensor that can trigger partition-specific runs of a job with the appropriate configuration for each partition.
t

Terry Lines

02/14/2024, 5:48 PM
@Scout can you give me an example of an asset with a partioned config?
s

Scout

02/14/2024, 5:48 PM
Certainly! Below is an example of how to define an asset with a partitioned config in Dagster. This example assumes that you have an asset that represents some data that is partitioned by date, and you want to configure it differently based on the partition.
Copy code
python
from dagster import asset, DailyPartitionsDefinition, PartitionedConfig, build_assets_job

# Define a function that generates config for each partition
def config_for_partition(partition_key: str):
    # Here you can customize the config based on the partition key
    # For example, you might want to read data from a different file or database table
    # based on the date represented by the partition key
    return {
        "solids": {
            "my_asset_solid": {
                "config": {
                    "file_path": f"data_for_{partition_key}.csv"
                }
            }
        }
    }

# Define the partitions for the asset
partitions_def = DailyPartitionsDefinition(start_date="2021-01-01")

# Define the partitioned config
partitioned_config = PartitionedConfig(
    run_config_for_partition_fn=config_for_partition,
    partitions_def=partitions_def,
)

# Define the asset with the partitioned config
@asset(partitions_def=partitions_def)
def my_partitioned_asset(context):
    # The context provides access to the partition key and config
    partition_key = context.partition_key
    file_path = context.op_config["file_path"]
    # Here you would include the logic to process the data for the given partition
    # For example, reading from the file_path and performing some transformations
    return f"Processed data for partition {partition_key} from {file_path}"

# Build a job for the asset using the partitioned config
my_partitioned_asset_job = build_assets_job(
    "my_partitioned_asset_job",
    assets=[my_partitioned_asset],
    config=partitioned_config,
)

# Now you can run the job for a specific partition
# For example, to run the job for the partition "2021-01-01", you would use:
run_config = config_for_partition("2021-01-01")
result = my_partitioned_asset_job.execute_in_process(run_config=run_config)
In this example,
my_partitioned_asset
is an asset that is partitioned daily starting from January 1, 2021. The
config_for_partition
function generates the run configuration for each partition, which in this case specifies a different file path for each date. The
my_partitioned_asset_job
is a job built for the asset that uses the
PartitionedConfig
to determine the configuration for each partition. When you execute the job for a specific partition, you provide the run configuration generated by
config_for_partition
for that partition. This allows the asset computation to be customized based on the partition key, which could represent different slice
s of data, such as different dates or ranges.
t

Terry Lines

02/14/2024, 6:34 PM
@Scout can you show me an exmaple which uses the new style config syntax i..e
Copy code
@asset(partitions_def=partitions_def)
def my_partitioned_asset(config:MyConfig):
s

Scout

02/14/2024, 6:34 PM
As of my knowledge cutoff in early 2023, the new style of config syntax for assets using a Python class to define the config schema was not yet available in Dagster. The documentation provided examples of assets using the
config_schema
parameter to define configuration directly within the
@asset
decorator, rather than through a separate Python class. However, the documentation does mention that the asset layer sits atop the op layer, and the computation of an asset is handled by a wrapped op. This means that the asset's compute function is simply its corresponding op's compute function. Consequently, asset config is implemented by forwarding an asset's
config_schema
to its underlying op. If the new style of config syntax using a Python class has been introduced after my knowledge cutoff, I would not have information on that feature. You would need to refer to the latest Dagster documentation or release notes for examples and guidance on using this new feature. For the most accurate and up-to-date information, please check the latest Dagster documentation or the Dagster GitHub repository. If this feature has been released, the documentation will provide examples and instructions on how to use the new config syntax with assets. How to specify config for graph-backed a... button Support mapping partition key in op/asse... button Dagster 0.15.0: Cool for the Summer | Da... button How to define default values for config ... button Thank you for your feedback!
3 Views