Timo Klockow
12/13/2022, 8:47 AM
start_date and end_date for this schedule?
Mind, I'm not looking to work with Assets here. I just need this schedule to start executing in the past (with past ticks) as part of a backfill mechanism.
from datetime import datetime, timedelta

from dagster import (
    DefaultScheduleStatus,
    RunRequest,
    ScheduleEvaluationContext,
    job,
    repository,
    schedule,
)

# op_1, op_2 and timedelta_days are not shown in this snippet;
# they are assumed to be defined elsewhere.


@job
def test_job():
    op_1()
    op_2()


@schedule(
    name='test_pipeline',
    job=test_job,
    cron_schedule='0 3 * * *',
    default_status=DefaultScheduleStatus.RUNNING,
)
def scheduled_daily_pipeline(context: ScheduleEvaluationContext) -> RunRequest:
    # Derive the date window from the tick's scheduled time.
    execution_date: datetime = context.scheduled_execution_time
    start: datetime = execution_date - timedelta(days=timedelta_days)
    end: datetime = execution_date - timedelta(hours=1)
    config = {
        'config': {
            'start_dt': start.strftime('%Y-%m-%d'),
            'end_dt': end.strftime('%Y-%m-%d'),
        }
    }
    return RunRequest(
        run_key=None,
        # run_config is keyed by op name (a string), not by the op object.
        run_config={'ops': {op_name: config for op_name in ['op_1', 'op_2']}},
    )


@repository
def repo():
    return [
        scheduled_daily_pipeline
    ]
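To illustrate the "past ticks" idea in the question, here is a minimal plain-Python sketch. It does not use Dagster's scheduler; it only lists the daily 03:00 ticks between a chosen start and now, then derives the same start_dt/end_dt window the schedule body computes. The names past_ticks, window_config and lookback_days are hypothetical, and lookback_days=1 is an assumed stand-in for the undefined timedelta_days.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def past_ticks(first: datetime, now: datetime, hour: int = 3) -> List[datetime]:
    """Return every daily tick at `hour`:00 that falls within [first, now]."""
    tick = first.replace(hour=hour, minute=0, second=0, microsecond=0)
    if tick < first:
        tick += timedelta(days=1)  # the first tick must not precede the start
    ticks = []
    while tick <= now:
        ticks.append(tick)
        tick += timedelta(days=1)
    return ticks


def window_config(execution_date: datetime, lookback_days: int = 1) -> Dict[str, str]:
    """Mirror the schedule body: derive start_dt/end_dt from a single tick."""
    start = execution_date - timedelta(days=lookback_days)  # assumed lookback
    end = execution_date - timedelta(hours=1)
    return {
        "start_dt": start.strftime("%Y-%m-%d"),
        "end_dt": end.strftime("%Y-%m-%d"),
    }


# Example: ticks from 2022-12-10 up to the time of the question.
ticks = past_ticks(datetime(2022, 12, 10), datetime(2022, 12, 13, 8, 47))
configs = [window_config(t) for t in ticks]
```

Each entry in configs corresponds to one run a backfill would need to launch, which is roughly what a partitioned job gives you without hand-rolling the loop.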
Timo Klockow
12/13/2022, 9:56 AM
start_dt and end_dt as arguments.
Daniel Galea
12/13/2022, 10:32 AM
from dagster import (
    DailyPartitionsDefinition,
    build_schedule_from_partitioned_job,
    DefaultScheduleStatus,
    OpExecutionContext,
    op,
)
from my_graphs import my_graph_pipeline

job = my_graph_pipeline.to_job(
    name="job_name",  # placeholder; Dagster names cannot contain spaces
    description="job description",
    partitions_def=DailyPartitionsDefinition(
        start_date="2022-12-12", timezone="Europe/Amsterdam"
    ),
)
job_schedule = build_schedule_from_partitioned_job(
    job, default_status=DefaultScheduleStatus.RUNNING
)
I want a job that is partitioned daily. In your case you also need to set hour_offset=3 inside DailyPartitionsDefinition so that it runs at 3am every day.
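As a rough illustration of what a daily partition with an hour offset means, the sketch below lists the complete 24-hour windows starting at 03:00. It is plain Python, not Dagster's implementation: daily_windows is a hypothetical helper, it uses naive datetimes (the timezone is ignored), and labelling each window by its start date is an assumption to check against the Dagster docs.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def daily_windows(
    start_date: str, until: datetime, hour_offset: int = 0
) -> List[Tuple[str, datetime, datetime]]:
    """List (key, window start, window end) for every complete daily window."""
    start = datetime.strptime(start_date, "%Y-%m-%d") + timedelta(hours=hour_offset)
    windows = []
    # Only windows that have fully elapsed by `until` are included.
    while start + timedelta(days=1) <= until:
        end = start + timedelta(days=1)
        windows.append((start.strftime("%Y-%m-%d"), start, end))
        start = end
    return windows


# Example: windows that have closed by the morning of 2022-12-14.
windows = daily_windows("2022-12-12", datetime(2022, 12, 14, 9, 17), hour_offset=3)
```

With these inputs the first window runs from 2022-12-12 03:00 to 2022-12-13 03:00, so a run for it can only start once 03:00 on the 13th has passed.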
Later, when I want to access the start and end of a partitioned run (or backfill), I can use the OpExecutionContext to access them:
@op
def my_op(context: OpExecutionContext):
    start = context.partition_time_window.start
    end = context.partition_time_window.end
    ...
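If the ops still expect start_dt and end_dt as strings, the window's start and end could be converted roughly as follows. This is a sketch under assumptions: Window is a hypothetical stand-in for any object exposing .start and .end datetimes, ops_config is a made-up helper, and the op names and config keys are taken from the schedule posted earlier in the thread.

```python
from datetime import datetime
from typing import Dict, Iterable, NamedTuple


class Window(NamedTuple):
    """Hypothetical stand-in for an object with .start and .end datetimes."""
    start: datetime
    end: datetime


def ops_config(window: Window, op_names: Iterable[str]) -> Dict[str, dict]:
    """Build an 'ops' config section that gives every named op the same dates."""
    dates = {
        "start_dt": window.start.strftime("%Y-%m-%d"),
        "end_dt": window.end.strftime("%Y-%m-%d"),
    }
    # Copy the dict per op so later mutation of one entry cannot affect another.
    return {"ops": {name: {"config": dict(dates)} for name in op_names}}


run_config = ops_config(
    Window(datetime(2022, 12, 12, 3), datetime(2022, 12, 13, 3)),
    ["op_1", "op_2"],
)
```

Inside a partitioned op the conversion may be unnecessary, since start and end are already available on the context as datetimes.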
Is this what you wanted to know?
Timo Klockow
12/14/2022, 8:27 AM
Daniel Galea
12/14/2022, 9:17 AM