#ask-ai

Tara Larrue

02/15/2024, 2:50 PM
@Scout how do I generate a default config based on the partitions chosen for a backfill job?

Scout

02/15/2024, 2:50 PM
To generate a default config based on the partitions chosen for a backfill job, you can use the `PartitionedConfig` class provided by Dagster. It defines a way of configuring a job that can be run on any one of a discrete set of partitions, where each partition key maps to a run configuration for the job. Here's a step-by-step guide based on the documentation:
1. Define a `PartitionedConfig` for your job. This involves specifying a partitions definition and a function that maps partition keys to run configuration.
2. Implement a function that takes a partition key and returns the run configuration for that partition. It is passed as the `run_config_for_partition_key_fn` parameter when creating a `PartitionedConfig`.
3. Set the `PartitionedConfig` as the config for your job. This allows you to launch backfills for that job and view the run history across partitions.
4. When you initiate a backfill, Dagster uses the `PartitionedConfig` to determine the run configuration for each partition included in the backfill.
Here is an example of how you might define a `PartitionedConfig`:
python
from dagster import daily_partitioned_config, job, op


@op(config_schema={"date": str})
def my_op(context):
    context.log.info(context.op_config["date"])


@daily_partitioned_config(start_date="2021-01-01")
def my_partitioned_config(start, end):
    # Generate the run configuration for the partition defined by start and end dates
    return {"ops": {"my_op": {"config": {"date": start.strftime("%Y-%m-%d")}}}}


# The decorator already returns a PartitionedConfig; pass it as the job's config
@job(config=my_partitioned_config)
def my_job():
    my_op()
In this example, `my_partitioned_config` generates the run configuration for each partition from the partition's start date. When you launch a backfill, Dagster calls this function for each partition you've selected and uses the returned configuration for the corresponding run. The actual implementation will vary with your use case and the structure of your Dagster job, so check the Dagster documentation for your version.
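To make the per-partition call concrete, here is a minimal sketch in plain Python of what a backfill does with such a mapping function: one call per selected partition key. It does not use Dagster itself; the op name `my_op`, the helper `configs_for_backfill`, and the date keys are placeholders chosen for illustration.

```python
from datetime import date, timedelta


def run_config_for_day(day: date) -> dict:
    """Map one daily partition to a run config (hypothetical op name `my_op`)."""
    return {"ops": {"my_op": {"config": {"date": day.isoformat()}}}}


def configs_for_backfill(first_day: date, num_days: int) -> dict:
    """Build one run config per selected partition, keyed by partition key."""
    days = [first_day + timedelta(days=offset) for offset in range(num_days)]
    return {day.isoformat(): run_config_for_day(day) for day in days}


# Three consecutive daily partitions, as a backfill selection might contain
backfill_configs = configs_for_backfill(date(2021, 1, 1), 3)
```

Each entry in `backfill_configs` is the config one run of the backfill would receive.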

Tara Larrue

02/15/2024, 4:06 PM
@Scout can you provide an example using `dynamic_partitioned_config` for an asset that has a multipartition comprised of 2 dynamic dimensions?

Scout

02/15/2024, 4:06 PM
As of my knowledge cutoff date in early 2023, Dagster's documentation does not provide a direct example of using `dynamic_partitioned_config` for an asset with a multipartition comprised of two dynamic dimensions. The `dynamic_partitioned_config` decorator is used to generate run configurations for dynamically partitioned jobs, but the documentation does not explicitly cover multipartitions with multiple dynamic dimensions. However, I can guide you on how you might approach this based on the concepts Dagster provides for dynamic partitions and multipartitions:
1. Define your dynamic partitions using `DynamicPartitionsDefinition` for each dimension.
2. Combine these dynamic partitions into a `MultiPartitionsDefinition`.
3. Create a function that generates the run configuration based on the partition keys for each dimension.
4. Use this function to provide the run configuration when launching a backfill or materializing the asset.
Here's a conceptual example that outlines the steps you might take:
python
from dagster import DynamicPartitionsDefinition, MultiPartitionsDefinition, asset

# Define dynamic partitions for each dimension
dynamic_dimension1 = DynamicPartitionsDefinition(name="dynamic_dimension1")
dynamic_dimension2 = DynamicPartitionsDefinition(name="dynamic_dimension2")

# Combine dynamic partitions into a MultiPartitionsDefinition
multi_partitions_def = MultiPartitionsDefinition({
    "dimension1": dynamic_dimension1,
    "dimension2": dynamic_dimension2,
})

# Define your asset with the multi-dimension partitions.
# The config_schema lets the asset accept the config generated below.
@asset(
    partitions_def=multi_partitions_def,
    config_schema={"dimension1": str, "dimension2": str},
)
def my_multi_dimension_asset(context):
    # Access partition keys for each dimension (keys_by_dimension is a property)
    partition_keys = context.partition_key.keys_by_dimension
    dimension1_key = partition_keys["dimension1"]
    dimension2_key = partition_keys["dimension2"]

    # Use the partition keys to perform asset computation
    context.log.info(f"Materializing {dimension1_key} / {dimension2_key}")

# Define a function to generate run configuration based on partition keys
def generate_run_config(partition_key):
    # Extract keys for each dimension; this assumes partition_key is a MultiPartitionKey
    keys_by_dimension = partition_key.keys_by_dimension
    dimension1_key = keys_by_dimension["dimension1"]
    dimension2_key = keys_by_dimension["dimension2"]

    # Generate and return the run configuration for these partition keys
    return {
        "ops": {
            "my_multi_dimension_asset": {
                "config": {
                    "dimension1": dimension1_key,
                    "dimension2": dimension2_key,
                }
            }
        }
    }

# Use the generate_run_config function when launching a backfill or materializing the asset
Please note that this is a conceptual example and may not work without adjustments specific to your Dagster setup. The implementation details, such as how you define dynamic partitions and how you generate run configurations, depend on your use case and the structure of your data and assets. For the most accurate and up-to-date information, refer to the latest Dagster documentation or reach out to the Dagster community for guidance on multipartitions with multiple dynamic dimensions.
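The mapping step can be exercised on its own, without Dagster. The sketch below takes the per-dimension keys as a plain dict, standing in for the `keys_by_dimension` mapping of a multi-partition key, and nests them under the asset's `config` entry. The function name `run_config_from_dimensions` and the sample keys are illustrative assumptions.

```python
from typing import Mapping


def run_config_from_dimensions(asset_name: str, keys_by_dimension: Mapping[str, str]) -> dict:
    """Build run config for one multi-partition from its per-dimension keys."""
    return {"ops": {asset_name: {"config": dict(keys_by_dimension)}}}


example_config = run_config_from_dimensions(
    "my_multi_dimension_asset",
    {"dimension1": "a", "dimension2": "x"},  # stand-in for partition_key.keys_by_dimension
)
```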

Tara Larrue

02/15/2024, 4:09 PM
@Scout is there a built in function to read yaml configs into a object?

Scout

02/15/2024, 4:10 PM
Yes, Dagster provides a built-in utility for reading YAML configuration into a dictionary. The function `dagster.config_from_yaml_strings` constructs a run config dictionary from a list of YAML strings. Here's an example of how you might use it:
python
from dagster import config_from_yaml_strings

# The YAML is written flush-left so indentation inside the string is unambiguous
yaml_strings = [
    """
ops:
  my_op:
    config:
      key: value
"""
]

run_config = config_from_yaml_strings(yaml_strings)
After calling `config_from_yaml_strings`, `run_config` is a dictionary built from the provided YAML strings, which you can then use as the configuration for a Dagster job or asset. If a YAML document is invalid and has a parse error, a `DagsterInvariantViolationError` is raised. For more details, refer to the Dagster documentation.

Tara Larrue

02/15/2024, 4:34 PM
@Scout how do I edit `run_config` from this object?

Scout

02/15/2024, 4:34 PM
The `run_config` generated from YAML strings using `config_from_yaml_strings` is a Python dictionary, so you can edit it like any other dictionary. Here's an example of how you might modify the `run_config`:
python
from dagster import config_from_yaml_strings

yaml_strings = [
    """
ops:
  my_op:
    config:
      key: value
"""
]

# Generate the initial run_config from YAML strings
run_config = config_from_yaml_strings(yaml_strings)

# run_config is now a Python dictionary that can be edited.
# Change the value of 'key' under the 'my_op' config:
run_config["ops"]["my_op"]["config"]["key"] = "new_value"

# Add a new key-value pair
run_config["ops"]["my_op"]["config"]["new_key"] = "new_value"

# Remove a key; pop() with a default avoids a KeyError if the key is absent
run_config["ops"]["my_op"]["config"].pop("key_to_remove", None)

# Add config for another op
run_config["ops"]["another_op"] = {"config": {"another_key": "another_value"}}

# After editing, you can use run_config to execute a job or asset
After making the desired changes to the `run_config` dictionary, you can pass it to a job execution method such as `execute_in_process`, or use it when launching a run through the Dagster UI or CLI. The structure of the `run_config` dictionary must match the schema expected by your job or asset. If you provide an invalid configuration, Dagster raises a `DagsterInvalidConfigError` when you try to execute the job or asset, so make sure your edits conform to that schema.
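When the nested path might not exist yet, direct indexing raises a `KeyError`. A small helper, sketched here in plain Python, works on a deep copy and creates intermediate dictionaries as needed. The name `set_nested` and the sample keys are hypothetical; nothing in this block is a Dagster API.

```python
import copy
from typing import Any, Sequence


def set_nested(config: dict, path: Sequence[str], value: Any) -> dict:
    """Return a copy of `config` with `value` stored at the nested `path`."""
    updated = copy.deepcopy(config)  # leave the caller's dictionary untouched
    node = updated
    for key in path[:-1]:
        node = node.setdefault(key, {})  # create missing intermediate levels
    node[path[-1]] = value
    return updated


base = {"ops": {"my_op": {"config": {"key": "value"}}}}
edited = set_nested(base, ["ops", "another_op", "config", "another_key"], "another_value")
```

Working on a copy keeps a config loaded once from YAML reusable across several partitions.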

Tara Larrue

02/15/2024, 4:50 PM
@Scout What is wrong with my setup? I am trying to define a default config for my multi-partitioned asset for backfill jobs using dagster 1.6.3.
class ScrapeConfig(Config):
    abbrev: str
    datatype: str
    node: Optional[str] = None
    meta: Optional[str] = None
    need_type: Optional[str] = "data"
    lookback_hrs: Optional[int] = 6
    backfill: Optional[bool] = False
    start: Optional[str] = None
    end: Optional[str] = None

DAGSTER_SCRAPER_CONFIG_LOC = "./scraper_configs"


@dynamic_partitioned_config
def backfill_scraper_config(partition_key):
    """Generate a ScrapeConfig object for a `input_data_sources` partitioned backfill job."""

    # find partition dimension keys
    keys_by_dimension = partition_key.keys_by_dimension()
    ba_abbrev = keys_by_dimension["ba_abbrev"]
    datatype = keys_by_dimension["datatype"]

    # read config from config file
    config_path = Path(DAGSTER_SCRAPER_CONFIG_LOC) / f"{ba_abbrev}~{datatype}.yaml"
    config_yaml_str = config_path.read_text(encoding="utf-8")
    config = config_from_yaml_strings(config_yaml_str)

    # edit config for backfill job - defaults to 1 year backfill
    now = datetime.now()
    config["ops"]["input_data_sources"]["backfill"] = True
    config["ops"]["input_data_sources"]["start"] = (now - timedelta(days=365)).date()
    config["ops"]["inout_data_sources"]["end"] = now.date()

    return config

ba_abbrev_partitions_def = DynamicPartitionsDefinition(name="ba_abbrev")
datatype_partitions_def = DynamicPartitionsDefinition(name="datatype")

ba_dt_partitions_def = MultiPartitionsDefinition(
    {
        "ba_abbrev": ba_abbrev_partitions_def,
        "datatype": datatype_partitions_def,
    }
)

@asset(partitions_def=ba_dt_partitions_def, output_required=False)
def input_data_sources(context, config: ScrapeConfig) -> pd.DataFrame:
    """Raw scraped input data for all WT products, partitioned by BA + datatype.
    Store as file."""

    # Get specified watttime-scraper module
    scraper_module = find_scraper_module(config)

    # Convert config to watttime DataNeeds object
    data_processor = NEED_MAP[config.need_type](
        abbrev=config.abbrev, datatype=config.datatype, node=config.node, meta=config.meta)

    # Get scraper call_info series from needs object
    call_info = pd.Series(data_processor.unique_scraper_data, dtype=object)
    call_info["func"] = scraper_module.DATATYPE_TO_METHOD[config.datatype]

    # Get job time range, specified by config
    start, end = get_scrape_job_time_range(config)

    # Scrape data
    try:
        <http://context.log.info|context.log.info>(f"Scraping {call_info.values}, {start}, {end}...")
        df = call_info.func(
            start,
            end,
            call_info.get("abbrev", None),
            call_info.get("node", None),
        )
        <http://context.log.info|context.log.info>(f"{len(df)} total records scraped.")
    # Raise a job failure upon any errors
    except Exception as e:
        raise Failure(
            description="Failed scrape job.",
            metadata={
                "scraper params": f"{call_info.values}, start: {start}, end: {end}",
                "error": str(e),
            }
        )
    # If scrape job is successful, post-process + output scraped data
    else:
        df = extract_required_data(df, data_processor.required_data)
        df = data_processor.post_process(df.copy(deep=True))
        if len(df) > 0:
            <http://context.log.info|context.log.info>(f"Filtered down to {len(df)} relevant records - materializing.")
            yield Output(df)
        else:
            <http://context.log.info|context.log.info>("Filtered down to 0 relevant records - NOT materializing.")

input_data_sources_job = define_asset_job(
    name="input_data_sources_job",
    selection=AssetSelection.assets(input_data_sources),
    partitions_def=ba_dt_partitions_def,
    tags={MAX_RUNTIME_SECONDS_TAG: 60*5},
    config=backfill_scraper_config,
)

Scout

02/15/2024, 4:50 PM
Based on the provided code snippet and the documentation, there are a few potential issues with your setup:
1. Incorrect `dynamic_partitioned_config` usage: `dynamic_partitioned_config` is a decorator factory that must be called with a `partition_fn` argument (a function returning the list of partition keys). Applied bare, as in `@dynamic_partitioned_config`, it leaves `backfill_scraper_config` bound to a plain function rather than a `PartitionedConfig`, which `define_asset_job` will not accept as `config`. Separately, `keys_by_dimension` is a property of `MultiPartitionKey`, so it should be accessed without parentheses, and `config_from_yaml_strings` takes a list of YAML strings, not a single string:
python
config = config_from_yaml_strings([config_yaml_str])
2. Typo in the configuration dictionary: the key is spelled `inout_data_sources` instead of `input_data_sources` when setting the `end` date. The override fields also belong under the asset's `config` key, as ISO strings to match the `str` fields of `ScrapeConfig`:
python
config["ops"]["input_data_sources"]["config"]["end"] = now.date().isoformat()
3. Missing import for `config_from_yaml_strings`: make sure you have imported the function from Dagster:
python
from dagster import config_from_yaml_strings
4. Arrow syntax in the asset definition: the return annotation arrow appears to have been HTML-escaped in the pasted snippet (`-&gt;`). If that is also how it appears in your file, replace it with the plain Python arrow (`->`):
python
def input_data_sources(context, config: ScrapeConfig) -> pd.DataFrame:
5. Use of `<http://context.log.info|context.log.info>`: the logging statements appear with link markup, which is most likely an artifact of pasting into Slack. In the actual file they should read `context.log.info`:
python
context.log.info(f"Scraping {call_info.values}, {start}, {end}...")
6. Use of `Output`: `yield Output(df)` is valid inside an asset. Combined with `output_required=False`, it lets the asset skip materialization when nothing is yielded, which is what your empty-result branch relies on, so it does not need to change. Returning the DataFrame directly would instead materialize on every run:
python
return df
7. `ScrapeConfig`: make sure `Config` is imported from `dagster`. Dagster's `Config` is itself a Pydantic model, so inheriting from it is the right choice for an asset's `config` parameter; there is no need to inherit from `pydantic.BaseModel`.
8. Compatibility with your Dagster version: make sure the features you are using are available in Dagster 1.6.3. APIs change between versions, so refer to the documentation that matches the version you are running.
After addressing these issues, your setup should be closer to working as expected.
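As a rough sketch of the shape the generated run config needs, the plain-Python helper below places the backfill overrides under the asset's `config` key and stores the dates as ISO strings, since `start` and `end` are declared as `str` in `ScrapeConfig`. The helper name `apply_backfill_overrides` and the sample base config are assumptions for illustration, and the 365-day window mirrors the default in the snippet above.

```python
import copy
from datetime import datetime, timedelta


def apply_backfill_overrides(base_config: dict, op_name: str, now: datetime, days: int = 365) -> dict:
    """Return a copy of `base_config` with backfill fields set under ops.<op_name>.config."""
    run_config = copy.deepcopy(base_config)
    op_config = run_config.setdefault("ops", {}).setdefault(op_name, {}).setdefault("config", {})
    op_config["backfill"] = True
    op_config["start"] = (now - timedelta(days=days)).date().isoformat()
    op_config["end"] = now.date().isoformat()
    return run_config


# Hypothetical base config, as it might be loaded from one of the YAML files
base = {"ops": {"input_data_sources": {"config": {"abbrev": "SPP_KANSAS", "datatype": "LMP"}}}}
result = apply_backfill_overrides(base, "input_data_sources", datetime(2024, 2, 15, 16, 50))
```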

Tara Larrue

02/15/2024, 5:01 PM
@Scout In this example that you gave, how can I use `generate_run_config` to generate a default run config for a multi-partitioned asset? Can I pass it into the asset job definition? Can I decorate it with `dynamic_partitioned_config`?
python
from dagster import DynamicPartitionsDefinition, MultiPartitionsDefinition, asset

# Define dynamic partitions for each dimension
dynamic_dimension1 = DynamicPartitionsDefinition(name="dynamic_dimension1")
dynamic_dimension2 = DynamicPartitionsDefinition(name="dynamic_dimension2")

# Combine dynamic partitions into a MultiPartitionsDefinition
multi_partitions_def = MultiPartitionsDefinition({
    "dimension1": dynamic_dimension1,
    "dimension2": dynamic_dimension2,
})

# Define your asset with the multi-dimension partitions
@asset(partitions_def=multi_partitions_def)
def my_multi_dimension_asset(context):
    # Access partition keys for each dimension
    partition_keys = context.partition_key.keys_by_dimension
    dimension1_key = partition_keys["dimension1"]
    dimension2_key = partition_keys["dimension2"]
    
    # Use the partition keys to perform asset computation
    # ...

# Define a function to generate run configuration based on partition keys
def generate_run_config(partition_key):
    # Extract keys for each dimension from the partition key
    keys_by_dimension = partition_key.keys_by_dimension()
    dimension1_key = keys_by_dimension["dimension1"]
    dimension2_key = keys_by_dimension["dimension2"]
    
    # Generate and return the run configuration for these partition keys
    return {
        "solids": {
            "my_multi_dimension_asset": {
                "config": {
                    "dimension1": dimension1_key,
                    "dimension2": dimension2_key,
                }
            }
        }
    }

# Use the generate_run_config function when launching a backfill or materializing the asset

Scout

02/15/2024, 5:01 PM
`generate_run_config` can be used to create the run configuration for a job that targets a multi-partitioned asset. You cannot pass it into the asset definition; instead, attach it to a job that targets the asset. The `dynamic_partitioned_config` decorator is not a good fit here: it must be called with a `partition_fn` and builds a `PartitionedConfig` around its own function-based dynamic partitions definition, whereas the job's partitioned config should use the same partitions definition as the job. For a multi-partitioned asset, wrap the function in a `PartitionedConfig` that uses the `MultiPartitionsDefinition` directly. Here's how you can define a job that uses `generate_run_config` to provide default run configurations for each partition:
python
from dagster import PartitionedConfig, asset, define_asset_job

# ... (previous code defining dynamic_dimension1, dynamic_dimension2, and multi_partitions_def)

@asset(
    partitions_def=multi_partitions_def,
    config_schema={"dimension1": str, "dimension2": str},
)
def my_multi_dimension_asset(context):
    # Asset implementation
    ...

# Define a function to generate run configuration based on partition keys
def generate_run_config(partition_key):
    # Extract keys for each dimension; this assumes partition_key is a MultiPartitionKey
    keys_by_dimension = partition_key.keys_by_dimension
    dimension1_key = keys_by_dimension["dimension1"]
    dimension2_key = keys_by_dimension["dimension2"]

    # Generate and return the run configuration for these partition keys
    return {
        "ops": {
            "my_multi_dimension_asset": {
                "config": {
                    "dimension1": dimension1_key,
                    "dimension2": dimension2_key,
                }
            }
        }
    }

# Define a job that targets the multi-partitioned asset
my_asset_job = define_asset_job(
    name="my_asset_job",
    selection="*",
    partitions_def=multi_partitions_def,
    config=PartitionedConfig(
        partitions_def=multi_partitions_def,
        run_config_for_partition_key_fn=generate_run_config,
    ),
)

# Now, you can launch a run for `my_asset_job` with the appropriate run configuration for each partition
When you launch a run for `my_asset_job`, Dagster invokes `generate_run_config` with the run's partition key and uses the returned configuration for that run. The `selection` parameter in `define_asset_job` should cover the assets you want to target. In this example I've used `"*"` to select all assets, but you may want a more targeted selection. Also make sure the structure of the run configuration dictionary matches the schema expected by the asset's configuration: the example assumes `my_multi_dimension_asset` accepts `dimension1` and `dimension2` as config fields, which may not be the case depending on how your asset is defined. Refer to the Dagster documentation for the version you are using.

Tara Larrue

02/15/2024, 5:09 PM
@Scout error:
dagster._check.ParameterCheckError: Param "config" is not one of ['ConfigMapping', 'Mapping', 'PartitionedConfig', 'RunConfig']. Got <function dynamic_partitioned_config.<locals>.inner at 0x13f120fe0> which is type <class 'function'>.

Scout

02/15/2024, 5:10 PM
The error message indicates that the `config` parameter you're passing to `define_asset_job` is not one of the accepted types (`ConfigMapping`, `Mapping`, `PartitionedConfig`, `RunConfig`). Instead, it is receiving a plain function. The name in the message, `dynamic_partitioned_config.<locals>.inner`, shows where that function comes from: `dynamic_partitioned_config` is a decorator factory that must be called with a `partition_fn` argument. When it is applied bare, as `@dynamic_partitioned_config`, your config function is consumed as `partition_fn` and the name ends up bound to the factory's inner decorator, which is a function rather than a `PartitionedConfig`. Calling the decorator with a `partition_fn` would fix the type error, but the resulting `PartitionedConfig` carries its own function-based dynamic partitions definition, which does not match a job partitioned by a `MultiPartitionsDefinition`. For a multi-partitioned asset, build the `PartitionedConfig` explicitly from the same partitions definition as the job. Here's how you can modify your code to fix the error:
python
from dagster import PartitionedConfig, asset, define_asset_job

# ... (previous code defining dynamic_dimension1, dynamic_dimension2, and multi_partitions_def)

@asset(
    partitions_def=multi_partitions_def,
    config_schema={"dimension1": str, "dimension2": str},
)
def my_multi_dimension_asset(context):
    # Asset implementation
    ...

# Define a function to generate run configuration based on partition keys
def generate_run_config(partition_key):
    # Extract keys for each dimension; this assumes partition_key is a MultiPartitionKey
    keys_by_dimension = partition_key.keys_by_dimension
    dimension1_key = keys_by_dimension["dimension1"]
    dimension2_key = keys_by_dimension["dimension2"]

    # Generate and return the run configuration for these partition keys
    return {
        "ops": {
            "my_multi_dimension_asset": {
                "config": {
                    "dimension1": dimension1_key,
                    "dimension2": dimension2_key,
                }
            }
        }
    }

# Define a job that targets the multi-partitioned asset
# Note: config receives a PartitionedConfig object that wraps the function
my_asset_job = define_asset_job(
    name="my_asset_job",
    selection="*",
    partitions_def=multi_partitions_def,
    config=PartitionedConfig(
        partitions_def=multi_partitions_def,
        run_config_for_partition_key_fn=generate_run_config,
    ),
)

# Now, you can launch a run for `my_asset_job` with the appropriate run configuration for each partition
The key point is that `config` must receive a `PartitionedConfig` object, not a function. If you continue to encounter issues, refer to the Dagster documentation for the version you are using, as the APIs and best practices may have changed.
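The `<locals>.inner` in the error message is typical of a decorator factory applied without its arguments. The following plain-Python sketch reproduces that behavior with a stand-in factory; `partitioned_config_factory` and `FakePartitionedConfig` are hypothetical names and only imitate the general shape of such a decorator, not Dagster's actual implementation.

```python
from dataclasses import dataclass
from typing import Callable, Sequence


@dataclass
class FakePartitionedConfig:
    """Stand-in for the object a job would accept as its config."""
    partition_fn: Callable[[], Sequence[str]]
    run_config_fn: Callable[[str], dict]


def partitioned_config_factory(partition_fn):
    """Decorator factory: call it with arguments first, then decorate."""
    def inner(run_config_fn):
        return FakePartitionedConfig(partition_fn, run_config_fn)
    return inner


@partitioned_config_factory  # bare: the config function is taken as partition_fn
def bare_config(partition_key):
    return {"ops": {}}


@partitioned_config_factory(partition_fn=lambda: ["a", "b"])  # called, then applied
def proper_config(partition_key):
    return {"ops": {"my_op": {"config": {"key": partition_key}}}}
```

Here `bare_config` ends up bound to `inner`, a plain function, while `proper_config` is the config object a job could use.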

Tara Larrue

02/15/2024, 5:31 PM
@Scout This is not working, I am getting the same error. I have tested to make sure `backfill_scraper_config` returns a dict representing a config like:
{'ops': {'input_data_sources': {'config': {'abbrev': 'SPP_KANSAS',
    'datatype': 'LMP',
    'node': 'WR.FRW.1',
    'need_type': 'spp'},
   'backfill': True,
   'start': '2023-02-15'}}}

#jobs.py --
def run_config_for_partition(partition: MultiPartitionKey):

    partition_keys = partition.keys_by_dimension
    config_file_name = partition_keys['ba_abbrev'] + "~" + partition_keys['datatype'] + ".yaml"
    config_file_path = Path(DAGSTER_SCRAPER_CONFIG_LOC) / config_file_name
    configs = config_from_files([str(config_file_path)])
    return configs


@dynamic_partitioned_config
def backfill_scraper_config(partition: MultiPartitionKey):

    config = run_config_for_partition(partition)

    # edit config for backfill job - defaults to 1 year backfill
    now = datetime.now()
    config["ops"]["input_data_sources"]["backfill"] = True
    config["ops"]["input_data_sources"]["start"] = (now - timedelta(days=365)).date().isoformat()
    config["ops"]["input_data_sources"]["end"] = now.date().isoformat()

    return config


input_data_sources_job = define_asset_job(
    name="input_data_sources_job",
    selection=AssetSelection.assets(input_data_sources),
    partitions_def=ba_dt_partitions_def,
    tags={MAX_RUNTIME_SECONDS_TAG: 60*5},
    config=backfill_scraper_config,
)

Scout

02/15/2024, 5:31 PM
The error suggests that the `config` parameter in `define_asset_job` is still not receiving one of the accepted types. `dynamic_partitioned_config` has been part of Dagster for some time and is available in 1.6.3, so the version is not the problem. The issue is how the decorator is applied: in your snippet it is still used bare, without the required `partition_fn` argument, so `backfill_scraper_config` is bound to the decorator's inner function instead of a `PartitionedConfig`. That is also why the type in the error message is `function`. Here are some suggestions to troubleshoot the issue:
1. Ensure that your config function returns a dictionary that matches the expected schema for your job's configuration. In the dictionary you posted, `backfill` and `start` sit next to `config` rather than inside it, and they need to be nested under `config` to be read as `ScrapeConfig` fields.
2. Check the type of the object you pass as `config`, for example with `isinstance(backfill_scraper_config, PartitionedConfig)`, before handing it to `define_asset_job`.
3. Check whether there are known issues related to partitioned config in the version of Dagster you are using by consulting the release notes or the Dagster GitHub repository.
4. If the problem persists, consider reaching out to the Dagster community or filing an issue on the Dagster GitHub repository.
Since your asset uses a `MultiPartitionsDefinition`, the more direct route is to create a `PartitionedConfig` object yourself and pass it to `define_asset_job` as the `config` parameter. This requires the job's `partitions_def` and a `run_config_for_partition_key_fn` that generates the run configuration for each partition key. Refer to the Dagster documentation for the version you are using to confirm the details.

Tara Larrue

02/15/2024, 5:39 PM
@Scout should this work for dagster 1.6.3?
def run_config_for_partition(partition: MultiPartitionKey):

    partition_keys = partition.keys_by_dimension
    config_file_name = partition_keys['ba_abbrev'] + "~" + partition_keys['datatype'] + ".yaml"
    config_file_path = Path(DAGSTER_SCRAPER_CONFIG_LOC) / config_file_name
    configs = config_from_files([str(config_file_path)])
    return configs


def backfill_scraper_config(partition: MultiPartitionKey):

    config = run_config_for_partition(partition)

    # edit config for backfill job - defaults to 1 year backfill
    now = datetime.now()
    config["ops"]["input_data_sources"]["backfill"] = True
    config["ops"]["input_data_sources"]["start"] = (now - timedelta(days=365)).isoformat()
    config["ops"]["input_data_sources"]["end"] = now.isoformat()

    return config


input_data_sources_job = define_asset_job(
    name="input_data_sources_job",
    selection=AssetSelection.assets(input_data_sources),
    partitions_def=ba_dt_partitions_def,
    tags={MAX_RUNTIME_SECONDS_TAG: 60*5},
    config=PartitionedConfig(partitions_def=ba_dt_partitions_def,
                             run_config_for_partition_key_fn=backfill_scraper_config),
)

Scout

02/15/2024, 5:39 PM
In Dagster 1.6.3, the `PartitionedConfig` class defines a way of configuring a job that can be run on any one of a discrete set of partitions, where each partition key maps to a run configuration for the job. It takes a `partitions_def` and a function that generates the run configuration for each partition key, supplied through the `run_config_for_partition_key_fn` parameter. Based on the code snippet you provided, you are using `PartitionedConfig` correctly by passing it the `partitions_def` and the `run_config_for_partition_key_fn`. A few things to note:
1. `config_from_files` is a Dagster utility that builds a run config dictionary from a list of YAML file paths, so it needs to be imported from `dagster`.
2. The `run_config_for_partition_key_fn` should be a function that takes a partition key and returns a run configuration dictionary. Using `backfill_scraper_config` for this is correct.
3. Make sure `backfill_scraper_config` modifies the configuration dictionary so that it matches your job's configuration schema. The `backfill`, `start`, and `end` fields of `ScrapeConfig` belong under the asset's `config` key, and `start` and `end` are set to full datetime strings here, whereas your earlier version used dates.
4. The `PartitionedConfig` object is then passed to the `config` parameter of `define_asset_job`, which is the correct usage.
Here's a slightly modified version of your code snippet that should work with Dagster 1.6.3:
```python
from datetime import datetime, timedelta
from pathlib import Path

from dagster import (
    AssetSelection,
    MultiPartitionKey,
    PartitionedConfig,
    config_from_files,
    define_asset_job,
)

DAGSTER_SCRAPER_CONFIG_LOC = "./scraper_configs"


def run_config_for_partition(partition: MultiPartitionKey):
    # Load the YAML run config stored for this BA + datatype combination
    partition_keys = partition.keys_by_dimension
    config_file_name = partition_keys["ba_abbrev"] + "~" + partition_keys["datatype"] + ".yaml"
    config_file_path = Path(DAGSTER_SCRAPER_CONFIG_LOC) / config_file_name
    return config_from_files([str(config_file_path)])


def backfill_scraper_config(partition: MultiPartitionKey):
    config = run_config_for_partition(partition)

    # Edit config for backfill job: defaults to a 1 year backfill.
    # The fields go under "config" so they are read as ScrapeConfig fields.
    now = datetime.now()
    asset_config = config["ops"]["input_data_sources"]["config"]
    asset_config["backfill"] = True
    asset_config["start"] = (now - timedelta(days=365)).isoformat()
    asset_config["end"] = now.isoformat()
    return config


input_data_sources_job = define_asset_job(
    name="input_data_sources_job",
    selection=AssetSelection.assets(input_data_sources),
    partitions_def=ba_dt_partitions_def,
    tags={MAX_RUNTIME_SECONDS_TAG: 60 * 5},
    config=PartitionedConfig(
        partitions_def=ba_dt_partitions_def,
        run_config_for_partition_key_fn=backfill_scraper_config,
    ),
)
```
Make sure that `input_data_sources`, `ba_dt_partitions_def`, and `MAX_RUNTIME_SECONDS_TAG` are defined or imported elsewhere in your code. If you encounter any issues, refer to the Dagster documentation for version 1.6.3 or reach out to the Dagster community.
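One detail worth checking in the snippet is the format of the `start` and `end` strings. Below is a short plain-Python sketch of the two variants that appear in this thread, using a fixed timestamp so the result is reproducible; the helper name `backfill_window` is made up for illustration.

```python
from datetime import datetime, timedelta


def backfill_window(now: datetime, days: int = 365, dates_only: bool = True) -> tuple:
    """Return (start, end) ISO strings for a backfill ending at `now`."""
    start = now - timedelta(days=days)
    if dates_only:
        return start.date().isoformat(), now.date().isoformat()
    return start.isoformat(), now.isoformat()


fixed_now = datetime(2024, 2, 15, 17, 39, 0)
date_window = backfill_window(fixed_now)                        # date-only strings
datetime_window = backfill_window(fixed_now, dates_only=False)  # full timestamps
```

Which form is right depends on what `get_scrape_job_time_range` expects to parse.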