Is it possible to get the scheduled_execution_tim...
# ask-community
d
Is it possible to get the scheduled_execution_time without having to define a schedule in this way? I've only seen this type of example:
@schedule(
    job=custom_job
)
def configurable_job_schedule(context: ScheduleEvaluationContext):
    scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return RunRequest(
        tags={"date": scheduled_date}
    )
This requires you to have a pre-defined job created with the @job decorator, which I can't use because I need to pass a custom configuration to my job, and that cannot be done in the decorator. I am defining my jobs through graphs and my schedule via a job in the following way:
from graphs.my_graph import my_pipeline

def my_job(configs: dict):
    logger = get_dagster_logger()
    custom_config = {
        "ops": {
            "launch_emr_cluster": {"config": configs["emr_configs"]},
        }
    }
    job = emr_pipeline.to_job(
        name=configs["job_name"],
        description=configs["job_description"],
        partitions_def=DailyPartitionsDefinition(
            start_date="2022-11-09", timezone="Europe/Amsterdam"
        ),
        tags={
            "dagster-k8s/config": {
                "job_spec_config": {"ttl_seconds_after_finished": 300}
            }
        },
        config=custom_config,
        resource_defs={
            "emr_job_runner": emr_job_runner
        },
    )
    job_schedule = build_schedule_from_partitioned_job(
        job,
        default_status=DefaultScheduleStatus.RUNNING,
    )
    return job, job_schedule
The config parameter is loaded from a JSON file and looks like the snippet below. The reason I do this is that I have a list of dictionaries which I use to dynamically create jobs.
{
    "emr_release_label": "emr-6.8.0",
    "cluster_name": "Dagster Cluster",
    "master_node_instance_type": "m5.xlarge",
    "worker_node_instance_type": "m5.xlarge",
    "worker_node_instance_count": 1,
    "ec2_subnet_id": "",
    "worker_node_spot_bid_price": "",
    "job_name": "JOB",
    "job_description": "JOB DESCRIPTION"
}
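As a rough illustration of that setup, the sketch below loads a list of such dictionaries from JSON and turns each one into job metadata plus an op config. The `split_config` helper is hypothetical, and treating every key other than `job_name` and `job_description` as an EMR setting is an assumption; the function shown earlier reads the settings from `configs["emr_configs"]` instead.

```python
import json
from typing import Tuple

# Keys that describe the job itself; everything else is ASSUMED to be an EMR setting.
JOB_KEYS = ("job_name", "job_description")


def split_config(configs: dict) -> Tuple[dict, dict]:
    """Split one entry into job metadata and a config dict for the op."""
    job_meta = {key: configs[key] for key in JOB_KEYS}
    emr_settings = {k: v for k, v in configs.items() if k not in JOB_KEYS}
    # Same shape as custom_config in the post: keyed by the op name.
    op_config = {"ops": {"launch_emr_cluster": {"config": emr_settings}}}
    return job_meta, op_config


# Stand-in for the contents of the JSON file: a list of dictionaries.
raw = """
[
    {"emr_release_label": "emr-6.8.0", "worker_node_instance_count": 1,
     "job_name": "JOB", "job_description": "JOB DESCRIPTION"}
]
"""

for entry in json.loads(raw):
    job_meta, op_config = split_config(entry)
    print(job_meta["job_name"], op_config["ops"]["launch_emr_cluster"]["config"])
```

Each `(job_meta, op_config)` pair could then be handed to a job factory like `my_job` above.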
I cannot pass this config parameter to a job which is declared in the following way:
@job(
    name="daily_etl",
    description="Daily ETL job",
    partitions_def=DailyPartitionsDefinition(
        start_date="2022-11-09", timezone="Europe/Amsterdam"
    ),
    tags={
        "dagster-k8s/config": {
            "job_spec_config": {"ttl_seconds_after_finished": 300}
        }
    },
    config=custom_config,  # I DON'T WANT TO HARD CODE THIS CONFIG
    resource_defs={
        "emr_job_runner": emr_job_runner
    },
)
def my_job():
    step2(step1())
o
hi @Daniel Galea! might be missing something, but any reason you can't do
@schedule(name=f"{job.name}_schedule", job=job)
def job_schedule(context):
     ...

return job, job_schedule
inside of your my_job function?
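For reference, a minimal sketch of that suggestion, which keeps the nested function as small as possible by moving the date logic to module level. The names `run_tags` and `build_schedule` are hypothetical, and the cron string is an assumed placeholder (`@schedule` requires a `cron_schedule` argument, which the snippets in this thread leave out).

```python
from datetime import datetime


def run_tags(scheduled_time: datetime) -> dict:
    """Build the run tags from the schedule's scheduled execution time."""
    return {"date": scheduled_time.strftime("%Y-%m-%d")}


def build_schedule(job, cron_schedule: str = "0 0 * * *"):
    """Create a schedule for an already-built job (hypothetical helper)."""
    # Imported here so run_tags stays usable and testable without Dagster installed.
    from dagster import RunRequest, schedule

    @schedule(name=f"{job.name}_schedule", job=job, cron_schedule=cron_schedule)
    def job_schedule(context):
        # context is a ScheduleEvaluationContext supplied by Dagster.
        return RunRequest(tags=run_tags(context.scheduled_execution_time))

    return job_schedule
```

Inside `my_job`, this would be called as `job_schedule = build_schedule(job)` before returning `job, job_schedule`. The decorated function is still nested, but it only forwards the scheduled time to a plain function that can be tested on its own.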
d
@owen Yes, that's what I eventually did. I was wondering if there was a way to do it without nesting one function inside another, but in the end it works 🙂