Tara Larrue
02/15/2024, 2:50 PM
Scout
02/15/2024, 2:50 PM
`PartitionedConfig` class provided by Dagster. This class lets you configure a job that can be run on one of a discrete set of partitions, where each partition corresponds to a run configuration for the job. Here's a step-by-step guide based on the documentation:
1. Define a `PartitionedConfig` for your job. This involves specifying a partitions definition and a function that maps partition keys to run configuration.
2. Implement a function that takes a partition key and returns the run configuration for that partition. This function is typically passed as the `run_config_for_partition_key_fn` parameter when creating a `PartitionedConfig` (the older `run_config_for_partition_fn` parameter is deprecated).
3. Set the `PartitionedConfig` as the config for your job. This allows you to launch backfills for that job and view the run history across partitions.
4. When you initiate a backfill, Dagster uses the `PartitionedConfig` to determine the run configuration for each partition included in the backfill.
Here is an example of how you might define a `PartitionedConfig`:
```python
from dagster import daily_partitioned_config, job, op


@op(config_schema={"date": str})
def my_op(context):
    context.log.info(context.op_config["date"])


# The decorator turns this function into a PartitionedConfig
@daily_partitioned_config(start_date="2021-01-01")
def my_partitioned_config(start, end):
    # Generate the run configuration for the partition defined by start and end dates
    return {"ops": {"my_op": {"config": {"date": start.strftime("%Y-%m-%d")}}}}


# Use the PartitionedConfig when defining your job
@job(config=my_partitioned_config)
def my_job():
    my_op()
```
In this example, `my_partitioned_config` generates the run configuration for each partition based on the start date of the partition. When you launch a backfill, Dagster calls this function for each partition you've selected and uses the returned configuration for the runs. Please note that the actual implementation may vary based on your specific use case and the structure of your Dagster job. Always refer to the latest Dagster documentation for the most accurate and up-to-date guidance.
Tara Larrue
02/15/2024, 4:06 PM
`dynamic_partitioned_config` for an asset that has a multipartition comprised of 2 dynamic dimensions?
Scout
02/15/2024, 4:06 PM
`dynamic_partitioned_config` for an asset with a multipartition comprised of two dynamic dimensions. The `dynamic_partitioned_config` function is used to generate run configurations for dynamically partitioned assets, but the documentation does not explicitly cover the case of multipartitions with multiple dynamic dimensions.
However, I can guide you on how you might approach this based on the concepts provided by Dagster for dynamic partitions and multipartitions:
1. Define your dynamic partitions using `DynamicPartitionsDefinition` for each dimension.
2. Combine these dynamic partitions into a `MultiPartitionsDefinition`.
3. Create a function that generates the run configuration based on the partition keys for each dimension.
4. Use this function to provide the run configuration when launching a backfill or materializing the asset.
Here's a conceptual example that outlines the steps you might take:
```python
from dagster import DynamicPartitionsDefinition, MultiPartitionsDefinition, asset

# Define dynamic partitions for each dimension
dynamic_dimension1 = DynamicPartitionsDefinition(name="dynamic_dimension1")
dynamic_dimension2 = DynamicPartitionsDefinition(name="dynamic_dimension2")

# Combine dynamic partitions into a MultiPartitionsDefinition
multi_partitions_def = MultiPartitionsDefinition({
    "dimension1": dynamic_dimension1,
    "dimension2": dynamic_dimension2,
})


# Define your asset with the multi-dimension partitions
@asset(partitions_def=multi_partitions_def)
def my_multi_dimension_asset(context):
    # Access partition keys for each dimension
    partition_keys = context.partition_key.keys_by_dimension
    dimension1_key = partition_keys["dimension1"]
    dimension2_key = partition_keys["dimension2"]
    # Use the partition keys to perform asset computation
    # ...


# Define a function to generate run configuration based on partition keys
def generate_run_config(partition_key):
    # Extract keys for each dimension (keys_by_dimension is a property, not a method)
    keys_by_dimension = partition_key.keys_by_dimension
    dimension1_key = keys_by_dimension["dimension1"]
    dimension2_key = keys_by_dimension["dimension2"]
    # Generate and return the run configuration for these partition keys
    return {
        "ops": {
            "my_multi_dimension_asset": {
                "config": {
                    "dimension1": dimension1_key,
                    "dimension2": dimension2_key,
                }
            }
        }
    }

# Use the generate_run_config function when launching a backfill or materializing the asset
```
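To make step 4 more concrete, here is a minimal sketch in plain Python (no Dagster imports) of how a two-dimensional partition space expands into one run configuration per key pair. The key lists and the `build_run_config` helper are hypothetical stand-ins: in Dagster the keys would come from the two dynamic partitions definitions, and the `"ops"` entry assumes the asset declares matching config fields.

```python
from itertools import product

# Hypothetical keys registered for each dynamic dimension (illustrative only)
dimension1_keys = ["a", "b"]
dimension2_keys = ["x", "y"]


def build_run_config(dimension1_key: str, dimension2_key: str) -> dict:
    """Build the run configuration for one (dimension1, dimension2) key pair."""
    return {
        "ops": {
            "my_multi_dimension_asset": {
                "config": {
                    "dimension1": dimension1_key,
                    "dimension2": dimension2_key,
                }
            }
        }
    }


# A backfill over every partition covers the cross product of both dimensions
run_configs = {
    (d1, d2): build_run_config(d1, d2)
    for d1, d2 in product(dimension1_keys, dimension2_keys)
}

for key_pair, run_config in run_configs.items():
    print(key_pair, run_config["ops"]["my_multi_dimension_asset"]["config"])
```

Each key pair gets its own dictionary, which mirrors what a per-partition run config function is expected to return for a single partition key.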
Please note that this is a conceptual example and may not work directly without adjustments.
Scout
02/15/2024, 4:07 PM
Tara Larrue
02/15/2024, 4:09 PM
Scout
02/15/2024, 4:10 PM
`dagster.config_from_yaml_strings` can be used to construct a run config dictionary from a list of YAML strings. Here's an example of how you might use it:
```python
from dagster import config_from_yaml_strings

yaml_strings = [
    """
ops:
  my_op:
    config:
      key: value
"""
]

run_config = config_from_yaml_strings(yaml_strings)
```
After calling `config_from_yaml_strings`, `run_config` will be a dictionary constructed from the provided YAML strings, which you can then use as the configuration for a Dagster job or asset. Please note that if the YAML document is invalid and has a parse error, a `DagsterInvariantViolationError` will be raised. For more details and the latest updates, please refer to the Dagster documentation.
Tara Larrue
02/15/2024, 4:34 PM
`run_config` from this object?
Scout
02/15/2024, 4:34 PM
`run_config` generated from a YAML string using `config_from_yaml_strings` is a Python dictionary. You can edit this dictionary just like any other Python dictionary. Here's an example of how you might modify the `run_config`:
```python
from dagster import config_from_yaml_strings

yaml_strings = [
    """
ops:
  my_op:
    config:
      key: value
"""
]

# Generate the initial run_config from YAML strings
run_config = config_from_yaml_strings(yaml_strings)

# run_config is now a Python dictionary that can be edited.
# Change the value of 'key' under the 'my_op' config:
run_config["ops"]["my_op"]["config"]["key"] = "new_value"

# Add a new key-value pair
run_config["ops"]["my_op"]["config"]["new_key"] = "new_value"

# Remove a key (pop with a default avoids a KeyError if the key is absent)
run_config["ops"]["my_op"]["config"].pop("key_to_remove", None)

# Add config for another op
run_config["ops"]["another_op"] = {"config": {"another_key": "another_value"}}

# After editing, you can use run_config to execute a job or asset
```
After making the desired changes to the `run_config` dictionary, you can pass it to a job execution function such as `execute_in_process` or use it when launching a run through the Dagster UI or CLI. Remember that the structure of the `run_config` dictionary should match the schema expected by your job or asset. If you provide an invalid configuration, Dagster will raise a `DagsterInvalidConfigError` when you try to execute the job or asset. Always ensure that your edits conform to the expected schema defined by your job's or asset's configuration.
Tara Larrue
02/15/2024, 4:50 PM
```python
class ScrapeConfig(Config):
    abbrev: str
    datatype: str
    node: Optional[str] = None
    meta: Optional[str] = None
    need_type: Optional[str] = "data"
    lookback_hrs: Optional[int] = 6
    backfill: Optional[bool] = False
    start: Optional[str] = None
    end: Optional[str] = None


DAGSTER_SCRAPER_CONFIG_LOC = "./scraper_configs"


@dynamic_partitioned_config
def backfill_scraper_config(partition_key):
    """Generate a ScrapeConfig object for a `input_data_sources` partitioned backfill job."""
    # find partition dimension keys
    keys_by_dimension = partition_key.keys_by_dimension()
    ba_abbrev = keys_by_dimension["ba_abbrev"]
    datatype = keys_by_dimension["datatype"]
    # read config from config file
    config_path = Path(DAGSTER_SCRAPER_CONFIG_LOC) / f"{ba_abbrev}~{datatype}.yaml"
    config_yaml_str = config_path.read_text(encoding="utf-8")
    config = config_from_yaml_strings(config_yaml_str)
    # edit config for backfill job - defaults to 1 year backfill
    now = datetime.now()
    config["ops"]["input_data_sources"]["backfill"] = True
    config["ops"]["input_data_sources"]["start"] = (now - timedelta(days=365)).date()
    config["ops"]["inout_data_sources"]["end"] = now.date()
    return config


ba_abbrev_partitions_def = DynamicPartitionsDefinition(name="ba_abbrev")
datatype_partitions_def = DynamicPartitionsDefinition(name="datatype")
ba_dt_partitions_def = MultiPartitionsDefinition(
    {
        "ba_abbrev": ba_abbrev_partitions_def,
        "datatype": datatype_partitions_def,
    }
)


@asset(partitions_def=ba_dt_partitions_def, output_required=False)
def input_data_sources(context, config: ScrapeConfig) -> pd.DataFrame:
    """Raw scraped input data for all WT products, partitioned by BA + datatype.
    Store as file."""
    # Get specified watttime-scraper module
    scraper_module = find_scraper_module(config)
    # Convert config to watttime DataNeeds object
    data_processor = NEED_MAP[config.need_type](
        abbrev=config.abbrev, datatype=config.datatype, node=config.node, meta=config.meta)
    # Get scraper call_info series from needs object
    call_info = pd.Series(data_processor.unique_scraper_data, dtype=object)
    call_info["func"] = scraper_module.DATATYPE_TO_METHOD[config.datatype]
    # Get job time range, specified by config
    start, end = get_scrape_job_time_range(config)
    # Scrape data
    try:
        <http://context.log.info|context.log.info>(f"Scraping {call_info.values}, {start}, {end}...")
        df = call_info.func(
            start,
            end,
            call_info.get("abbrev", None),
            call_info.get("node", None),
        )
        <http://context.log.info|context.log.info>(f"{len(df)} total records scraped.")
    # Raise a job failure upon any errors
    except Exception as e:
        raise Failure(
            description="Failed scrape job.",
            metadata={
                "scraper params": f"{call_info.values}, start: {start}, end: {end}",
                "error": str(e),
            }
        )
    # If scrape job is successful, post-process + output scraped data
    else:
        df = extract_required_data(df, data_processor.required_data)
        df = data_processor.post_process(df.copy(deep=True))
        if len(df) > 0:
            <http://context.log.info|context.log.info>(f"Filtered down to {len(df)} relevant records - materializing.")
            yield Output(df)
        else:
            <http://context.log.info|context.log.info>("Filtered down to 0 relevant records - NOT materializing.")


input_data_sources_job = define_asset_job(
    name="input_data_sources_job",
    selection=AssetSelection.assets(input_data_sources),
    partitions_def=ba_dt_partitions_def,
    tags={MAX_RUNTIME_SECONDS_TAG: 60*5},
    config=backfill_scraper_config,
)
```
Scout
02/15/2024, 4:50 PM
1. `dynamic_partitioned_config` Usage: The `@dynamic_partitioned_config` decorator is used to generate run configurations for dynamically partitioned assets. However, the function `backfill_scraper_config` is expected to return a dictionary, not a `Config` object. Also, `config_from_yaml_strings` takes a list of YAML strings, not a single string, so pass the YAML string in a list: `config = config_from_yaml_strings([config_yaml_str])`
2. Typo in the Configuration Dictionary: You have `inout_data_sources` instead of `input_data_sources` when setting the `end` date. It should be: `config["ops"]["input_data_sources"]["end"] = now.date()`
3. Missing Import for `config_from_yaml_strings`: Ensure that you have imported the function from Dagster: `from dagster import config_from_yaml_strings`
4. Arrow Syntax in Asset Definition: If the return annotation arrow appears HTML-escaped (`-&gt;`) in your file, that is not valid Python syntax. It should be the standard type hinting arrow (`->`): `def input_data_sources(context, config: ScrapeConfig) -> pd.DataFrame:`
5. Incorrect Use of `<http://context.log.info|context.log.info>`: The logging statements contain link markup instead of a plain call. Replace `<http://context.log.info|context.log.info>` with `context.log.info`: `context.log.info(f"Scraping {call_info.values}, {start}, {end}...")`
6. Use of `Output`: `yield Output(df)` is valid inside an asset. Combined with `output_required=False`, yielding the `Output` only when there are records is how an asset conditionally skips materialization, so keep the `yield` rather than returning the DataFrame directly.
7. `ScrapeConfig`: Ensure that `ScrapeConfig` is properly defined and used. Dagster's `Config` class is itself a Pydantic model, so inheriting from `Config` (imported from `dagster`) is the right base class for asset config.
8. Ensure Compatibility with Dagster Version: Make sure that the features you are using are compatible with Dagster version 1.6.3. The documentation and APIs can change between versions, so always refer to the documentation that matches the version of Dagster you are using.
After addressing these issues, your setup should be closer to working as expected. If you continue to encounter problems, please refer to the Dagster documentation for the version you are using, as it will have the most accurate information for your setup.
Tara Larrue
02/15/2024, 5:01 PM
`generate_run_config` to generate a default run config for a multi-partitioned asset? Can I pass it into the asset job definition? Can I decorate it with `dynamic_partitioned_config`?
```python
from dagster import DynamicPartitionsDefinition, MultiPartitionsDefinition, asset

# Define dynamic partitions for each dimension
dynamic_dimension1 = DynamicPartitionsDefinition(name="dynamic_dimension1")
dynamic_dimension2 = DynamicPartitionsDefinition(name="dynamic_dimension2")

# Combine dynamic partitions into a MultiPartitionsDefinition
multi_partitions_def = MultiPartitionsDefinition({
    "dimension1": dynamic_dimension1,
    "dimension2": dynamic_dimension2,
})


# Define your asset with the multi-dimension partitions
@asset(partitions_def=multi_partitions_def)
def my_multi_dimension_asset(context):
    # Access partition keys for each dimension
    partition_keys = context.partition_key.keys_by_dimension
    dimension1_key = partition_keys["dimension1"]
    dimension2_key = partition_keys["dimension2"]
    # Use the partition keys to perform asset computation
    # ...


# Define a function to generate run configuration based on partition keys
def generate_run_config(partition_key):
    # Extract keys for each dimension (keys_by_dimension is a property, not a method)
    keys_by_dimension = partition_key.keys_by_dimension
    dimension1_key = keys_by_dimension["dimension1"]
    dimension2_key = keys_by_dimension["dimension2"]
    # Generate and return the run configuration for these partition keys
    return {
        "ops": {
            "my_multi_dimension_asset": {
                "config": {
                    "dimension1": dimension1_key,
                    "dimension2": dimension2_key,
                }
            }
        }
    }

# Use the generate_run_config function when launching a backfill or materializing the asset
```
Scout
02/15/2024, 5:01 PM
`generate_run_config` function can be used to create a run configuration for a job that targets a multi-partitioned asset. However, you cannot directly pass this function into the asset definition. Instead, you should use it in conjunction with a job that targets the asset.
To use `generate_run_config` for a multi-partitioned asset, you would typically define a job that includes the asset and then provide the run configuration when launching a run for that job. The `dynamic_partitioned_config` decorator can be used to decorate a function that generates run configurations for dynamically partitioned jobs.
Here's how you can define a job that uses `generate_run_config` to provide default run configurations for each partition:
```python
from dagster import define_asset_job, DynamicPartitionsDefinition, MultiPartitionsDefinition, asset, dynamic_partitioned_config

# ... (previous code defining dynamic_dimension1, dynamic_dimension2, and multi_partitions_def)


@asset(partitions_def=multi_partitions_def)
def my_multi_dimension_asset(context):
    # Asset implementation
    ...


# Define a function to generate run configuration based on partition keys
@dynamic_partitioned_config
def generate_run_config(partition_key):
    # Extract keys for each dimension (keys_by_dimension is a property, not a method)
    keys_by_dimension = partition_key.keys_by_dimension
    dimension1_key = keys_by_dimension["dimension1"]
    dimension2_key = keys_by_dimension["dimension2"]
    # Generate and return the run configuration for these partition keys
    return {
        "ops": {
            "my_multi_dimension_asset": {
                "config": {
                    "dimension1": dimension1_key,
                    "dimension2": dimension2_key,
                }
            }
        }
    }


# Define a job that targets the multi-partitioned asset
my_asset_job = define_asset_job(
    name="my_asset_job",
    selection="*",
    partitions_def=multi_partitions_def,
    config=generate_run_config,
)

# Now, you can launch a run for `my_asset_job` with the appropriate run configuration for each partition
```
When you launch a run for `my_asset_job`, Dagster will invoke the `generate_run_config` function for the selected partition key and use the returned configuration for the run.
Please note that the `selection` parameter in `define_asset_job` should be set to include the assets you want to target. In this example, I've used `"*"` to indicate all assets, but you may want to specify a more targeted selection.
Also, ensure that the structure of the run configuration dictionary matches the schema expected by your job's or asset's configuration. The example above assumes that `my_multi_dimension_asset` is defined as an op within the job, which may not be the case.
Scout
02/15/2024, 5:02 PM
Tara Larrue
02/15/2024, 5:09 PM
dagster._check.ParameterCheckError: Param "config" is not one of ['ConfigMapping', 'Mapping', 'PartitionedConfig', 'RunConfig']. Got <function dynamic_partitioned_config.<locals>.inner at 0x13f120fe0> which is type <class 'function'>.
Scout
02/15/2024, 5:10 PM
`config` parameter you're passing to `define_asset_job` is not one of the expected types (`ConfigMapping`, `Mapping`, `PartitionedConfig`, `RunConfig`). Instead, it's receiving a plain function, which is not valid.
The function named in the error, `dynamic_partitioned_config.<locals>.inner`, is the inner decorator that `dynamic_partitioned_config` returns. `dynamic_partitioned_config` expects a `partition_fn` argument (a function that returns the list of partition keys), so applying it bare as `@dynamic_partitioned_config` treats `generate_run_config` as that argument and leaves the name bound to a function rather than to a `PartitionedConfig`.
Because your partitions are already defined by `multi_partitions_def`, the simplest fix is to drop the decorator and wrap the function in a `PartitionedConfig` yourself, passing the same partitions definition that the job uses.
Here's how you can modify your code to fix the error:
```python
from dagster import (
    DynamicPartitionsDefinition,
    MultiPartitionsDefinition,
    PartitionedConfig,
    asset,
    define_asset_job,
)

# ... (previous code defining dynamic_dimension1, dynamic_dimension2, and multi_partitions_def)


@asset(partitions_def=multi_partitions_def)
def my_multi_dimension_asset(context):
    # Asset implementation
    ...


# Plain function (no decorator) that maps a partition key to run configuration.
# This assumes the key arrives as a MultiPartitionKey.
def generate_run_config(partition_key):
    # keys_by_dimension is a property, not a method
    keys_by_dimension = partition_key.keys_by_dimension
    dimension1_key = keys_by_dimension["dimension1"]
    dimension2_key = keys_by_dimension["dimension2"]
    # Generate and return the run configuration for these partition keys
    return {
        "ops": {
            "my_multi_dimension_asset": {
                "config": {
                    "dimension1": dimension1_key,
                    "dimension2": dimension2_key,
                }
            }
        }
    }


# Define a job that targets the multi-partitioned asset
my_asset_job = define_asset_job(
    name="my_asset_job",
    selection="*",
    partitions_def=multi_partitions_def,
    config=PartitionedConfig(
        partitions_def=multi_partitions_def,
        # Pass the function itself, not the result of calling it
        run_config_for_partition_key_fn=generate_run_config,
    ),
)

# Now, you can launch a run for `my_asset_job` with the appropriate run configuration for each partition
```
Make sure that `generate_run_config` is passed to `PartitionedConfig` as the function itself, not the result of calling it. Dagster will invoke it with the partition key when you launch a run for the job.
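The failure mode behind this `ParameterCheckError` can be reproduced without Dagster. Judging by the `dynamic_partitioned_config.<locals>.inner` name in the error, the decorator was applied without the argument it expects. The sketch below uses a hypothetical decorator factory, `partitioned_config_factory`, and a hypothetical `FakePartitionedConfig` class as stand-ins; neither is Dagster code, they only mimic the shape of a decorator that must be called before it decorates.

```python
from typing import Callable, Sequence


class FakePartitionedConfig:
    """Hypothetical stand-in for the object a job's `config` parameter accepts."""

    def __init__(
        self,
        partition_fn: Callable[[], Sequence[str]],
        run_config_fn: Callable[[str], dict],
    ):
        self.partition_fn = partition_fn
        self.run_config_fn = run_config_fn


def partitioned_config_factory(partition_fn):
    """Decorator factory: must be called with `partition_fn` before decorating."""

    def inner(run_config_fn):
        return FakePartitionedConfig(partition_fn, run_config_fn)

    return inner


# Bare application: the decorated function is swallowed as `partition_fn`,
# and the name ends up bound to `inner`, which is a plain function.
@partitioned_config_factory
def wrong(partition_key):
    return {"ops": {}}


# Correct application: call the factory first, then decorate.
@partitioned_config_factory(lambda: ["a", "b"])
def right(partition_key):
    return {"ops": {"my_op": {"config": {"key": partition_key}}}}


print(type(wrong).__name__)  # function
print(type(right).__name__)  # FakePartitionedConfig
```

Passing `wrong` where a config object is expected would fail a type check in the same way, while `right` is an instance of the expected class.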
If you continue to encount
Scout
02/15/2024, 5:10 PM
Tara Larrue
02/15/2024, 5:31 PM
`backfill_scraper_config` returns a Dict representing a config like:
```python
{'ops': {'input_data_sources': {'config': {'abbrev': 'SPP_KANSAS',
                                           'datatype': 'LMP',
                                           'node': 'WR.FRW.1',
                                           'need_type': 'spp'},
                                'backfill': True,
                                'start': '2023-02-15'}}}

#jobs.py --
def run_config_for_partition(partition: MultiPartitionKey):
    partition_keys = partition.keys_by_dimension
    config_file_name = partition_keys['ba_abbrev'] + "~" + partition_keys['datatype'] + ".yaml"
    config_file_path = Path(DAGSTER_SCRAPER_CONFIG_LOC) / config_file_name
    configs = config_from_files([str(config_file_path)])
    return configs


@dynamic_partitioned_config
def backfill_scraper_config(partition: MultiPartitionKey):
    config = run_config_for_partition(partition)
    # edit config for backfill job - defaults to 1 year backfill
    now = datetime.now()
    config["ops"]["input_data_sources"]["backfill"] = True
    config["ops"]["input_data_sources"]["start"] = (now - timedelta(days=365)).date().isoformat()
    config["ops"]["input_data_sources"]["end"] = now.date().isoformat()
    return config


input_data_sources_job = define_asset_job(
    name="input_data_sources_job",
    selection=AssetSelection.assets(input_data_sources),
    partitions_def=ba_dt_partitions_def,
    tags={MAX_RUNTIME_SECONDS_TAG: 60*5},
    config=backfill_scraper_config,
)
```
Scout
02/15/2024, 5:31 PM
`config` parameter in `define_asset_job` is not being provided with a valid configuration object. The `dynamic_partitioned_config` decorator is meant to create a partitioned config from a function that returns a dictionary representing the run configuration.
From the code snippet you've provided, `backfill_scraper_config` does return such a dictionary. However, the error message indicates that Dagster is not recognizing the decorated function as a valid configuration. One likely cause is how the decorator is applied: `dynamic_partitioned_config` expects a `partition_fn` argument, so the bare `@dynamic_partitioned_config` form leaves `backfill_scraper_config` bound to a plain function instead of a `PartitionedConfig`.
Here are some suggestions to troubleshoot the issue:
1. Ensure that `backfill_scraper_config` is returning a dictionary that matches the expected schema for your job's configuration.
2. Verify how `dynamic_partitioned_config` is documented for your version of Dagster and that you are calling it with the arguments it expects.
3. Check if there are any known issues or bugs related to `dynamic_partitioned_config` in the version of Dagster you are using by consulting the release notes or the Dagster GitHub repository.
4. If the problem persists, consider reaching out to the Dagster community or filing an issue on the Dagster GitHub repository for further assistance.
Alternatively, you could manually create a `PartitionedConfig` object and pass it to `define_asset_job` as the `config` parameter. This approach requires a `partitions_def` and a `run_config_for_partition_key_fn` that generates the run configuration for each partition key.
Please refer to the Dagster documentation for the version you are using to ensure that you are following the correct practices for defining and configuring jobs and assets. If the feature is not documented, it may not be supported in your version of Dagster, and you may need to update your approach accordingly.
Tara Larrue
02/15/2024, 5:39 PM
```python
def run_config_for_partition(partition: MultiPartitionKey):
    partition_keys = partition.keys_by_dimension
    config_file_name = partition_keys['ba_abbrev'] + "~" + partition_keys['datatype'] + ".yaml"
    config_file_path = Path(DAGSTER_SCRAPER_CONFIG_LOC) / config_file_name
    configs = config_from_files([str(config_file_path)])
    return configs


def backfill_scraper_config(partition: MultiPartitionKey):
    config = run_config_for_partition(partition)
    # edit config for backfill job - defaults to 1 year backfill
    now = datetime.now()
    config["ops"]["input_data_sources"]["backfill"] = True
    config["ops"]["input_data_sources"]["start"] = (now - timedelta(days=365)).isoformat()
    config["ops"]["input_data_sources"]["end"] = now.isoformat()
    return config


input_data_sources_job = define_asset_job(
    name="input_data_sources_job",
    selection=AssetSelection.assets(input_data_sources),
    partitions_def=ba_dt_partitions_def,
    tags={MAX_RUNTIME_SECONDS_TAG: 60*5},
    config=PartitionedConfig(partitions_def=ba_dt_partitions_def,
                             run_config_for_partition_key_fn=backfill_scraper_config),
)
```
Scout
02/15/2024, 5:39 PM
`PartitionedConfig` class is used to define a way of configuring a job where the job can be run on one of a discrete set of partitions, and each partition corresponds to a run configuration for the job. The `PartitionedConfig` object takes a `partitions_def` and a function that generates the run configuration for each partition key, which is provided by the `run_config_for_partition_key_fn` parameter.
Based on the code snippet you provided, it looks like you are using the `PartitionedConfig` class correctly by passing the `partitions_def` and the `run_config_for_partition_key_fn` to it. However, there are a couple of things to note:
1. `config_from_files` is a Dagster utility that loads run configuration from one or more YAML files and returns it as a dictionary. Make sure it is imported from `dagster` and that each partition's YAML file exists at the path you build.
2. The `run_config_for_partition_key_fn` should be a function that takes a `partition_key` and returns a run configuration dictionary. In your code, you're using `backfill_scraper_config` as this function, which seems to be correct.
3. Ensure that the `backfill_scraper_config` function modifies the configuration dictionary correctly and that the keys used in the dictionary match the expected structure of your job's configuration schema. In particular, asset config fields such as `backfill`, `start`, and `end` normally belong under `config["ops"]["input_data_sources"]["config"]`, next to `abbrev` and `datatype`, rather than beside the `config` key.
4. The `PartitionedConfig` object is then passed to the `config` parameter of `define_asset_job`, which should be the correct usage.
Here's a slightly modified version of your code snippet that should work with Dagster 1.6.3, assuming that each YAML file contains a valid configuration for the asset:
```python
from datetime import datetime, timedelta
from pathlib import Path

from dagster import (
    AssetSelection,
    MultiPartitionKey,
    PartitionedConfig,
    config_from_files,
    define_asset_job,
)

DAGSTER_SCRAPER_CONFIG_LOC = "./scraper_configs"


def run_config_for_partition(partition: MultiPartitionKey):
    partition_keys = partition.keys_by_dimension
    config_file_name = partition_keys['ba_abbrev'] + "~" + partition_keys['datatype'] + ".yaml"
    config_file_path = Path(DAGSTER_SCRAPER_CONFIG_LOC) / config_file_name
    configs = config_from_files([str(config_file_path)])
    return configs


def backfill_scraper_config(partition: MultiPartitionKey):
    config = run_config_for_partition(partition)
    now = datetime.now()
    # Asset config fields live under the op's "config" key
    asset_config = config["ops"]["input_data_sources"]["config"]
    asset_config["backfill"] = True
    asset_config["start"] = (now - timedelta(days=365)).isoformat()
    asset_config["end"] = now.isoformat()
    return config


input_data_sources_job = define_asset_job(
    name="input_data_sources_job",
    select
```
Scout
02/15/2024, 5:40 PM
`input_data_sources`, `MultiPartitionKey`, `ba_dt_partitions_def`, and `MAX_RUNTIME_SECONDS_TAG` are defined correctly elsewhere in your code. If you encounter any issues, please refer to the Dagster documentation for version 1.6.3 or consider reaching out to the Dagster community for further assistance.
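As a rough illustration of the backfill edits discussed in this thread, the sketch below applies a one-year window to a run config dictionary in plain Python. The helper name `with_backfill_window` and the sample dictionary are hypothetical. It assumes the asset's fields (`backfill`, `start`, `end`) live under the op's `config` key and that dates are stored as ISO strings so the config stays serializable.

```python
from copy import deepcopy
from datetime import date, timedelta


def with_backfill_window(run_config: dict, op_name: str, today: date, days: int = 365) -> dict:
    """Return a copy of run_config with backfill fields set for op_name."""
    updated = deepcopy(run_config)  # leave the loaded config untouched
    op_config = updated["ops"][op_name].setdefault("config", {})
    op_config["backfill"] = True
    op_config["start"] = (today - timedelta(days=days)).isoformat()
    op_config["end"] = today.isoformat()
    return updated


# Hypothetical config, shaped like what a per-partition YAML file might contain
base = {"ops": {"input_data_sources": {"config": {"abbrev": "SPP_KANSAS", "datatype": "LMP"}}}}

result = with_backfill_window(base, "input_data_sources", today=date(2024, 2, 15))
print(result["ops"]["input_data_sources"]["config"])
```

Taking `today` as a parameter instead of calling `datetime.now()` inside the helper keeps the function deterministic, which makes it easier to test than the inline version.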