I got `dagster.core.errors.DagsterInvariantViolati...
# announcements
I got
dagster.core.errors.DagsterInvariantViolationError: Compute function for solid
when I tried
. It works fine without
@solid
def read_db(_context, query, engine):
    df = pd.read_sql(query, con=engine.connect(), chunksize=500000)
    return df

@composite_solid
def savedb():
    connect_db = connectdb.alias("connect_db")
    read_sql_file = read_sqlfile.alias("read_sql_file")
    # context.log.info("test done")
    data = read_db(read_sql_file(), connect_db())
    return data
do you have a fuller stack trace?
Can we also see the full pipeline, or at least what savedb is connected to? read_sql with chunksize actually returns an iterator rather than a DataFrame, which messes things up downstream.
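A quick way to confirm the iterator behaviour without an Oracle connection is a small sketch against an in-memory SQLite database. The table `t` and its columns are made up for illustration; only the `chunksize` argument matters here.

```python
import sqlite3

import pandas as pd

# In-memory SQLite stands in for the Oracle engine used in the thread.
con = sqlite3.connect(":memory:")
pd.DataFrame({"id": range(5), "value": list("abcde")}).to_sql("t", con, index=False)

# Without chunksize: read_sql returns a single DataFrame.
whole = pd.read_sql("SELECT * FROM t", con)
print(type(whole).__name__)  # DataFrame

# With chunksize: read_sql returns an iterator of DataFrames (at most 2 rows each).
reader = pd.read_sql("SELECT * FROM t", con, chunksize=2)
print(isinstance(reader, pd.DataFrame))  # False
chunk_sizes = [len(chunk) for chunk in reader]
print(chunk_sizes)  # [2, 2, 1]
```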
Thanks for the help. Here is the whole pipeline.
import csv
import pandas as pd
from sqlalchemy import create_engine

import os
from load_save_dagster import save_obj as save_pkl
from save_load_chunk_dagster import save_chunk
from save_load_chunk_dagster import load_chunk
from dagster import composite_solid, execute_pipeline, pipeline, solid

@solid
def connectdb(context, crd_path):
    with open(crd_path, "r") as f:
        lines = [row for row in csv.DictReader(f)]
    alist = list(lines[0].values())
    usr = alist[0]
    pwd = alist[1]
    engine = create_engine(
        "oracle://{usr}:{pwd}@mydatabase".format(usr=usr, pwd=pwd),
    )
    return engine

@solid
def read_sqlfile(context, sqlfile):
    sqlfile = os.path.join(*[os.getcwd(), "sql_scripts", sqlfile])
    with open(sqlfile, "r") as f:
        query = f.read()
    context.log.info("sql file path is {path}.".format(path=sqlfile))
    context.log.info("current dir is {path}".format(path=os.getcwd()))
    context.log.info("sql script ....\n {file}".format(file=query))
    return query

@solid
def read_db(_context, query, engine):
    df = pd.read_sql(query, con=engine.connect(), chunksize=500000)
    return df

@composite_solid
def savedb():
    connect_db = connectdb.alias("connect_db")
    read_sql_file = read_sqlfile.alias("read_sql_file")
    # context.log.info("test done")
    data = read_db(read_sql_file(), connect_db())
    return data

@solid
def write_db(context, data):
    save_chunk(data, "hstay_v1")

@pipeline
def main():
    write_db(savedb())

if __name__ == "__main__":
    environment_dict = {
        "solids": {
            "savedb": {
                "solids": {
                    "connect_db": {
                        "inputs": {"crd_path": {"value": "credentials.json"}}
                    },
                    "read_sql_file": {
                        "inputs": {"sqlfile": {"value": "sql_v3.sql"}}
                    },
                }
            }
        }
    }

    result = execute_pipeline(main, environment_dict=environment_dict)
Here is the
. It is just
for all chunks.
import os

import joblib


def save_obj(obj, name, name_dir="data"):
    """Save to pickle.

    obj : any object
        This can be a dictionary or ndarray.
    name : str
        The name for the object to be saved.
    name_dir : str, default 'data'
        Name of the directory.

    No return.
        Save the pickle object to the local file system.
    """
    dir_above = os.getcwd()
    data_dir_path = os.path.join(dir_above, name_dir)

    if not os.path.isdir(data_dir_path):
        os.makedirs(data_dir_path)
    data_path = os.path.join(data_dir_path, name + ".pkl")

    with open(data_path, "wb") as f:
        joblib.dump(obj, f)

def save_chunk(reader, name):
    for i, chunk in enumerate(reader):
        save_obj(chunk, "{name}_{Idx}".format(name=name, Idx=i))
And what’s the full stack trace?
I am not sure this is the full stack trace. Please let me know if it is not.
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - PIPELINE_START - Started execution of pipeline "main".
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - ENGINE_EVENT - Executing steps in process (pid: 4676)
 event_specific_data = {"error": null, "metadata_entries": [["pid", null, ["4676"]], ["step_keys", null, ["['savedb.connect_db.compute', 'savedb.read_sql_file.compute', 'savedb.read_db.compute', 'write_db.compute']"]]]}
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_START - Started execution of step "savedb.connect_db.compute".
               solid = "connect_db"
    solid_definition = "connectdb"
            step_key = "savedb.connect_db.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_INPUT - Got input "crd_path" of type "Any". (Type check passed).
 event_specific_data = {"input_name": "crd_path", "type_check_data": [true, "crd_path", null, []]}
               solid = "connect_db"
    solid_definition = "connectdb"
            step_key = "savedb.connect_db.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
 event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["savedb.connect_db.compute", "result"], "type_check_data": [true, "result", null, []]}
               solid = "connect_db"
    solid_definition = "connectdb"
            step_key = "savedb.connect_db.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_SUCCESS - Finished execution of step "savedb.connect_db.compute" in 38ms.
 event_specific_data = {"duration_ms": 38.864100000000064}
               solid = "connect_db"
    solid_definition = "connectdb"
            step_key = "savedb.connect_db.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_START - Started execution of step "savedb.read_sql_file.compute".
               solid = "read_sql_file"
    solid_definition = "read_sqlfile"
            step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_INPUT - Got input "sqlfile" of type "Any". (Type check passed).
 event_specific_data = {"input_name": "sqlfile", "type_check_data": [true, "sqlfile", null, []]}
               solid = "read_sql_file"
    solid_definition = "read_sqlfile"
            step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - INFO - system - b7263398-7221-47b2-8e93-6f7fc27de3cb - sql file path is D:\Projects\LHLOS\sql_scripts\test.sql.
               solid = "read_sql_file"
    solid_definition = "read_sqlfile"
            step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - INFO - system - b7263398-7221-47b2-8e93-6f7fc27de3cb - current dir is D:\Projects\LHLOS
               solid = "read_sql_file"
    solid_definition = "read_sqlfile"
            step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
 event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["savedb.read_sql_file.compute", "result"], "type_check_data": [true, "result", null, []]}
               solid = "read_sql_file"
    solid_definition = "read_sqlfile"
            step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_SUCCESS - Finished execution of step "savedb.read_sql_file.compute" in 56ms.
 event_specific_data = {"duration_ms": 56.736900000000205}
               solid = "read_sql_file"
    solid_definition = "read_sqlfile"
            step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:54 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_START - Started execution of step "savedb.read_db.compute".
               solid = "read_db"
    solid_definition = "read_db"
            step_key = "savedb.read_db.compute"
2020-02-25 10:51:54 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_INPUT - Got input "query" of type "Any". (Type check passed).
 event_specific_data = {"input_name": "query", "type_check_data": [true, "query", null, []]}
               solid = "read_db"
    solid_definition = "read_db"
            step_key = "savedb.read_db.compute"
2020-02-25 10:51:54 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_INPUT - Got input "engine" of type "Any". (Type check passed).
 event_specific_data = {"input_name": "engine", "type_check_data": [true, "engine", null, []]}
               solid = "read_db"
    solid_definition = "read_db"
            step_key = "savedb.read_db.compute"
2020-02-25 10:51:54 - dagster - ERROR - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_FAILURE - Execution of step "savedb.read_db.compute" failed.
            cls_name = "DagsterInvariantViolationError"
               solid = "read_db"
    solid_definition = "read_db"
            step_key = "savedb.read_db.compute"

  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 274, in dagster_event_sequence_for_step
    for step_event in check.generator(_core_dagster_event_sequence_for_step(step_context)):
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 552, in _core_dagster_event_sequence_for_step
    _step_output_error_checked_user_event_sequence(step_context, user_event_sequence)
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 322, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 680, in _user_event_sequence_for_step_compute_fn
    for event in gen:
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 87, in _execute_core_compute
    for step_output in _yield_compute_results(compute_context, inputs, compute_fn):
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 72, in _yield_compute_results
    ).format(event=repr(event), solid_name=str(step.solid_handle))

2020-02-25 10:51:55 - dagster - INFO - system - b7263398-7221-47b2-8e93-6f7fc27de3cb - Dependencies for step write_db.compute failed: ['savedb.read_db.compute']. Not executing.
               solid = "write_db"
    solid_definition = "write_db"
            step_key = "write_db.compute"
2020-02-25 10:51:55 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_SKIPPED - Skipped execution of step "write_db.compute".
               solid = "write_db"
    solid_definition = "write_db"
            step_key = "write_db.compute"
2020-02-25 10:51:55 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - ENGINE_EVENT - Finished steps in process (pid: 4676) in 1.89s
 event_specific_data = {"error": null, "metadata_entries": [["pid", null, ["4676"]], ["step_keys", null, ["['savedb.connect_db.compute', 'savedb.read_sql_file.compute', 'savedb.read_db.compute', 'write_db.compute']"]]]}
2020-02-25 10:51:55 - dagster - ERROR - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - PIPELINE_FAILURE - Execution of pipeline "main" failed.
The error in Dagit is:
dagster.core.errors.DagsterInvariantViolationError: Compute function for solid savedb.read_db yielded hospital ...... [9 rows x 52 columns] rather than an instance of the Output or Materialization class.
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 274, in dagster_event_sequence_for_step
    for step_event in check.generator(_core_dagster_event_sequence_for_step(step_context)):
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 552, in _core_dagster_event_sequence_for_step
    _step_output_error_checked_user_event_sequence(step_context, user_event_sequence)
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 322, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 680, in _user_event_sequence_for_step_compute_fn
    for event in gen:
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 87, in _execute_core_compute
    for step_output in _yield_compute_results(compute_context, inputs, compute_fn):
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 72, in _yield_compute_results
    ).format(event=repr(event), solid_name=str(step.solid_handle))
in your solid
you have
return df
which as Abhi mentioned above will be an iterator instead of a dataframe when you supply the
i think we could improve that error message
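Why returning the iterator trips the framework can be modelled in a few lines. Judging from the error text above ("yielded hospital ... rather than an instance of the Output or Materialization class"), Dagster appears to iterate a returned generator as a stream of events and to check each item. The sketch below is a simplified, hypothetical model of that dispatch, not Dagster's actual code; `Output` here is a stand-in class and `collect_events`, `make_chunks`, `read_db_iterator` and `read_db_list` are illustrative names.

```python
import inspect

import pandas as pd


class Output:
    """Hypothetical stand-in for a framework output wrapper (not Dagster's class)."""

    def __init__(self, value):
        self.value = value


def collect_events(compute_fn, *args):
    """Simplified model: a returned generator is iterated and every item must
    already be an Output; any other return value is wrapped in one Output."""
    result = compute_fn(*args)
    if inspect.isgenerator(result):
        events = []
        for item in result:
            if not isinstance(item, Output):
                raise TypeError(
                    f"yielded {type(item).__name__} rather than an instance of Output"
                )
            events.append(item)
        return events
    return [Output(result)]


def make_chunks():
    # Generator function: calling it returns a generator, like read_sql with chunksize.
    yield pd.DataFrame({"a": [1, 2]})
    yield pd.DataFrame({"a": [3]})


def read_db_iterator():
    return make_chunks()  # a generator, so it is treated as an event stream


def read_db_list():
    return list(make_chunks())  # a plain list, wrapped as a single output


try:
    collect_events(read_db_iterator)
    message = "no error"
except TypeError as err:
    message = str(err)
print(message)  # yielded DataFrame rather than an instance of Output

events = collect_events(read_db_list)
print(len(events), len(events[0].value))  # 1 2
```

Under this model, `return df` and `yield df` look the same to the framework once `df` is itself a generator, which matches the symptom in the thread.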
Thanks. Now I see the problem. So it works when I write the dataframe in
instead of passing it to another one.
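Writing each chunk in the same step that reads it keeps only one chunk in memory at a time. A minimal sketch of that approach, with the Dagster decorators left out: `read_and_save_chunks` is a hypothetical helper, SQLite stands in for Oracle, and `DataFrame.to_pickle` replaces the thread's `joblib` call to avoid an extra dependency.

```python
import os
import sqlite3
import tempfile

import pandas as pd


def read_and_save_chunks(query, con, name, out_dir, chunksize=500000):
    """Hypothetical helper: read query results in chunks and pickle each chunk as it arrives.

    Only the current chunk is held in memory. The list of written file paths is
    returned, which is a small, finite value a downstream step could consume.
    """
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for i, chunk in enumerate(pd.read_sql(query, con, chunksize=chunksize)):
        path = os.path.join(out_dir, "{name}_{idx}.pkl".format(name=name, idx=i))
        chunk.to_pickle(path)
        paths.append(path)
    return paths


# Usage: a 5-row table read 2 rows at a time produces three pickle files.
con = sqlite3.connect(":memory:")
pd.DataFrame({"id": range(5)}).to_sql("t", con, index=False)
with tempfile.TemporaryDirectory() as tmp:
    paths = read_and_save_chunks("SELECT * FROM t", con, "hstay_v1", tmp, chunksize=2)
    names = [os.path.basename(p) for p in paths]
    last_rows = len(pd.read_pickle(paths[-1]))
print(names)  # ['hstay_v1_0.pkl', 'hstay_v1_1.pkl', 'hstay_v1_2.pkl']
print(last_rows)  # 1
```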
Ya you could return a list of dataframe “chunks” by doing
return list(df)
but you would need to update the other solids accordingly
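A sketch of the `return list(df)` variant as plain functions (Dagster decorators omitted; SQLite again stands in for Oracle). The thread's `save_chunk` already loops with `enumerate`, so it accepts a list as well as an iterator. The trade-off is that every chunk is now held in memory at once, which is what `chunksize` was meant to avoid. `summarize_chunks` is a hypothetical downstream consumer.

```python
import sqlite3

import pandas as pd


def read_db(query, con, chunksize=500000):
    # list() drains the chunk iterator, so the step returns one finite value:
    # a list of DataFrames rather than a generator.
    return list(pd.read_sql(query, con, chunksize=chunksize))


def summarize_chunks(chunks):
    # Hypothetical downstream step. Unlike an iterator, a list can be traversed
    # more than once, so counting chunks and then rows is safe.
    return len(chunks), sum(len(chunk) for chunk in chunks)


con = sqlite3.connect(":memory:")
pd.DataFrame({"id": range(5)}).to_sql("t", con, index=False)
chunks = read_db("SELECT * FROM t", con, chunksize=2)
print(summarize_chunks(chunks))  # (3, 5)
```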
just curious, could you have the solid yield each chunk (DataFrame) and have each one sent downstream individually? or does that go back to the no mapping yet thing
A solid needs to return a finite set of outputs that can be mapped to the inputs of other solids. There may be a map-reduce-esque solution, but that is not supported yet, if that is what you mean by "no mapping yet".
if you don't mind being a guinea pig, would this error message have been more useful?
dagster.core.errors.DagsterInvariantViolationError: Compute function for solid return_iterator yielded value '1' of type <class 'int'> rather than an instance of Output, Materialization, or ExpectationResult. Values yielded by solids must be wrapped in one of these types. If your solid has a single output and yields no other events, you may want to use `return` instead of `yield` in the body of your solid compute function. If you are already using `return`, you may be inadvertently returning a generator rather than the value you expect.
or still hard to understand
@Eric if you knew how many chunks you were yielding, you could certainly have a fixed-width graph that processed multiple outputs and fanned them in later
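The fixed-width idea can be sketched in plain Python: one step splits the data into a known number of parts (the fan-out), each part is processed by its own branch, and a final step concatenates the results (the fan-in). In Dagster each part would presumably become a separate named output wired to its own downstream solid; the functions and `N_PARTS` below are hypothetical and do not use Dagster's API.

```python
import pandas as pd

N_PARTS = 3  # assumption: the number of parts is fixed when the graph is built


def split_fixed(df, n=N_PARTS):
    """Split a DataFrame into exactly n contiguous row slices (trailing ones may be short or empty)."""
    size = -(-len(df) // n)  # ceiling division
    return tuple(df.iloc[i * size:(i + 1) * size] for i in range(n))


def process(part):
    # Per-branch work; here it just doubles a column.
    return part.assign(value=part["value"] * 2)


def fan_in(*parts):
    # Recombine the branch results into one DataFrame.
    return pd.concat(parts, ignore_index=True)


df = pd.DataFrame({"value": range(7)})
parts = split_fixed(df)
result = fan_in(*(process(p) for p in parts))
print([len(p) for p in parts])  # [3, 3, 1]
print(result["value"].tolist())  # [0, 2, 4, 6, 8, 10, 12]
```

Because the number of branches is fixed up front, the graph shape is static; the dynamic case (an unknown number of chunks) is the mapping feature the thread says is not supported yet.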
@Thein i'm also curious whether it was difficult to identify the error message inside the user interface
@max When I saw
in the Dagit web console, I knew it was due to the iterator, but I was also thinking that this was my first time using Dagster, so I might be doing something wrong. 🙂