#announcements

Thein

02/25/2020, 3:54 AM
I got `dagster.core.errors.DagsterInvariantViolationError: Compute function for solid` when I tried `pd.read_sql` with `chunksize`. It works fine without `chunksize`.
```python
@solid
def read_db(_context, query, engine):
    df = pd.read_sql(query, con=engine.connect(), chunksize=500000)
    return df


@composite_solid
def savedb():
    connect_db = connectdb.alias("connect_db")
    read_sql_file = read_sqlfile.alias("read_sql_file")
    # context.log.info("test done")
    data = read_db(read_sql_file(), connect_db())
    return data
```

max

02/25/2020, 4:46 AM
do you have a fuller stack trace?

abhi

02/25/2020, 8:04 AM
Can we also see the full pipeline, or at least what `savedb` is connected to? `read_sql` with `chunksize` actually returns an iterator rather than a DataFrame, which messes things up downstream.
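A minimal sketch of the behaviour Abhi describes, assuming pandas is installed and using a hypothetical in-memory SQLite table in place of the Oracle engine from the thread: without `chunksize`, `pd.read_sql` returns a single `DataFrame`; with it, the call returns an iterator that yields one `DataFrame` per chunk.

```python
import sqlite3

import pandas as pd

# Hypothetical in-memory table standing in for the Oracle database in the thread.
con = sqlite3.connect(":memory:")
pd.DataFrame({"id": range(5), "value": list("abcde")}).to_sql("t", con, index=False)

whole = pd.read_sql("SELECT * FROM t", con)                # no chunksize: one DataFrame
reader = pd.read_sql("SELECT * FROM t", con, chunksize=2)  # chunksize: an iterator

print(type(whole).__name__)              # DataFrame
print(isinstance(reader, pd.DataFrame))  # False

# Iterating yields DataFrames of at most `chunksize` rows; it also exhausts `reader`.
sizes = [len(chunk) for chunk in reader]
print(sizes)  # [2, 2, 1]
```

Because the iterator can only be consumed once, whichever step iterates it first gets all the data; anything that expects DataFrame methods on the returned object will fail.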

Thein

02/25/2020, 2:00 PM
Thanks for the help. Here is the whole pipeline.
```python
import csv
import pandas as pd
from sqlalchemy import create_engine

import os
from load_save_dagster import save_obj as save_pkl
from save_load_chunk_dagster import save_chunk
from save_load_chunk_dagster import load_chunk
from dagster import (
    composite_solid,
    execute_pipeline,
    execute_solid,
    pipeline,
    solid,
)


@solid
def connectdb(context, crd_path):
    with open(crd_path, "r") as f:
        lines = [row for row in csv.DictReader(f)]
    alist = list(lines[0].values())
    usr = alist[0]
    pwd = alist[1]
    engine = create_engine(
        "oracle://{usr}:{pwd}@mydatabase".format(usr=usr, pwd=pwd),
        pool_pre_ping=True,
    )

    return engine


@solid
def read_sqlfile(context, sqlfile):
    sqlfile = os.path.join(*[os.getcwd(), "sql_scripts", sqlfile])
    with open(sqlfile, "r") as f:
        query = f.read()
    context.log.info("sql file path is {path}.".format(path=sqlfile))
    context.log.info("current dir is {path}".format(path=os.getcwd()))
    context.log.info("sql script ....\n {file}".format(file=query))
    return query


@solid
def read_db(_context, query, engine):
    df = pd.read_sql(query, con=engine.connect(), chunksize=500000)
    return df


@composite_solid
def savedb():
    connect_db = connectdb.alias("connect_db")
    read_sql_file = read_sqlfile.alias("read_sql_file")
    # context.log.info("test done")
    data = read_db(read_sql_file(), connect_db())
    return data


@solid
def write_db(context, data):
    save_chunk(data, "hstay_v1")


@pipeline
def main():
    write_db(savedb())


if __name__ == "__main__":
    environment_dict = {
        "solids": {
            "savedb": {
                "solids": {
                    "connect_db": {
                        "inputs": {"crd_path": {"value": "credentials.json"}}
                    },
                    "read_sql_file": {
                        "inputs": {"sqlfile": {"value": "sql_v3.sql"}}
                    },
                }
            }
        }
    }

    result = execute_pipeline(main, environment_dict=environment_dict)
```
Here is `save_chunk.py`. It just calls `joblib.dump` on each chunk.
```python
import os

import joblib


def save_obj(obj, name, name_dir="data"):
    """
    Save to pickle.

    Parameters
    ----------
    obj : any object
        This can be a dictionary or ndarray.
    name : str
        The name for the object to be saved.
    name_dir : str, default 'data'
        Name of the directory.

    Returns
    -------
    No return.
        Save the pickle object to the local file system.
    """
    dir_above = os.getcwd()
    data_dir_path = os.path.join(dir_above, name_dir)

    if not os.path.isdir(data_dir_path):
        os.makedirs(data_dir_path)

    data_path = os.path.join(data_dir_path, name + ".pkl")

    with open(data_path, "wb") as f:
        joblib.dump(obj, f)


def save_chunk(reader, name):
    # Pickle each chunk produced by the reader as <name>_<i>.pkl.
    for i, chunk in enumerate(reader):
        save_obj(chunk, "{name}_{Idx}".format(name=name, Idx=i))
```

abhi

02/25/2020, 2:20 PM
And what’s the full stack trace?

Thein

02/25/2020, 3:55 PM
I am not sure this is the full stack trace. Please let me know if it is not.
```
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - PIPELINE_START - Started execution of pipeline "main".
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - ENGINE_EVENT - Executing steps in process (pid: 4676)
 event_specific_data = {"error": null, "metadata_entries": [["pid", null, ["4676"]], ["step_keys", null, ["['savedb.connect_db.compute', 'savedb.read_sql_file.compute', 'savedb.read_db.compute', 'write_db.compute']"]]]}
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_START - Started execution of step "savedb.connect_db.compute".
               solid = "connect_db"
    solid_definition = "connectdb"
            step_key = "savedb.connect_db.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_INPUT - Got input "crd_path" of type "Any". (Type check passed).
 event_specific_data = {"input_name": "crd_path", "type_check_data": [true, "crd_path", null, []]}
               solid = "connect_db"
    solid_definition = "connectdb"
            step_key = "savedb.connect_db.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
 event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["savedb.connect_db.compute", "result"], "type_check_data": [true, "result", null, []]}
               solid = "connect_db"
    solid_definition = "connectdb"
            step_key = "savedb.connect_db.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_SUCCESS - Finished execution of step "savedb.connect_db.compute" in 38ms.
 event_specific_data = {"duration_ms": 38.864100000000064}
               solid = "connect_db"
    solid_definition = "connectdb"
            step_key = "savedb.connect_db.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_START - Started execution of step "savedb.read_sql_file.compute".
               solid = "read_sql_file"
    solid_definition = "read_sqlfile"
            step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_INPUT - Got input "sqlfile" of type "Any". (Type check passed).
 event_specific_data = {"input_name": "sqlfile", "type_check_data": [true, "sqlfile", null, []]}
               solid = "read_sql_file"
    solid_definition = "read_sqlfile"
            step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - INFO - system - b7263398-7221-47b2-8e93-6f7fc27de3cb - sql file path is D:\Projects\LHLOS\sql_scripts\test.sql.
               solid = "read_sql_file"
    solid_definition = "read_sqlfile"
            step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - INFO - system - b7263398-7221-47b2-8e93-6f7fc27de3cb - current dir is D:\Projects\LHLOS
               solid = "read_sql_file"
    solid_definition = "read_sqlfile"
            step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
 event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["savedb.read_sql_file.compute", "result"], "type_check_data": [true, "result", null, []]}
               solid = "read_sql_file"
    solid_definition = "read_sqlfile"
            step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_SUCCESS - Finished execution of step "savedb.read_sql_file.compute" in 56ms.
 event_specific_data = {"duration_ms": 56.736900000000205}
               solid = "read_sql_file"
    solid_definition = "read_sqlfile"
            step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:54 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_START - Started execution of step "savedb.read_db.compute".
               solid = "read_db"
    solid_definition = "read_db"
            step_key = "savedb.read_db.compute"
2020-02-25 10:51:54 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_INPUT - Got input "query" of type "Any". (Type check passed).
 event_specific_data = {"input_name": "query", "type_check_data": [true, "query", null, []]}
               solid = "read_db"
    solid_definition = "read_db"
            step_key = "savedb.read_db.compute"
2020-02-25 10:51:54 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_INPUT - Got input "engine" of type "Any". (Type check passed).
 event_specific_data = {"input_name": "engine", "type_check_data": [true, "engine", null, []]}
               solid = "read_db"
    solid_definition = "read_db"
            step_key = "savedb.read_db.compute"
2020-02-25 10:51:54 - dagster - ERROR - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_FAILURE - Execution of step "savedb.read_db.compute" failed.
            cls_name = "DagsterInvariantViolationError"
               solid = "read_db"
    solid_definition = "read_db"
            step_key = "savedb.read_db.compute"

  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 274, in dagster_event_sequence_for_step
    for step_event in check.generator(_core_dagster_event_sequence_for_step(step_context)):
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 552, in _core_dagster_event_sequence_for_step
    _step_output_error_checked_user_event_sequence(step_context, user_event_sequence)
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 322, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 680, in _user_event_sequence_for_step_compute_fn
    for event in gen:
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 87, in _execute_core_compute
    for step_output in _yield_compute_results(compute_context, inputs, compute_fn):
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 72, in _yield_compute_results
    ).format(event=repr(event), solid_name=str(step.solid_handle))

2020-02-25 10:51:55 - dagster - INFO - system - b7263398-7221-47b2-8e93-6f7fc27de3cb - Dependencies for step write_db.compute failed: ['savedb.read_db.compute']. Not executing.
               solid = "write_db"
    solid_definition = "write_db"
            step_key = "write_db.compute"
2020-02-25 10:51:55 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_SKIPPED - Skipped execution of step "write_db.compute".
               solid = "write_db"
    solid_definition = "write_db"
            step_key = "write_db.compute"
2020-02-25 10:51:55 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - ENGINE_EVENT - Finished steps in process (pid: 4676) in 1.89s
 event_specific_data = {"error": null, "metadata_entries": [["pid", null, ["4676"]], ["step_keys", null, ["['savedb.connect_db.compute', 'savedb.read_sql_file.compute', 'savedb.read_db.compute', 'write_db.compute']"]]]}
2020-02-25 10:51:55 - dagster - ERROR - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - PIPELINE_FAILURE - Execution of pipeline "main" failed.
```
The error shown in Dagit is:
```
dagster.core.errors.DagsterInvariantViolationError: Compute function for solid savedb.read_db yielded hospital ...... [9 rows x 52 columns] rather than an instance of the Output or Materialization class.
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 274, in dagster_event_sequence_for_step
    for step_event in check.generator(_core_dagster_event_sequence_for_step(step_context)):
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 552, in _core_dagster_event_sequence_for_step
    _step_output_error_checked_user_event_sequence(step_context, user_event_sequence)
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 322, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 680, in _user_event_sequence_for_step_compute_fn
    for event in gen:
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 87, in _execute_core_compute
    for step_output in _yield_compute_results(compute_context, inputs, compute_fn):
  File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 72, in _yield_compute_results
    ).format(event=repr(event), solid_name=str(step.solid_handle))
```

alex

02/25/2020, 5:27 PM
in your solid `read_db` you have `return df`, which, as Abhi mentioned above, will be an iterator instead of a dataframe when you supply the `chunksize` argument.
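Why would a returned iterator produce an error about *yielded* values? A simplified, hypothetical model, not Dagster's source: assume the framework treats a returned generator as the solid's stream of events (the traceback shows `_yield_compute_results` iterating over what the compute function produced) and wraps any other return value in a single output. `Output` and `run_compute` here are illustrative stand-ins, not the real `dagster.Output` or engine code.

```python
import inspect


class Output:
    """Hypothetical stand-in for dagster.Output: a value tagged with an output name."""

    def __init__(self, value, output_name="result"):
        self.value = value
        self.output_name = output_name


def run_compute(fn, *args):
    """Simplified, hypothetical model of how a solid's result is handled."""
    result = fn(*args)
    if inspect.isgenerator(result):
        # A returned generator is treated as a stream of events,
        # so every item it produces must already be an Output.
        events = []
        for event in result:
            if not isinstance(event, Output):
                raise TypeError(
                    "Compute function yielded {!r} rather than an Output".format(event)
                )
            events.append(event)
        return events
    # Any other return value becomes the single output named "result".
    return [Output(result)]


def read_all(rows):
    return list(rows)  # a plain list: wrapped in one Output


def read_chunked(rows, size=2):
    # Like read_sql(..., chunksize=...), this returns a generator of chunks.
    return (rows[i:i + size] for i in range(0, len(rows), size))


print(run_compute(read_all, [1, 2, 3])[0].value)  # [1, 2, 3]
try:
    run_compute(read_chunked, [1, 2, 3])
except TypeError as exc:
    print(exc)  # Compute function yielded [1, 2] rather than an Output
```

In this model the first chunk is what gets reported, which matches the shape of the Dagit message (a DataFrame chunk printed where an `Output` was expected). Materializing the generator, for example with `list(...)`, before returning turns it back into an ordinary value.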

max

02/25/2020, 5:44 PM
I think we could improve that error message.

Thein

02/25/2020, 5:50 PM
Thanks. Now I see the problem. So it works when I write the dataframe inside `read_db` instead of passing it to another solid.
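A sketch of that fix under stated assumptions: consume the `read_sql` iterator inside the same step and return only a plain value (here, the list of written file paths). `read_and_save_chunks` is a hypothetical helper; it uses the standard-library `pickle` instead of the thread's `joblib.dump` and an in-memory SQLite table instead of Oracle. In a Dagster pipeline of this era its body would presumably live inside the `@solid`-decorated `read_db`.

```python
import os
import pickle
import sqlite3
import tempfile

import pandas as pd


def read_and_save_chunks(query, con, name, out_dir, chunksize=500000):
    """Hypothetical helper: read with chunksize and write every chunk in one step.

    Each chunk is pickled to <out_dir>/<name>_<i>.pkl. Only the list of paths
    is returned, so no generator ever leaves this function.
    """
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for i, chunk in enumerate(pd.read_sql(query, con, chunksize=chunksize)):
        path = os.path.join(out_dir, "{}_{}.pkl".format(name, i))
        with open(path, "wb") as f:
            pickle.dump(chunk, f)
        paths.append(path)
    return paths


# Usage with a hypothetical in-memory table and a temporary output directory.
con = sqlite3.connect(":memory:")
pd.DataFrame({"id": range(5)}).to_sql("t", con, index=False)
out_dir = tempfile.mkdtemp()
paths = read_and_save_chunks("SELECT * FROM t", con, "hstay_v1", out_dir, chunksize=2)
print([os.path.basename(p) for p in paths])
# ['hstay_v1_0.pkl', 'hstay_v1_1.pkl', 'hstay_v1_2.pkl']
```

Only one chunk is held in memory at a time, which keeps the point of using `chunksize`.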

alex

02/25/2020, 5:54 PM
Ya, you could return a list of dataframe “chunks” by doing `return list(df)`, but you would need to update the other solids accordingly.
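A sketch of alex's suggestion with plain functions standing in for the solids (the names mirror the thread; the SQLite table and the row-counting `write_db` body are hypothetical). `list()` drains the iterator, so the step returns an ordinary list of DataFrames that downstream code can iterate.

```python
import sqlite3

import pandas as pd


def read_db(query, con, chunksize=2):
    # list() drains the read_sql iterator, so callers receive a concrete
    # list of DataFrames instead of a one-shot generator.
    return list(pd.read_sql(query, con, chunksize=chunksize))


def write_db(chunks):
    # Downstream code must now expect a list of DataFrames. This hypothetical
    # version just reports the row count of each chunk.
    return [len(chunk) for chunk in chunks]


# Hypothetical in-memory table standing in for the real database.
con = sqlite3.connect(":memory:")
pd.DataFrame({"id": range(5)}).to_sql("t", con, index=False)

chunks = read_db("SELECT * FROM t", con)
print(write_db(chunks))  # [2, 2, 1]
```

The trade-off is that every chunk is held in memory at once, which gives up the memory savings `chunksize` was meant to provide. The thread's `save_chunk` already loops over whatever iterable it receives, so it should accept such a list unchanged.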

Eric

02/25/2020, 6:09 PM
just curious, could you have the solid yield each chunk (DataFrame) so that each one is sent downstream individually? or does that go back to the "no mapping yet" thing?

abhi

02/25/2020, 6:13 PM
A solid needs to return a finite set of outputs that can be mapped to the inputs of other solids. There may be a map-reduce-esque solution, but that is not supported yet, if that is what you mean by "no mapping yet".

max

02/25/2020, 6:18 PM
if you don't mind being a guinea pig, would this error message have been more useful?
```
dagster.core.errors.DagsterInvariantViolationError: Compute function for solid return_iterator yielded value '1' of type <class 'int'> rather than an instance of Output, Materialization, or ExpectationResult. Values yielded by solids must be wrapped in one of these types. If your solid has a single output and yields no other events, you may want to use `return` instead of `yield` in the body of your solid compute function. If you are already using `return`, you may be inadvertently returning a generator rather than the value you expect.
```
or is it still hard to understand?
@Eric if you knew how many chunks you were yielding, you could certainly have a fixed-width graph that processed multiple outputs and fanned them in later
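A plain-Python sketch of the fixed-width shape max describes, not Dagster code: the number of chunks (`N_CHUNKS`) is fixed when the graph is defined, one step splits the data into that many named pieces (fan-out), each piece is processed separately, and a final step concatenates the results (fan-in). In an actual pipeline of this era each piece would presumably be a separately declared output (for example via `OutputDefinition`) wired to its own downstream solid; `split`, `process`, `combine`, and the `chunk_i` names are hypothetical.

```python
import pandas as pd

N_CHUNKS = 3  # must be known when the graph is built


def split(df):
    """Fan-out: one named piece per chunk (hypothetical names chunk_0..chunk_2)."""
    bounds = [len(df) * i // N_CHUNKS for i in range(N_CHUNKS + 1)]
    return {
        "chunk_{}".format(i): df.iloc[bounds[i]:bounds[i + 1]]
        for i in range(N_CHUNKS)
    }


def process(chunk):
    # Each chunk is handled by its own step; here it just gains a derived column.
    return chunk.assign(doubled=chunk["x"] * 2)


def combine(chunks):
    """Fan-in: concatenate the per-chunk results back into one DataFrame."""
    return pd.concat(chunks)


df = pd.DataFrame({"x": range(7)})
outputs = split(df)
result = combine([process(outputs["chunk_{}".format(i)]) for i in range(N_CHUNKS)])
print(len(result), result["doubled"].sum())  # 7 42
```

Because the split happens on a DataFrame that is already in memory, this sketch only shows the graph shape; it does not stream rows from the database.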
@Thein I'm also curious whether it was difficult to identify the error message inside the user interface.

Thein

02/25/2020, 6:24 PM
@max When I saw `iterator` in the Dagit web console, I knew it was due to the iterator, but since this is the first time I am using Dagster, I also thought I might be doing something wrong. 🙂