#ask-ai

Fernando Cagua

02/20/2024, 2:54 AM
I have an asset with daily partitions, how can I create a job and a schedule so that the last 10 partitions are materialised regardless of whether they have been successfully materialised in the past?

Scout

02/20/2024, 2:54 AM
To create a job that targets an asset with daily partitions and a schedule that materializes the last 10 partitions regardless of their past materialization status, you can follow these steps:
1. Define your asset with a `DailyPartitionsDefinition` to specify daily partitioning.
2. Define an asset job using `define_asset_job` that targets your asset.
3. Write a custom schedule function that generates run requests for the last 10 partitions.
Here's an example based on the documentation:
```python
from datetime import datetime, timedelta

from dagster import (
    DailyPartitionsDefinition,
    RunRequest,
    ScheduleDefinition,
    asset,
    define_asset_job,
)

# Define your daily-partitioned asset
partitions_def = DailyPartitionsDefinition(start_date="2020-01-01")

@asset(partitions_def=partitions_def)
def my_daily_asset():
    ...

# Define an asset job that targets your asset
my_asset_job = define_asset_job("my_asset_job", selection=[my_daily_asset])

# Define a custom schedule that materializes the last 10 partitions
def my_custom_schedule(context):
    run_requests = []
    today = datetime.today()
    for i in range(1, 11):  # The 10 days before today (today itself is excluded)
        partition_date = today - timedelta(days=i)
        # Daily partition keys use the "%Y-%m-%d" format by default
        partition_key = partition_date.strftime("%Y-%m-%d")
        run_requests.append(RunRequest(run_key=None, run_config={}, tags={}, partition_key=partition_key))
    return run_requests

# Create the schedule definition
my_schedule = ScheduleDefinition(
    job=my_asset_job,
    cron_schedule="0 0 * * *",  # This cron schedule runs daily at midnight
    execution_fn=my_custom_schedule
)
```
In this example, `my_custom_schedule` is a custom schedule function that calculates the dates for the last 10 partitions and creates a `RunRequest` for each one. The `cron_schedule` is set to run daily at midnight, but you can adjust this to your needs. Please note that the code provided is a conceptual example based on the documentation. You may need to adjust it to fit your specific use case, such as setting the correct `start_date` for your partitions and ensuring that the `my_daily_asset` function reflects the logic of your asset. Remember to test your code in a development environment to ensure it behaves as expected before deploying it to production.

Fernando Cagua

02/20/2024, 3:26 AM
@Scout I’m getting the following error. How do you suggest I fix it:

Scout

02/20/2024, 3:26 AM
It seems like the error message you're encountering was not included in your message. Could you please provide the error message or describe the issue you're facing in more detail? This will help me to give you more accurate advice on how to fix the problem.

Fernando Cagua

02/20/2024, 3:27 AM
@Scout I’m getting the following error. How do you suggest I fix it:
```
dagster._check.CheckError: Invariant failed. Description: Schedules that return multiple RunRequests must specify a run_key in each RunRequest
```

Scout

02/20/2024, 3:27 AM
The error message you're encountering indicates that when your schedule function returns multiple `RunRequest` objects, each `RunRequest` must have a unique `run_key` specified. The `run_key` is used by Dagster to deduplicate runs and ensure that the same run isn't executed more than once. To fix this error, you need to modify your schedule function to include a `run_key` for each `RunRequest`. The `run_key` should be a string that uniquely identifies each scheduled run. Often, the partition key or a combination of the partition key and a timestamp is used as the `run_key`. Here's an example of how you might modify your schedule function to include `run_key`s:
```python
from dagster import ScheduleDefinition, RunRequest
from datetime import datetime, timedelta

def my_custom_schedule(context):
    run_requests = []
    today = datetime.today()
    for i in range(1, 11):  # The 10 days before today (today itself is excluded)
        partition_date = today - timedelta(days=i)
        partition_key = partition_date.strftime("%Y-%m-%d")
        run_key = f"run_for_{partition_key}"  # Unique run_key for each RunRequest
        run_requests.append(RunRequest(run_key=run_key, run_config={}, tags={}, partition_key=partition_key))
    return run_requests

# Create the schedule definition
my_schedule = ScheduleDefinition(
    job=my_asset_job,
    cron_schedule="0 0 * * *",  # This cron schedule runs daily at midnight
    execution_fn=my_custom_schedule
)
```
In this example, `run_key` is constructed using the `partition_key`, which is based on the date. This ensures that each `RunRequest` has a unique identifier. Make sure to replace `my_asset_job` with the actual job that you have defined. After making this change, the error should be resolved, and your schedule should be able to generate multiple `RunRequest` objects without encountering the `CheckError`.
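One caveat with a key built only from the partition key: since `run_key` is used for deduplication, the same partition would most likely be skipped on later ticks, which conflicts with the goal of re-materialising the last 10 partitions every day. A possible workaround, sketched below, is to combine the partition key with the tick time, as the message above hints. The helper `build_run_key` is hypothetical; inside a real schedule function the tick time would typically come from `context.scheduled_execution_time`.

```python
from datetime import datetime


def build_run_key(partition_key: str, tick_time: datetime) -> str:
    """Combine the partition key with the tick time.

    The result is unique within a tick (one key per partition) and
    different across ticks (the timestamp changes every day).
    """
    return f"{partition_key}:{tick_time.isoformat()}"


# Hypothetical tick times, chosen only for illustration.
monday = datetime(2024, 2, 19)
tuesday = datetime(2024, 2, 20)

print(build_run_key("2024-02-18", monday))   # prints: 2024-02-18:2024-02-19T00:00:00
print(build_run_key("2024-02-18", tuesday))  # prints: 2024-02-18:2024-02-20T00:00:00
```

Using the scheduled tick time instead of the wall-clock time keeps the key stable if the same tick is evaluated more than once.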

Fernando Cagua

02/20/2024, 3:34 AM
@Scout Now I’m getting `dagster._core.errors.DagsterUnknownPartitionError: Could not find a partition with key 2024-02-21.` How can I solve that?

Scout

02/20/2024, 3:34 AM
The `DagsterUnknownPartitionError` indicating that a partition with the specified key could not be found typically occurs when the partition key provided in the `RunRequest` does not match any of the partition keys defined by the `PartitionsDefinition` of the asset. This can happen if the partition key you're trying to materialize does not exist within the range of partitions defined by your `DailyPartitionsDefinition`. For example, your `DailyPartitionsDefinition` may start at a later date than the partition key you're trying to materialize, or the partition key may be for a future date that hasn't been defined yet.
To resolve this issue, you need to ensure that the partition keys you're generating in your schedule function are valid according to the `DailyPartitionsDefinition` of your asset. Here are some steps you can take:
1. Verify the `start_date` of your `DailyPartitionsDefinition` to ensure it covers the dates you're trying to materialize.
2. Ensure that the partition keys you're generating correspond to actual dates that should have partitions according to your definition.
3. If you're using a custom partition key format, ensure that it matches the format expected by your `DailyPartitionsDefinition`.
If you're confident that the partition keys should exist according to your partitions definition, then you might need to debug further to understand why Dagster is not recognizing them. Here are some debugging tips:
- Check the `start_date` and any other configuration of your `DailyPartitionsDefinition` to ensure it's correct.
- Review the code that generates the partition keys to ensure it's generating the expected values.
- Use the Dagster API or UI to list the available partitions for your asset to confirm that they match what you expect.
If you've verified that the partition keys are correct and you're still encountering this error, it could be a bug or an issue with how Dagster is loading or recognizing your partitions. In that case, you may want to reach out to the Dagster community or support channels with specific details about your setup for further assistance.