Noah K
11/11/2020, 9:07 AMEric Bellet
11/11/2020, 9:08 AMNoah K
11/11/2020, 9:09 AMEric Bellet
11/11/2020, 9:10 AM
dagster.core.errors.DagsterInvariantViolationError: Attempted to iterate over an InvokedSolidOutputHandle. This object represents the output "result" from the solid "setDatesToProcess". Consider yielding multiple Outputs if you seek to pass different parts of this output to different solids.
from dagster import (
DagsterType,
InputDefinition,
OutputDefinition,
String,
execute_pipeline,
pipeline,
solid,
)
from datetime import datetime, timedelta
@solid
def setDatesToProcess(context, startDt, endDt):
    """Build the inclusive list of days between two dates.

    Args:
        context: Execution context passed to the solid (not used here).
        startDt: First day of the range, as a ``YYYY-MM-DD`` string.
        endDt: Last day of the range, as a ``YYYY-MM-DD`` string.

    Returns:
        A list of ``datetime`` objects, one per day from ``startDt`` through
        ``endDt`` inclusive. The list is empty when ``endDt`` is earlier
        than ``startDt``.

    Raises:
        ValueError: If either string does not match ``%Y-%m-%d``.
    """
    first_day = datetime.strptime(startDt, "%Y-%m-%d")
    last_day = datetime.strptime(endDt, "%Y-%m-%d")
    # +1 so that the end date itself is included in the range.
    day_count = (last_day - first_day).days + 1
    return [first_day + timedelta(days=offset) for offset in range(day_count)]
@solid
def obtainDataFromKafka(context, date):
    """Log which date is being extracted and return placeholder data.

    Args:
        context: Execution context; its ``log.info`` is used for the message.
        date: The date being processed; only interpolated into the log line.

    Returns:
        The fixed list ``["Example", "Data"]``; no external system is
        contacted by this function.
    """
    context.log.info(f"Extracting data from {date}")
    return ["Example", "Data"]
@solid
def saveFileS3(context, data, date):
    """Compose the destination path for ``date`` and log the save.

    Args:
        context: Execution context; its ``log.info`` is used for the message.
        data: The payload to save; only interpolated into the log line.
        date: The date being processed; becomes the last path segment.

    Returns:
        The destination path string. Nothing is uploaded by this function;
        it only formats the path and logs it.
    """
    path = f"s3:mybucket/myfolder/{date}"
    context.log.info(f"Saving {data} in the path {path}")
    return path
@solid
def copys3FileToTableRedshift(context, path):
    """Log that the file at ``path`` is being copied to Redshift.

    Args:
        context: Execution context; its ``log.info`` is used for the message.
        path: Location of the source file; only interpolated into the log line.

    Returns:
        None. This function only logs; it performs no copy itself.
    """
    context.log.info(f"Copying to Redshift using S3 file {path}")
@pipeline
def my_pipeline():
    """Attempt to run the extract -> save -> copy chain once per date.

    NOTE(review): this composition is the reproduction of the
    ``DagsterInvariantViolationError`` reported alongside this snippet
    ("Attempted to iterate over an InvokedSolidOutputHandle"). The logic is
    kept as posted; only the lost indentation has been restored.
    """
    # Inside the pipeline body this is a handle to the solid's "result"
    # output, not the list of dates computed at run time.
    dates = setDatesToProcess()
    # Iterating over that handle is what raises the reported error.
    for date in dates:
        # NOTE(review): saveFileS3 declares two inputs (data, date) but only
        # data is wired here - presumably `date` was meant to be passed too;
        # confirm once the fan-out itself is reworked.
        copys3FileToTableRedshift(saveFileS3(obtainDataFromKafka(date)))
# Run configuration: supplies the two string inputs of the
# "setDatesToProcess" solid, which have no upstream solid to provide them.
run_config = {
    "solids": {
        "setDatesToProcess": {
            "inputs": {
                "startDt": {"value": "2020-10-05"},
                "endDt": {"value": "2020-10-10"},
            },
        },
    },
}

# Execute the pipeline only when this file is run as a script, not on import.
if __name__ == "__main__":
    execute_pipeline(my_pipeline, run_config=run_config)
Noah K
11/11/2020, 9:23 AMEric Bellet
11/11/2020, 9:24 AMNoah K
11/11/2020, 9:24 AMEric Bellet
11/11/2020, 9:24 AMNoah K
11/11/2020, 9:25 AMEric Bellet
11/11/2020, 9:30 AMsashank
11/11/2020, 4:18 PMcat
11/11/2020, 4:40 PMEric Bellet
11/11/2020, 4:41 PM