#ask-community

Timo Klockow

12/13/2022, 8:47 AM
Hey guys, hey @sandy, I haven't fully grasped the concept of scheduling yet. Is it possible (like back in Airflow) to set a `start_date` and `end_date` for this schedule? Mind, I'm not looking to work with Assets here, I just need this schedule to start executing in the past (with past ticks) as part of a backfill logic/mechanism.
from datetime import datetime, timedelta

from dagster import (
    DefaultScheduleStatus,
    RunRequest,
    ScheduleEvaluationContext,
    job,
    repository,
    schedule,
)

@job
def test_job():
    op_1()
    op_2()

@schedule(
    name='test_pipeline',
    job=test_job,
    cron_schedule='0 3 * * *',
    default_status=DefaultScheduleStatus.RUNNING,
)
def scheduled_daily_pipeline(context: ScheduleEvaluationContext) -> RunRequest:
    execution_date: datetime = context.scheduled_execution_time

    # timedelta_days is not defined in this snippet; it must be set elsewhere
    start: datetime = execution_date - timedelta(days=timedelta_days)
    end: datetime = execution_date - timedelta(hours=1)

    config = {
        'config': {
            'start_dt': start.strftime('%Y-%m-%d'),
            'end_dt': end.strftime('%Y-%m-%d'),
        }
    }

    return RunRequest(
        run_key=None,
        # run_config keys must be op names (strings), not the op objects
        run_config={'ops': {op_name: config for op_name in ['op_1', 'op_2']}}
    )

@repository
def repo():
    return [
        scheduled_daily_pipeline
    ]
I am also fine knowing how to solve this with asset materialization. The ops essentially do API calls to Databricks, to start jobs with `start_dt` and `end_dt` as arguments.
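To make the "past ticks" idea concrete, here is a minimal plain-Python sketch (no Dagster involved) of what a backfill over a date range would have to enumerate: one 03:00 tick per day between a start and end date, each mapped to the same `start_dt`/`end_dt` config the schedule above builds. The helper names `past_ticks` and `op_config_for_tick` and the `lookback_days` parameter (standing in for `timedelta_days`) are assumptions made for illustration, and time zones are ignored.

```python
from datetime import datetime, timedelta


def past_ticks(start_date, end_date, hour=3):
    """Yield one tick per day at `hour`:00, from start_date up to (excluding) end_date."""
    tick = start_date.replace(hour=hour, minute=0, second=0, microsecond=0)
    if tick < start_date:
        # The first tick of the day is already behind us; start with the next day.
        tick += timedelta(days=1)
    while tick < end_date:
        yield tick
        tick += timedelta(days=1)


def op_config_for_tick(tick, lookback_days=1):
    """Build the op config for one tick, mirroring the schedule function above."""
    start = tick - timedelta(days=lookback_days)
    end = tick - timedelta(hours=1)
    return {
        "config": {
            "start_dt": start.strftime("%Y-%m-%d"),
            "end_dt": end.strftime("%Y-%m-%d"),
        }
    }


# Three daily ticks: Dec 1, Dec 2 and Dec 3 at 03:00.
configs = [
    op_config_for_tick(tick)
    for tick in past_ticks(datetime(2022, 12, 1), datetime(2022, 12, 4))
]
```

Each entry in `configs` corresponds to one run that the backfill would need to launch.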

Daniel Galea

12/13/2022, 10:32 AM
Hey Timo, maybe I can help! I am creating my (Partitioned) Jobs from Graphs, and then creating my Schedules from Partitioned Jobs. I haven't been able to find a way to set an end date though, but I think you are trying to get the start and end of an execution, right? This is how I do it: Job and Schedule (which I later pass on to the repository):
from dagster import (
    DailyPartitionsDefinition,
    DefaultScheduleStatus,
    OpExecutionContext,
    build_schedule_from_partitioned_job,
    op,
)
from my_graphs import my_graph_pipeline

job = my_graph_pipeline.to_job(
    # Dagster names may only contain letters, digits, and underscores
    name="job_name",
    description="job description",
    partitions_def=DailyPartitionsDefinition(
        start_date="2022-12-12", timezone="Europe/Amsterdam"
    ),
)

job_schedule = build_schedule_from_partitioned_job(
    job, default_status=DefaultScheduleStatus.RUNNING
)
I want a job that is partitioned daily. In your case you also need to set `hour_offset=3` inside `DailyPartitionsDefinition` so that it runs at 3am every day. Later, when I want to access the start and end of a partitioned run (or backfill), I can use the OpExecutionContext to access them:
@op
def my_op(context: OpExecutionContext):
    start = context.partition_time_window.start
    end = context.partition_time_window.end
    ...
Is this what you wanted to know?
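As a rough illustration of what those window bounds look like, here is a plain-Python sketch (no Dagster involved) of a daily window shifted by `hour_offset=3`, formatted into the `start_dt`/`end_dt` strings from the original question. The helper `daily_window` is hypothetical, and the assumption that the partition keyed "2022-12-12" covers 03:00 on that day to 03:00 on the next is my reading of how `hour_offset` behaves. Dagster's own computation also handles time zones, which this sketch ignores.

```python
from datetime import datetime, timedelta


def daily_window(partition_key, hour_offset=0):
    """Return (start, end) for a daily partition key like '2022-12-12'."""
    start = datetime.strptime(partition_key, "%Y-%m-%d") + timedelta(hours=hour_offset)
    return start, start + timedelta(days=1)


start, end = daily_window("2022-12-12", hour_offset=3)

# Arguments in the shape the Databricks jobs in the question expect.
run_args = {
    "start_dt": start.strftime("%Y-%m-%d"),
    "end_dt": end.strftime("%Y-%m-%d"),
}
```

Inside a real op, `start` and `end` would come from `context.partition_time_window` instead of the helper.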
👍 1

Timo Klockow

12/14/2022, 8:27 AM
That did help a lot! Thank you @Daniel Galea
🌈 1

Daniel Galea

12/14/2022, 9:17 AM
You're welcome 🙂