Hamza Khurshid Butt
02/23/2021, 1:17 PM
import csv
from datetime import datetime, time
from dagster import daily_schedule, pipeline, repository, solid
from dagster.utils import file_relative_path
@solid
def hello_cereal(context, date):
    """Log the path of ``cereal.csv`` and how many rows it contains.

    Args:
        context: Execution context passed in by Dagster; only
            ``context.log.info`` is used here.
        date: Value interpolated into the log message. The schedule defined
            in this file supplies it as a ``%Y-%m-%d`` string.
    """
    # The CSV is looked up relative to this module, not the working directory.
    dataset_path = file_relative_path(__file__, "cereal.csv")
    context.log.info(dataset_path)

    # newline="" lets the csv module handle line endings itself, as its
    # documentation recommends for files handed to a reader.
    with open(dataset_path, "r", newline="") as fd:
        cereals = list(csv.DictReader(fd))

    context.log.info(
        "Today is {date}. Found {n_cereals} cereals".format(
            date=date, n_cereals=len(cereals)
        )
    )
# Daily schedule for "hello_cereal_pipeline": first tick date 2021-02-24,
# execution time 06:45.
@daily_schedule(
    pipeline_name="hello_cereal_pipeline",
    start_date=datetime(2021, 2, 24),
    execution_time=time(6, 45),
)
def good_morning_schedule(date):
    # Build the run config that feeds the formatted date into the
    # "date" input of the hello_cereal solid.
    formatted_date = date.strftime("%Y-%m-%d")
    solid_inputs = {"date": {"value": formatted_date}}
    solid_config = {"hello_cereal": {"inputs": solid_inputs}}
    return {"solids": solid_config}
# Pipeline made of a single solid invocation. hello_cereal is called without
# arguments here; its "date" input is supplied through run config (see the
# "inputs" entry produced by good_morning_schedule).
@pipeline
def hello_cereal_pipeline():
    hello_cereal()
# Repository definition: returns the pipeline and its daily schedule together.
@repository
def hello_cereal_repository():
    definitions = [
        hello_cereal_pipeline,
        good_morning_schedule,
    ]
    return definitions
daniel
02/23/2021, 1:26 PM
Hamza Khurshid Butt
02/23/2021, 1:51 PM
daniel
02/23/2021, 2:31 PM
Hamza Khurshid Butt
02/24/2021, 7:09 AM