Ben Smith

03/31/2020, 11:19 PM
Hello - I believe I've read all the documentation and searched all of the threads resembling this issue. I'm getting an error I can't fix ... in the toy problem 😕 Any advice on this?
Error 1: Missing required field "solids" at document config root. Available Fields: "['execution', 'loggers', 'resources', 'solids', 'storage']".
Still on serial_pipeline.py, Windows 10, Anaconda, pip installed
@pipeline
def serial_pipeline():
    sort_by_calories(load_cereals())


def test_serial_pipeline():
    res = execute_pipeline(serial_pipeline, environment_dict=ed)
    assert res.success


if __name__ == '__main__':
    result = execute_pipeline(serial_pipeline)
    assert result.success

alex

03/31/2020, 11:34 PM
what sort_by_calories and load_cereals are you using? The error indicates that some piece of information is missing that is required for execution - such as load_cereals needing to have an input defined
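To make the error concrete: it is reported when the environment dict handed to execute_pipeline has no "solids" entry at its root while some solid input still needs a value. The sketch below is a plain-Python illustration only, not dagster's validation code. The names find_missing_inputs and required are hypothetical, introduced here, and 'cereal.csv' is a placeholder path.

```python
def find_missing_inputs(environment_dict, required):
    """Return (solid, input) pairs that have no value in environment_dict.

    `required` maps a solid name to the input names that must be supplied
    through the environment dict (hypothetical structure, for illustration).
    """
    solids = (environment_dict or {}).get('solids', {})
    missing = []
    for solid_name, input_names in required.items():
        provided = solids.get(solid_name, {}).get('inputs', {})
        for input_name in input_names:
            if input_name not in provided:
                missing.append((solid_name, input_name))
    return missing


required = {'load_cereals': ['csv_path']}

# No environment dict at all, as in a bare execute_pipeline(serial_pipeline) call.
print(find_missing_inputs(None, required))

# Environment dict that supplies the input (placeholder path).
ed = {'solids': {'load_cereals': {'inputs': {'csv_path': {'value': 'cereal.csv'}}}}}
print(find_missing_inputs(ed, required))
```

In the snippet posted in this thread, the `__main__` block calls execute_pipeline(serial_pipeline) without environment_dict, which may be one place the error comes from when the file is run directly rather than through the test.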

Ben Smith

04/01/2020, 12:49 AM
ed = {
    'solids': {
        'load_cereals': {
            'inputs': {
                'csv_path': {'value': 'C:/Users/bsmith/OneDrive - Catalina Marketing/Python/catmktg/scratch/cereal.csv'}
            }
        }
    }
}
@solid(
    input_defs=[InputDefinition('csv_path', str)]
)
def load_cereals(context):
    dataset_path = context.solid_config['csv_path']
    context.log.info("Reading from {csv_path}".format(csv_path=dataset_path))
    with open(dataset_path, 'r') as fd:
        cereals = [row for row in csv.DictReader(fd)]
    context.log.info(
        'Found {n_cereals} cereals'.format(n_cereals=len(cereals))
    )
    return cereals


@solid
def sort_by_calories(context, cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal['calories'])
    )
    context.log.info(
        'Least caloric cereal: {least_caloric}'.format(
            least_caloric=sorted_cereals[0]['name']
        )
    )
    context.log.info(
        'Most caloric cereal: {most_caloric}'.format(
            most_caloric=sorted_cereals[-1]['name']
        )
    )
That might have sorted it - adding "InputDefinition" seems to have changed the error. Thanks much, Alex. Really getting confused about the difference between "Inputs" and "Config", when I should put my inputs in config and when I should config my inputs. Appreciate you helping me wrap my noodle around it.
No, it was removing the 'csv_path' argument to load_cereals that changed the error. Full code, still getting 'missing required field "solids" at document config root':
import csv

from dagster import execute_pipeline, pipeline, solid, Field, InputDefinition

ed = {
    'solids': {
        'load_cereals': {
            'inputs': {
                'csv_path': {'value': 'C:/Users/bsmith/OneDrive - Catalina Marketing/Python/catmktg/scratch/cereal.csv'}
            }
        }
    }
}


@solid(
    input_defs=[InputDefinition(name='csv_path')]
)
def load_cereals(context, csv_path):
    dataset_path = context.solid_config['csv_path']
    context.log.info("Reading from {csv_path}".format(csv_path=dataset_path))
    with open(dataset_path, 'r') as fd:
        cereals = [row for row in csv.DictReader(fd)]
    context.log.info(
        'Found {n_cereals} cereals'.format(n_cereals=len(cereals))
    )
    return cereals


@solid
def sort_by_calories(context, cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal['calories'])
    )
    context.log.info(
        'Least caloric cereal: {least_caloric}'.format(
            least_caloric=sorted_cereals[0]['name']
        )
    )
    context.log.info(
        'Most caloric cereal: {most_caloric}'.format(
            most_caloric=sorted_cereals[-1]['name']
        )
    )


@pipeline
def serial_pipeline():
    sort_by_calories(load_cereals())


def test_serial_pipeline():
    res = execute_pipeline(serial_pipeline, environment_dict=ed)
    assert res.success


if __name__ == '__main__':
    result = execute_pipeline(serial_pipeline)
    assert result.success
Following up for posterity - I was (and continue to) mix up the concepts of "input" and "config". In the code above I declared "inputs" in the environment dictionary but read from config in the body of the function (the "dataset_path" variable in the "load_cereals" solid was set to "...solid_config['csv_path']"). Once I aligned everything to use inputs - in the environment_dict "ed", in the @solid(...) decorator, and in the body of the function - the pipeline ran successfully.
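The distinction described above shows up in the shape of the environment dict. A minimal sketch, assuming the legacy environment_dict layout used in this thread: values for a solid's inputs go under 'inputs', each wrapped as {'value': ...}, and arrive as function arguments, while config goes under 'config' and is read from context.solid_config. The helper names inputs_environment and config_environment are hypothetical, and 'cereal.csv' is a placeholder path.

```python
def inputs_environment(solid_name, **inputs):
    """Environment dict that supplies values for a solid's *inputs*."""
    return {
        'solids': {
            solid_name: {
                'inputs': {name: {'value': value} for name, value in inputs.items()}
            }
        }
    }


def config_environment(solid_name, **config):
    """Environment dict that supplies a solid's *config*."""
    return {'solids': {solid_name: {'config': dict(config)}}}


# Pairs with: def load_cereals(context, csv_path) and
# input_defs=[InputDefinition(name='csv_path')]; the body uses csv_path directly.
as_inputs = inputs_environment('load_cereals', csv_path='cereal.csv')

# Pairs with a solid that declares a config schema; the body reads
# context.solid_config['csv_path'] and takes no csv_path argument.
as_config = config_environment('load_cereals', csv_path='cereal.csv')

print(as_inputs)
print(as_config)
```

Mixing the two, as in the code posted earlier (an 'inputs' entry in the dict, solid_config in the body), leaves one side without a value.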

alex

04/01/2020, 3:37 PM
I was (and continue to) mix up the concepts of “input” and “config”
ya, the difference is subtle if not arbitrary at times, so this is very valid criticism. Thanks for posting a follow-up