Ben Smith
03/31/2020, 11:19 PM
Error 1: Missing required field "solids" at document config root. Available Fields: "['execution', 'loggers', 'resources', 'solids', 'storage']".
Still on serial_pipeline.py, Windows 10, Anaconda, pip installed
@pipeline
def serial_pipeline():
    sort_by_calories(load_cereals())

def test_serial_pipeline():
    res = execute_pipeline(serial_pipeline, environment_dict=ed)
    assert res.success

if __name__ == '__main__':
    result = execute_pipeline(serial_pipeline)
    assert result.success
alex
03/31/2020, 11:34 PM
Which sort_by_calories and load_cereals are you using?
The error indicates that some piece of information is missing that is required for execution - such as load_cereals needing to have an input defined
Ben Smith
04/01/2020, 12:49 AM
ed = {
    'solids': {
        'load_cereals': {
            'inputs': {
                'csv_path': {'value': 'C:/Users/bsmith/OneDrive - Catalina Marketing/Python/catmktg/scratch/cereal.csv'}
            }
        }
    }
}

@solid(
    input_defs=[InputDefinition(name='csv_path',str)]
)
def load_cereals(context):
    dataset_path = context.solid_config['csv_path']
    context.log.info("Reading from {csv_path}".format(csv_path=dataset_path))
    with open(dataset_path, 'r') as fd:
        cereals = [row for row in csv.DictReader(fd)]
    context.log.info(
        'Found {n_cereals} cereals'.format(n_cereals=len(cereals))
    )
    return cereals

@solid
def sort_by_calories(context, cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal['calories'])
    )
    context.log.info(
        'Least caloric cereal: {least_caloric}'.format(
            least_caloric=sorted_cereals[0]['name']
        )
    )
    context.log.info(
        'Most caloric cereal: {most_caloric}'.format(
            most_caloric=sorted_cereals[-1]['name']
        )
    )

import csv
from dagster import execute_pipeline, pipeline, solid, Field, InputDefinition

ed = {
    'solids': {
        'load_cereals': {
            'inputs': {
                'csv_path': {'value': 'C:/Users/bsmith/OneDrive - Catalina Marketing/Python/catmktg/scratch/cereal.csv'}
            }
        }
    }
}

@solid(
    input_defs=[InputDefinition(name='csv_path')]
)
def load_cereals(context, csv_path):
    dataset_path = context.solid_config['csv_path']
    context.log.info("Reading from {csv_path}".format(csv_path=dataset_path))
    with open(dataset_path, 'r') as fd:
        cereals = [row for row in csv.DictReader(fd)]
    context.log.info(
        'Found {n_cereals} cereals'.format(n_cereals=len(cereals))
    )
    return cereals

@solid
def sort_by_calories(context, cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal['calories'])
    )
    context.log.info(
        'Least caloric cereal: {least_caloric}'.format(
            least_caloric=sorted_cereals[0]['name']
        )
    )
    context.log.info(
        'Most caloric cereal: {most_caloric}'.format(
            most_caloric=sorted_cereals[-1]['name']
        )
    )

@pipeline
def serial_pipeline():
    sort_by_calories(load_cereals())

def test_serial_pipeline():
    res = execute_pipeline(serial_pipeline, environment_dict=ed)
    assert res.success

if __name__ == '__main__':
    result = execute_pipeline(serial_pipeline)
    assert result.success
alex
04/01/2020, 3:37 PM
> I was (and continue to) mix up the concepts of “input” and “config”
ya the difference is subtle if not arbitrary at times so this is very valid criticism. Thanks for posting a follow up
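
For anyone reading this thread later, here is a rough sketch of the two environment dict shapes that seem to be getting mixed up above. It assumes the solid-era dagster API used in this thread and deliberately does not import dagster, so it only illustrates the dict layout. `CSV_PATH` is a placeholder path and `where_is_value` is a hypothetical helper written for this example, not a dagster function.

```python
# Placeholder path for illustration; the path in the thread is machine-specific.
CSV_PATH = "cereal.csv"

# Shape 1: csv_path supplied as a solid *input*.
# This pairs with a solid that declares an input named csv_path and
# receives it as a function argument: def load_cereals(context, csv_path)
input_style = {
    "solids": {
        "load_cereals": {
            "inputs": {"csv_path": {"value": CSV_PATH}},
        }
    }
}

# Shape 2: csv_path supplied as solid *config*.
# This pairs with a solid that declares a config field named csv_path and
# reads it from context.solid_config["csv_path"].
config_style = {
    "solids": {
        "load_cereals": {
            "config": {"csv_path": CSV_PATH},
        }
    }
}


def where_is_value(environment_dict, solid_name, key):
    """Hypothetical helper (not part of dagster).

    Returns the list of sections ("inputs", "config") under the given
    solid's entry that contain `key`.
    """
    entry = environment_dict.get("solids", {}).get(solid_name, {})
    return [
        section
        for section in ("inputs", "config")
        if key in entry.get(section, {})
    ]


if __name__ == "__main__":
    print(where_is_value(input_style, "load_cereals", "csv_path"))
    print(where_is_value(config_style, "load_cereals", "csv_path"))
```

The `ed` posted above uses the first shape, while the body of `load_cereals` reads `context.solid_config['csv_path']`, which corresponds to the second. Separately, only `test_serial_pipeline` passes `environment_dict=ed`; the `__main__` call does not, which may be what produces the 'Missing required field "solids"' error when the file is run as a script.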