Julius
03/20/2023, 6:43 PM{{ds}}
will place the date that DAG is executed. For example
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum
incremental_load_date = "{{ ds }}"
dbt_run_command = f"dbt run --select lake_OPTOMATE_2000_060_PATIENT --vars 'incremental_load_date: {incremental_load_date}'"
with DAG(
dag_id='load_lake_2000_060_bash_operator',
# default_args=default_args,
schedule_interval='@daily',
start_date=pendulum.datetime(2022, 11, 29, tz="Australia/Sydney"),
end_date=pendulum.datetime(2022, 12, 10, tz="Australia/Sydney"),
max_active_runs=1
) as dag:
execute = BashOperator(
task_id='load_lake_2000_060',
bash_command=f'cd /usr/local/airflow/include/dbtvault && {dbt_run_command}',
)
The incremental_load_date
will replace by the date that DAG is executed. And when DAG is running from start_date
to end_date
, the vars incremental_load_date
will replace by that date. Does Dagster have function like start_date
and end_date
, and can that date replace the vars like {{ds}}
.jamie
03/20/2023, 7:20 PMJoe
03/20/2023, 7:24 PM@schedule
pass in configuration to the Job run that sets things like execution date
something like
@op(config_schema={"scheduled_date": str})
def configurable_op(context):
<http://context.log.info|context.log.info>(context.op_config["scheduled_date"])
@job
def configurable_job():
configurable_op()
@schedule(job=configurable_job, cron_schedule="0 0 * * *")
def configurable_job_schedule(context: ScheduleEvaluationContext):
scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
return RunRequest(
run_key=None,
run_config={
"ops": {"configurable_op": {"config": {"scheduled_date": scheduled_date}}}
},
tags={"date": scheduled_date},
)
https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules#schedules-that-provide-custom-run-config-and-tagsJoe
03/20/2023, 7:25 PM