# ask-ai
j
I have a daily partitioned asset that polls an API every day. The API response is based on a date range, from start_date to end_date, whereas the partition has a static date corresponding to one time step. If the partition key is 2024-02-28 I can convert this into a range by adding 1 day to get an end date. Is there a better or more recommended way that uses the capabilities of dagster to achieve the same thing?
s
In Dagster, you can define partitions for your assets that correspond to time windows, and these partitions can parameterize your assets' computations based on the time window they represent. For your use case, where a daily partitioned asset needs to poll an API using a date range, you can use the `context` provided to your asset to determine the start and end times of the partition's time window. The documentation provides an example of how to define a daily partitioned asset and use the `context` to get the start and end times of the partition:
```python
from dagster import DailyPartitionsDefinition, asset

partitions_def = DailyPartitionsDefinition(start_date="2023-01-21")

@asset(partitions_def=partitions_def)
def my_asset(context):
    # The context provides the start and end times of the partition's time window
    interval_start_datetime, interval_end_datetime = context.asset_partitions_time_window_for_output()
    # Use these times to poll the API
    response = poll_api(start=interval_start_datetime, end=interval_end_datetime)
    # Process the API response
    ...
```
In this example, `context.asset_partitions_time_window_for_output()` is used to retrieve the start and end times of the current partition's time window, which you can then use to poll your API with the correct date range. This approach leverages Dagster's built-in partitioning capabilities to manage the date ranges for you, ensuring that your asset's logic is aligned with the partition definitions you've set up. It's a clean and recommended way to handle time-based computations in Dagster.
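A closely related option is the `context.partition_time_window` property, which exposes the same window as a `TimeWindow` whose `start` and `end` are datetimes. Here is a minimal sketch of using it (hedged: `poll_api` and the exact date formatting are placeholders, not part of Dagster):

```python
from dagster import DailyPartitionsDefinition, asset

partitions_def = DailyPartitionsDefinition(start_date="2023-01-21")

@asset(partitions_def=partitions_def)
def my_asset(context):
    window = context.partition_time_window  # TimeWindow(start=..., end=...)
    # For partition key 2024-02-28, this spans 2024-02-28 00:00 -> 2024-02-29 00:00
    response = poll_api(
        start=window.start.strftime("%Y-%m-%d"),
        end=window.end.strftime("%Y-%m-%d"),
    )
    ...
```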
j
@Scout I am configuring schedules. Some jobs are partitioned, others aren't. I'd like to have a generic schedule-creating function for ease of use. Is there a recommended way of determining whether a job is partitioned or not, and subsequently returning either a `build_schedule_from_partitioned_job` or a normal schedule?
s
The documentation does not provide a direct method for checking if a job is partitioned or not within the API. However, you can infer whether a job is partitioned by examining the job's definition and looking for a `partitions_def` argument. In practice, you would typically know the structure of your jobs and whether they are partitioned based on how they are defined. If you are writing a generic schedule-creating function, you might consider passing an additional parameter to indicate whether the job is partitioned, or you could use a naming convention or other metadata associated with the job to determine its partitioning status. For partitioned jobs, you can use `build_schedule_from_partitioned_job` to create a schedule that matches the partitioning cadence. For non-partitioned jobs, you would use the `@schedule` decorator to define the schedule manually. Here's a conceptual example of how you might write such a function:
```python
from dagster import RunRequest, build_schedule_from_partitioned_job, schedule

def create_schedule_for_job(job, is_partitioned=False):
    if is_partitioned:
        # Create a schedule for a partitioned job
        return build_schedule_from_partitioned_job(job)
    else:
        # Create a schedule for a non-partitioned job
        @schedule(cron_schedule="0 0 * * *", job=job)  # daily at midnight
        def non_partitioned_job_schedule(context):
            return RunRequest(run_key=context.scheduled_execution_time.isoformat())

        return non_partitioned_job_schedule

# Usage
partitioned_job_schedule = create_schedule_for_job(partitioned_job, is_partitioned=True)
non_partitioned_job_schedule = create_schedule_for_job(non_partitioned_job, is_partitioned=False)
```
In this example, `create_schedule_for_job` takes a `job` and an `is_partitioned` flag to determine the type of schedule to create. This is a simplified example, and in a real-world scenario you might need to pass additional parameters to customize the schedule, such as the cron schedule string, timezone, or other configurations. Remember that this approach requires you to know and specify whether a job is partitioned when calling the function. If you need to determine this programmatically, you would have to inspect the job's definition, which involves more logic and is not directly called out in the documentation.
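If you do want to detect this programmatically rather than passing a flag, one option is to check the job for a partitions definition. This is a sketch only; it assumes `JobDefinition` exposes a `partitions_def` attribute that is `None` for non-partitioned jobs, which you should verify against your Dagster version:

```python
from dagster import ScheduleDefinition, build_schedule_from_partitioned_job

def create_schedule_for_job(job, cron_schedule="0 0 * * *"):
    # Assumption: job.partitions_def is None for non-partitioned jobs.
    if getattr(job, "partitions_def", None) is not None:
        # Partitioned job: derive the schedule from the partition cadence
        return build_schedule_from_partitioned_job(job)
    # Non-partitioned job: fall back to a plain cron-based schedule
    return ScheduleDefinition(job=job, cron_schedule=cron_schedule)
```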
j
@Scout when running a partitioned job (on a schedule) is there a way to determine the timestamp of the previous run? Is it possible to have a job that runs every 3 hours, for example, where each run pulls data from the previous 3 hours until the current time, but has a daily partition such that back-fills will retrieve data at a daily granularity?
s
In Dagster, partitioned jobs are typically associated with a partition set that defines the granularity of the data processing. For example, a daily partitioned job would process data in daily chunks. However, if you want to run a job every 3 hours and process data from the previous 3 hours until the current time, you can achieve this by using a custom schedule that does not rely on the partitioning system for the time window calculation. Here's how you can set up a schedule that runs every 3 hours and processes data from the previous 3 hours:
```python
from dagster import schedule, RunRequest
from datetime import timedelta

@schedule(cron_schedule="0 */3 * * *", job=my_job)  # every 3 hours
def every_three_hours_schedule(context):
    current_time = context.scheduled_execution_time
    start_time = current_time - timedelta(hours=3)

    run_config = {
        "ops": {
            "my_op": {
                "config": {
                    "start_time": start_time.isoformat(),
                    "end_time": current_time.isoformat(),
                }
            }
        }
    }

    return RunRequest(run_key=current_time.isoformat(), run_config=run_config)
```
In this example, `my_job` is the job you want to run, and `my_op` is an op within that job that requires a start and end time for pulling data. The `every_three_hours_schedule` schedule calculates the start and end times for each run based on the scheduled execution time and creates a `RunRequest` with the appropriate run configuration. For backfills at a daily granularity, you can create a separate schedule or manually trigger backfills using the Dagster UI or the CLI. When you trigger a backfill, you can specify the start and end times for each day you want to process. If you want to maintain the concept of daily partitions while still running the job every 3 hours, you can use `DailyPartitionsDefinition` to define your partitions and then use a custom schedule to run the job at a different cadence, as shown in the previous example (and sketched more concretely below). This way, you can still refer to daily partitions for backfills and other operations that are based on the partitioning system. Please note that the code examples provided are conceptual and may require adjustments to fit your specific use case, such as the structure of your job and ops, as well as the configuration schema you have defined.
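To make that combination concrete, here is a minimal sketch (asset, job, and schedule names are placeholders) of a daily-partitioned asset job that a 3-hourly schedule re-materializes, while backfills remain at daily granularity:

```python
from datetime import timedelta
from dagster import DailyPartitionsDefinition, RunRequest, asset, define_asset_job, schedule

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partitions)
def my_daily_asset(context):
    ...

my_daily_job = define_asset_job(
    "my_daily_job", selection=[my_daily_asset], partitions_def=daily_partitions
)

@schedule(cron_schedule="0 */3 * * *", job=my_daily_job)
def intraday_refresh(context):
    # With the default end_offset, only fully elapsed days are valid partition
    # keys, so each 3-hourly tick re-materializes the previous day's partition.
    partition_key = (context.scheduled_execution_time - timedelta(days=1)).strftime("%Y-%m-%d")
    return RunRequest(
        run_key=context.scheduled_execution_time.isoformat(),
        partition_key=partition_key,
    )
```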
j
@Scout show me this example for an asset
s
Certainly! In Dagster, you can define an asset that computes data based on a time window, such as the previous 3 hours, and you can schedule the computation to run at a specific cadence, like every 3 hours. However, assets are typically associated with a partition set that defines the granularity of the data processing, and the scheduling of asset computations is usually aligned with these partitions. To achieve the behavior you described—running an asset computation every 3 hours that processes data from the previous 3 hours—you would define a schedule that triggers the asset computation with the appropriate time window. Here's an example of how you might set this up:
```python
from dagster import asset, define_asset_job, schedule, RunRequest
from datetime import timedelta

# Define your asset with the logic to process data based on a time window
@asset(config_schema={"start_time": str, "end_time": str})
def my_timely_asset(context):
    # The op config provides the start and end times for the asset computation
    start_time = context.op_config["start_time"]
    end_time = context.op_config["end_time"]

    # Your logic to pull data from the previous 3 hours until the current time
    data = pull_data(start_time=start_time, end_time=end_time)

    # Process and return the data
    return process_data(data)

# Wrap the asset in a job so a schedule can target it
my_timely_asset_job = define_asset_job("my_timely_asset_job", selection=[my_timely_asset])

# Define a schedule that runs every 3 hours
@schedule(cron_schedule="0 */3 * * *", job=my_timely_asset_job)  # every 3 hours
def every_three_hours_schedule(context):
    current_time = context.scheduled_execution_time
    start_time = current_time - timedelta(hours=3)

    run_config = {
        "ops": {
            "my_timely_asset": {
                "config": {
                    "start_time": start_time.isoformat(),
                    "end_time": current_time.isoformat(),
                }
            }
        }
    }

    return RunRequest(run_key=current_time.isoformat(), run_config=run_config)
```
In this example, `my_timely_asset` is an asset that processes data based on a start and end time, and `define_asset_job` wraps it in a job that a schedule can target. The `every_three_hours_schedule` function calculates the start and end times for the current 3-hour window and creates a `RunRequest` with the appropriate run configuration, triggering the asset computation every 3 hours according to the cron schedule. Please note that `pull_data` and `process_data` are placeholders for your actual data retrieval and processing logic; you will need to replace them with your own implementation. Keep in mind that this example passes the start and end times through the asset's op config. If your asset requires a different configuration structure, you will need to adjust the `run_config` accordingly.
j
@Scout so, to confirm, is there no way to retrieve the previous run time of a job that runs on a schedule?
s
As of my knowledge cutoff date in early 2023, Dagster does not provide a built-in function to directly retrieve the previous run time of a job from within a schedule or asset computation. Schedules in Dagster are stateless, and each schedule tick is independent of the previous ticks. However, you can achieve similar functionality by querying the Dagster instance for the run history of a job. You can use `context.instance.get_run_records` with a `RunsFilter` to retrieve past runs of a job and determine the most recent run time. Here's a conceptual example of how you might do this within a schedule:
```python
from dagster import schedule, RunRequest, RunsFilter
from datetime import datetime, timedelta

@schedule(cron_schedule="0 */3 * * *", job=my_job)  # every 3 hours
def every_three_hours_schedule(context):
    # Get the most recent run record for 'my_job' (records are returned newest first)
    records = context.instance.get_run_records(
        filters=RunsFilter(job_name="my_job"), limit=1
    )

    # Find the most recent run time (start_time is a unix timestamp, or None
    # if the run never started)
    most_recent_run_time = (
        datetime.fromtimestamp(records[0].start_time)
        if records and records[0].start_time
        else None
    )

    # Calculate the start time for the current run based on the most recent run time
    if most_recent_run_time:
        start_time = most_recent_run_time
    else:
        # If there are no past runs, use a default start time
        start_time = datetime.now() - timedelta(hours=3)

    end_time = datetime.now()

    run_config = {
        # Your run config here
    }

    return RunRequest(run_key=end_time.isoformat(), run_config=run_config)
```
In this example, the `every_three_hours_schedule` function queries the Dagster instance for the most recent run record of `my_job` and uses its start time to determine the start time for the current run. Limiting the query to a single record keeps the lookup cheap even when the job has a long run history. This code is conceptual and may require adjustments to fit your specific use case and the structure of your Dagster instance and job definitions. If you need to perform this kind of logic frequently or for multiple schedules, consider implementing a custom utility function or a more efficient querying mechanism that suits your needs.
j
@Scout can a schedule created with build_schedule_from_partitioned_job be scheduled for a specific time, such as 3am? I have set cron_schedule as part of this function but it still defaults to 00:00 AM.
s
When you use `build_schedule_from_partitioned_job` to create a schedule for a partitioned job, the schedule is meant to align with the partitioning of the job or assets. This means that the schedule will execute at the cadence specified by the time partitioning of the job or assets. If you want to run the job at a specific time that differs from the default partition boundaries (e.g., 3 AM instead of midnight), you will need to use the `cron_schedule` parameter to specify the desired cron schedule. However, if you find that the `cron_schedule` parameter is not being respected and the job still defaults to running at 00:00, this might be unexpected behavior. Here's how you would typically set a custom time using `cron_schedule`:
```python
from dagster import build_schedule_from_partitioned_job

# Assuming 'my_partitioned_job' is a job that targets time-window-partitioned assets
my_custom_schedule = build_schedule_from_partitioned_job(
    my_partitioned_job,
    cron_schedule="0 3 * * *",  # At 3 AM every day
)
```
If the `cron_schedule` parameter is not working as expected, there are a few things you can check:

1. Ensure that the `cron_schedule` string is correctly formatted and valid.
2. Verify that there are no other configurations or constraints that might be overriding the `cron_schedule` parameter.
3. Check the documentation and release notes for any known issues or changes related to the `build_schedule_from_partitioned_job` function and the `cron_schedule` parameter.

If the issue persists and you are unable to schedule the job at the desired time using `build_schedule_from_partitioned_job`, you may consider using the more general `@schedule` decorator to create a custom schedule that runs at the specific time you want, independent of the partitioning:
```python
from dagster import schedule, RunRequest

@schedule(cron_schedule="0 3 * * *", job=my_partitioned_job)  # At 3 AM every day
def my_custom_schedule(context):
    # Logic to determine the appropriate partition key for the run
    partition_key = ...  # Determine the partition key based on the current time or other logic

    return RunRequest(
        run_key=context.scheduled_execution_time.isoformat(),
        run_config={},
        tags={"dagster/partition": partition_key},
    )
```
In this example, `my_custom_schedule` is a custom schedule that runs at 3 AM every day, and you would need to implement the logic to determine the appropriate partition key for each run. If you continue to experience issues with scheduling, it may be helpful to reach out to the Dagster community or support channels for further assistance.
j
@Scout I have this code:
```python
shipments_schedule = build_schedule_from_partitioned_job(
    job=shipments_job, cron_schedule="0 3 * * *"
)
```
But the cron is still showing as 00:00 and not 3AM. What would cause this? The job has a daily partition.
s
If you are encountering an issue where the `cron_schedule` parameter in `build_schedule_from_partitioned_job` is not being respected and the schedule still defaults to 00:00 instead of the specified 3 AM, this could be due to a few reasons:

1. Parameter support: For time-window-partitioned (e.g., daily-partitioned) jobs, `build_schedule_from_partitioned_job` exposes parameters such as `hour_of_day` and `minute_of_hour` to offset the execution time, and depending on your Dagster version the `cron_schedule` argument may only be honored for statically partitioned jobs. Trying `hour_of_day=3` is a likely fix.
2. Bug in Dagster: There might be a bug in the version of Dagster you are using that causes the `cron_schedule` parameter to be ignored when using `build_schedule_from_partitioned_job`.
3. Incorrect usage: Ensure that the `cron_schedule` parameter is being used correctly and that the schedule is being updated in the Dagster instance after the change. Sometimes, changes in the code are not reflected until the schedule is stopped and restarted, or until the Dagster instance is reloaded.
4. Caching issues: There could be caching issues where the old schedule configuration is being used instead of the updated one. Try restarting the Dagster daemon or the service that runs the schedules.
5. Documentation mismatch: The documentation might not reflect the actual behavior of the system, or there might be a version mismatch where the feature behaves differently in the version you are using.

Given that the expected behavior is for the schedule to run at 3 AM and this is not happening, the next steps would be:

- Check the version of Dagster you are using and see if there are any known issues related to `build_schedule_from_partitioned_job` and the `cron_schedule` parameter.
- Look for any release notes or documentation updates that might indicate changes in how the `cron_schedule` parameter should be used.
- If possible, update to the latest version of Dagster and see if the issue persists.
- Reach out to the Dagster community or support channels with details of the issue, including the version of Dagster you are using and the exact behavior you are observing.

If the issue is confirmed to be a bug or unintended behavior, it may need to be reported to the Dagster maintainers for further investigation and resolution.
j
@Scout I can bypass this by using the normal @schedule decorator. Is it possible to create a generic schedule that can be applied to multiple jobs, or does each schedule need to explicitly target a job?
s
In Dagster, each schedule is explicitly associated with a single job. The `@schedule` decorator is used to define a schedule for a specific job, and the schedule's logic is tailored to the needs of that job, including any run configuration, tags, or other parameters that need to be passed to the job at execution time. Here's an example of how you would define a schedule for a specific job:
```python
from dagster import schedule

@schedule(cron_schedule="0 3 * * *", job=my_job)  # At 3 AM every day
def my_job_schedule(context):
    # Logic to create the run configuration for 'my_job'
    run_config = {
        # ...
    }
    return run_config
```
If you have multiple jobs that require the same scheduling logic, you would need to define a separate schedule for each job. However, you can abstract the common logic into a function and reuse that function across multiple schedule definitions to avoid duplication. Here's a conceptual example:
```python
from dagster import schedule

def common_schedule_logic():
    # Common logic to generate run configuration
    run_config = {
        # ...
    }
    return run_config

@schedule(cron_schedule="0 3 * * *", job=my_first_job)
def my_first_job_schedule(context):
    return common_schedule_logic()

@schedule(cron_schedule="0 3 * * *", job=my_second_job)
def my_second_job_schedule(context):
    return common_schedule_logic()
```
In this example, `common_schedule_logic` is a function that contains the logic common to both schedules, and each schedule calls it to generate the run configuration. As of my knowledge cutoff date, there is no built-in feature in Dagster that allows a single schedule to trigger multiple jobs; each schedule must be associated with one job. If you need to run multiple jobs with similar scheduling logic, you would define a schedule for each job and reuse any common logic as shown above. If you have a use case where multiple jobs need to be run in a specific sequence or with dependencies, you might consider using sensors or creating a composite job that encapsulates the sequence of jobs you want to run. However, this would be a different approach than using a generic schedule for multiple jobs.
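One common pattern that keeps this reusable is a small factory function that produces one schedule per job. This is a sketch built on the names above; note that each generated schedule needs a unique name, which is derived here from the job name:

```python
from dagster import schedule

def make_three_am_schedule(job):
    # Each schedule needs a unique name, so derive it from the job name.
    @schedule(cron_schedule="0 3 * * *", job=job, name=f"{job.name}_3am_schedule")
    def _three_am_schedule(context):
        return common_schedule_logic()

    return _three_am_schedule

my_first_job_schedule = make_three_am_schedule(my_first_job)
my_second_job_schedule = make_three_am_schedule(my_second_job)
```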
j
@Scout I'm following an example from github. Why does this show a "cannot find partition with key" error?
```python
@schedule(cron_schedule="@daily", job=shipments_job)
def daily_refresh(context: ScheduleEvaluationContext):
    partition_date = context.scheduled_execution_time
    partition_key = partition_date.strftime(logistics_daily_partition.fmt)
    return RunRequest(partition_key=partition_key)
```
s
The error message "cannot find partition with key" typically indicates that the schedule is attempting to run a job for a partition that does not exist in the partition set defined for the job. This can happen if the partition key generated by the schedule does not match any of the partition keys in the partition set. In the code snippet you provided, the schedule `daily_refresh` generates a partition key based on the `scheduled_execution_time` of the context, formatted according to the `fmt` attribute of `logistics_daily_partition`. Here are a few things to check that might cause the error:

1. Partition set definition: Ensure that `logistics_daily_partition` is correctly defined and includes the partition key that the schedule is trying to use. The partition set should cover the date range that includes the `scheduled_execution_time`.
2. Partition key format: The `fmt` attribute of `logistics_daily_partition` must match the date format of the partition keys in the partition set. If there is a mismatch in the format, the generated partition key will not match any existing partition.
3. Timezone issues: If the `scheduled_execution_time` is in a different timezone than the one used for defining the partitions, there might be a mismatch. Ensure that both the schedule and the partition set use the same timezone.
4. Off-by-one error: Depending on how the partitions are defined, there might be an off-by-one error where the partition for a given date is actually keyed by the previous date. This can happen if the partition represents the data for the day that has just completed.
5. Schedule timing: If the schedule runs at a time when the partition for that day has not yet been created (e.g., just after midnight), it may attempt to access a partition that does not exist until the end of the day.

To debug this issue, you can log or print the generated `partition_key` and compare it with the list of partition keys in your partition set to see if there is a match. You can also manually inspect the partition set definition to ensure that it includes the expected partitions. If you continue to face issues, you may want to provide more context or details about how `logistics_daily_partition` is defined and how the partitions are structured, as the problem likely lies in the relationship between the schedule's generated partition key and the existing partition keys.
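As a concrete sketch of that debugging step (it assumes `context.log` is available to schedules in your Dagster version; otherwise a plain `print` works), you can surface the generated key next to the keys Dagster currently knows about:

```python
from dagster import RunRequest, ScheduleEvaluationContext, schedule

@schedule(cron_schedule="@daily", job=shipments_job)
def daily_refresh(context: ScheduleEvaluationContext):
    partition_key = context.scheduled_execution_time.strftime(logistics_daily_partition.fmt)
    # Keys that exist as of the tick's execution time
    known_keys = logistics_daily_partition.get_partition_keys(
        current_time=context.scheduled_execution_time
    )
    context.log.info(
        f"generated={partition_key}, latest known={known_keys[-1] if known_keys else None}"
    )
    return RunRequest(partition_key=partition_key)
```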
j
@Scout here is the partition: logistics_daily_partition = DailyPartitionsDefinition(start_date="2024-02-15") What format does the partition key need to be in?
s
The `DailyPartitionsDefinition` in Dagster creates a set of daily partitions where each partition corresponds to a single day. By default, the partition key for each partition is the date in the format `%Y-%m-%d`, which corresponds to "year-month-day". Given your definition:
```python
logistics_daily_partition = DailyPartitionsDefinition(start_date="2024-02-15")
```
The partition keys generated by this definition will be strings representing dates starting from "2024-02-15" onwards, formatted as "YYYY-MM-DD". For example, the partition key for February 15, 2024, would be "2024-02-15". To match this format in your schedule, you should ensure that the `partition_key` is formatted as a string in the "YYYY-MM-DD" format. Here's how you can modify your schedule to generate the correct partition key:
```python
from dagster import schedule, ScheduleEvaluationContext, RunRequest

@schedule(cron_schedule="@daily", job=shipments_job)
def daily_refresh(context: ScheduleEvaluationContext):
    partition_date = context.scheduled_execution_time
    partition_key = partition_date.strftime("%Y-%m-%d")  # Format as "YYYY-MM-DD"
    return RunRequest(partition_key=partition_key)
```
In this code, `strftime("%Y-%m-%d")` is used to format the `scheduled_execution_time` to match the partition key format expected by `DailyPartitionsDefinition`. This should resolve the "cannot find partition with key" error, assuming that the `scheduled_execution_time` falls within the range of dates covered by your partition set.
j
@Scout I have compared my partition key to the list of partitions in the set. It is still showing as "mismatched key". The full error is:
```
dagster._core.errors.DagsterUnknownPartitionError: Could not find a partition with key `2024-02-20`.
  File "/opt/pysetup/.venv/lib/python3.10/site-packages/dagster/_grpc/impl.py", line 341, in get_external_schedule_execution
    return schedule_def.evaluate_tick(schedule_context)
  File "/opt/pysetup/.venv/lib/python3.10/site-packages/dagster/_core/definitions/schedule_definition.py", line 905, in evaluate_tick
    resolved_request = run_request.with_resolved_tags_and_config(
  File "/opt/pysetup/.venv/lib/python3.10/site-packages/dagster/_core/definitions/run_request.py", line 223, in with_resolved_tags_and_config
    _check_valid_partition_key_after_dynamic_partitions_requests(
  File "/opt/pysetup/.venv/lib/python3.10/site-packages/dagster/_core/definitions/run_request.py", line 326, in _check_valid_partition_key_after_dynamic_partitions_requests
    partitions_def.validate_partition_key(
  File "/opt/pysetup/.venv/lib/python3.10/site-packages/dagster/_core/definitions/partition.py", line 276, in validate_partition_key
    raise DagsterUnknownPartitionError(
```
s
The error message you're encountering, `DagsterUnknownPartitionError: Could not find a partition with key '2024-02-20'`, suggests that the partition key being used to request a run does not exist in the partition set defined for the job. This can happen for several reasons, even if you believe the partition key matches the expected format and should be valid. Here are some steps to troubleshoot this issue:

1. Check the partition set definition: Ensure that the `DailyPartitionsDefinition` for your job actually includes the date '2024-02-20'. If the start date is '2024-02-15', as in your example, and there is no end date or other constraint that would exclude '2024-02-20', this partition should exist.
2. Check the schedule's execution time: Verify that the `scheduled_execution_time` provided to the schedule's context is the date and time you expect. Timezone issues or unexpected execution times could cause the wrong partition key to be generated.
3. Review the job definition: Make sure that the job `shipments_job` is correctly associated with the partition set `logistics_daily_partition`. If the job is not linked to the correct partition set, it will not recognize the partition keys.
4. Check for dynamic partitioning: If your partition set is dynamic or if there's any custom logic that could affect the generation of partition keys, ensure that this logic is functioning as expected and that the partition '2024-02-20' is being generated correctly.
5. Instance and storage configuration: Ensure that your Dagster instance and storage configuration are set up correctly so that the partition information is accessible at runtime. Issues with instance configuration or storage backends can sometimes lead to partitions not being found.
6. Dagster version: Check the version of Dagster you are using. If there have been any updates or bug fixes related to partitioning since your version was released, consider upgrading to the latest version.
7. Manual validation: As a last resort, you can try to manually validate the partition key by using the Dagster GraphQL API, the Dagit UI, or the Dagster Python API to list partitions and check for the existence of the '2024-02-20' partition.

If the issue still persists after these checks, it may be helpful to provide more detailed information about your job and partition set definitions, as well as the exact steps you're taking when encountering this error. This could include the job definition, how the partition set is linked to the job, and any relevant configuration details. With more context, it may be possible to identify the cause of the mismatched key error.
j
@Scout What data format does the partition_key expect in this context? Does it need to have timestamps etc as well?
s
The `partition_key` for a `DailyPartitionsDefinition` in Dagster expects a string representing the date in the default format of `%Y-%m-%d`, which corresponds to "year-month-day". This format aligns with the ISO 8601 date standard, which is commonly used for representing dates. For a `DailyPartitionsDefinition`, the partition key does not need to include a timestamp; it should only include the date portion. Here's an example of a valid partition key for a daily partition: `"2024-02-20"`. The documentation states that the default date format for partitions is `%Y-%m-%d`, and unless a different format is specified in the `fmt` parameter of the partition definition, this is the format that should be used. If you are using the default settings for `DailyPartitionsDefinition`, you do not need to include timestamps or any other information beyond the date; the partition key should be a string that matches the date of the partition you are trying to reference. Here's an example of how you might define a daily partition set and reference a partition key:
```python
from dagster import DailyPartitionsDefinition

# Define the partition set starting from a specific date
logistics_daily_partition = DailyPartitionsDefinition(start_date="2024-02-15")

# Example of a valid partition key for February 20, 2024
valid_partition_key = "2024-02-20"
```
When you create a `RunRequest` for a schedule or a sensor that targets this partition set, you should use a partition key that matches this format:
```python
from dagster import RunRequest

# When creating a RunRequest, use the partition key in the "YYYY-MM-DD" format
run_request = RunRequest(partition_key=valid_partition_key)
```
If you have specified a custom format using the `fmt` parameter, you should ensure that the partition key you generate matches that custom format. However, if you're using the default settings, the `"YYYY-MM-DD"` format is what you should use.
j
@Scout lets say that it's an off by one error. How should this be resolved based on the context.scheduled_execution_time?
s
If you suspect that there is an off-by-one error with the partition key generation, it could be due to the `scheduled_execution_time` not aligning with the partitions actually available in your partition set. This can happen if the schedule runs at the very beginning of a new day (e.g., just after midnight), but the partition for that day has not yet been created, because the partition is meant to represent the data for the day that has just completed. To resolve an off-by-one error, you can adjust the `scheduled_execution_time` to account for the offset. If your partitions represent the data for the previous day, you would subtract one day from the `scheduled_execution_time` to get the correct partition key. Here's how you can adjust the schedule to account for the off-by-one error:
```python
from dagster import schedule, ScheduleEvaluationContext, RunRequest
from datetime import timedelta

@schedule(cron_schedule="@daily", job=shipments_job)
def daily_refresh(context: ScheduleEvaluationContext):
    # Adjust the scheduled_execution_time to account for the off-by-one error
    partition_date = context.scheduled_execution_time - timedelta(days=1)
    partition_key = partition_date.strftime("%Y-%m-%d")
    return RunRequest(partition_key=partition_key)
```
In this example, we subtract one day from `context.scheduled_execution_time` using `timedelta(days=1)` before formatting it to create the `partition_key`. This ensures that the partition key corresponds to the previous day's partition, which should exist in the partition set if the partitions represent completed days. This adjustment should resolve the off-by-one error and allow the schedule to correctly reference the existing partition for the previous day. Make sure to test this change to confirm that it resolves the issue and that the schedule now triggers runs for the correct partitions.
j
@Scout how can I configure a bi-hourly partition? I.e., every two hours
s
To configure a bi-hourly partition in Dagster, you would define a custom partition set that creates partitions for every two-hour window. As of my knowledge cutoff date, Dagster does not have a built-in
BiHourlyPartitionsDefinition
, but you can use
TimeWindowPartitionsDefinition
to create a custom partitioning scheme that aligns with a bi-hourly schedule. Here's an example of how you might define a bi-hourly partition set using
TimeWindowPartitionsDefinition
with a cron schedule that triggers every two hours:
```python
import datetime
from dagster import TimeWindowPartitionsDefinition, asset

# Define a partition set that creates a new partition every two hours
bi_hourly_partitions_def = TimeWindowPartitionsDefinition(
    cron_schedule="0 */2 * * *",  # Every two hours
    start=datetime.datetime(2024, 1, 1),  # Replace with your desired start date
    fmt="%Y-%m-%d-%H-%M",  # The format for the partition keys
)

# Example usage with an asset
@asset(partitions_def=bi_hourly_partitions_def)
def my_bi_hourly_asset():
    # Asset computation logic here
    ...
```
In this example, the `cron_schedule` parameter is set to `"0 */2 * * *"`, which means "at minute 0 past every 2nd hour." The `start` parameter specifies the start date and time for the first partition, and the `fmt` parameter defines the format for the partition keys, which includes the date and hour. When you use this partition set with an asset or job, Dagster will create partitions according to this bi-hourly schedule, with each partition representing a two-hour window starting from the specified start date and time. Please note that the `fmt` parameter should match the granularity of your partitions. Since we're creating bi-hourly partitions, including the hour in the format (`%H`) is important to differentiate between partitions within the same day. Remember to replace the `start` parameter with the actual start date and time that you want for your partitions. The `fmt` parameter can also be adjusted to match your preferred string representation for the partition keys.
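A quick sanity check of the resulting keys (a sketch; with the default `end_offset`, only two-hour windows that have fully elapsed show up as partitions):

```python
import datetime

keys = bi_hourly_partitions_def.get_partition_keys(
    current_time=datetime.datetime(2024, 1, 1, 6, 30)
)
# Expected: ["2024-01-01-00-00", "2024-01-01-02-00", "2024-01-01-04-00"]
print(keys)
```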