Thein
02/25/2020, 3:54 AM
`dagster.core.errors.DagsterInvariantViolationError: Compute function for solid` when I tried `pd.read_sql` with `chunksize`. It works fine without `chunksize`.
@solid
def read_db(_context, query, engine):
    """Run ``query`` on a fresh connection from ``engine``, reading in chunks.

    Parameters
    ----------
    _context
        Solid execution context (unused).
    query : str
        SQL text to execute.
    engine
        Object exposing ``connect()``; the connection is handed to pandas.

    Returns
    -------
    The object produced by ``pd.read_sql``. Because ``chunksize`` is
    supplied, this is an iterator over DataFrame chunks rather than one
    DataFrame.

    Notes
    -----
    Returning that iterator is what makes dagster fail this solid with
    ``DagsterInvariantViolationError``; the same code without ``chunksize``
    runs fine.
    """
    connection = engine.connect()
    df = pd.read_sql(query, con=connection, chunksize=500000)
    return df
@composite_solid
def savedb():
    """Wire the connect / read-script / read-db solids into one composite.

    ``connectdb`` and ``read_sqlfile`` are aliased to ``connect_db`` and
    ``read_sql_file``; their outputs feed ``read_db``, whose output is the
    composite's result.
    """
    db_connector = connectdb.alias("connect_db")
    script_reader = read_sqlfile.alias("read_sql_file")
    # context.log.info("test done")
    return read_db(script_reader(), db_connector())
max
02/25/2020, 4:46 AMabhi
02/25/2020, 8:04 AMThein
02/25/2020, 2:00 PM
import csv
import pandas as pd
from sqlalchemy import create_engine
import os
from load_save_dagster import save_obj as save_pkl
from save_load_chunk_dagster import save_chunk
from save_load_chunk_dagster import load_chunk
from dagster import (
composite_solid,
execute_pipeline,
execute_solid,
pipeline,
solid,
)
@solid
def connectdb(context, crd_path):
    """Create a SQLAlchemy engine from credentials stored in a file.

    The file at ``crd_path`` is parsed with ``csv.DictReader``; the first
    two values of its first data row are used as user name and password.

    Parameters
    ----------
    context
        Solid execution context (unused).
    crd_path : str
        Path of the credentials file.

    Returns
    -------
    The engine returned by ``create_engine`` (``pool_pre_ping`` enabled).
    """
    with open(crd_path, "r") as f:
        rows = list(csv.DictReader(f))
    credentials = list(rows[0].values())
    usr, pwd = credentials[0], credentials[1]
    return create_engine(
        "oracle://{usr}:{pwd}@mydatabase".format(usr=usr, pwd=pwd),
        pool_pre_ping=True,
    )
@solid
def read_sqlfile(context, sqlfile):
    """Load the text of a SQL script from the ``sql_scripts`` directory.

    Parameters
    ----------
    context
        Solid execution context; only ``context.log.info`` is used.
    sqlfile : str
        File name of the script, resolved against
        ``<current working directory>/sql_scripts``.

    Returns
    -------
    str
        The complete contents of the script.
    """
    sqlfile = os.path.join(os.getcwd(), "sql_scripts", sqlfile)
    with open(sqlfile, "r") as f:
        query = f.read()
    # Plain attribute calls: the chat export had wrapped each of these in
    # "<http://...|...>" link markup, which is not valid Python.
    context.log.info("sql file path is {path}.".format(path=sqlfile))
    context.log.info("current dir is {path}".format(path=os.getcwd()))
    context.log.info("sql script ....\n {file}".format(file=query))
    return query
@solid
def read_db(_context, query, engine):
    """Run ``query`` on a fresh connection from ``engine``, reading in chunks.

    Parameters
    ----------
    _context
        Solid execution context (unused).
    query : str
        SQL text to execute.
    engine
        Object exposing ``connect()``; the connection is handed to pandas.

    Returns
    -------
    The object produced by ``pd.read_sql``. Because ``chunksize`` is
    supplied, this is an iterator over DataFrame chunks rather than one
    DataFrame.

    Notes
    -----
    Returning that iterator is what makes dagster fail this solid with
    ``DagsterInvariantViolationError``; the same code without ``chunksize``
    runs fine.
    """
    connection = engine.connect()
    df = pd.read_sql(query, con=connection, chunksize=500000)
    return df
@composite_solid
def savedb():
    """Wire the connect / read-script / read-db solids into one composite.

    ``connectdb`` and ``read_sqlfile`` are aliased to ``connect_db`` and
    ``read_sql_file``; their outputs feed ``read_db``, whose output is the
    composite's result.
    """
    db_connector = connectdb.alias("connect_db")
    script_reader = read_sqlfile.alias("read_sql_file")
    # context.log.info("test done")
    return read_db(script_reader(), db_connector())
@solid
def write_db(context, data):
    """Persist ``data`` chunk by chunk via ``save_chunk``.

    Parameters
    ----------
    context
        Solid execution context (unused).
    data
        Iterable of chunks, forwarded unchanged to ``save_chunk``.
    """
    # Base name handed to save_chunk for the saved chunks.
    output_name = "hstay_v1"
    save_chunk(data, output_name)
@pipeline
def main():
    """Pipeline: feed the ``savedb`` composite's output into ``write_db``."""
    chunks = savedb()
    write_db(chunks)
if __name__ == "__main__":
    # Inputs for the two aliased solids nested inside the "savedb" composite.
    savedb_inner_solids = {
        "connect_db": {"inputs": {"crd_path": {"value": "credentials.json"}}},
        "read_sql_file": {"inputs": {"sqlfile": {"value": "sql_v3.sql"}}},
    }
    environment_dict = {"solids": {"savedb": {"solids": savedb_inner_solids}}}
    result = execute_pipeline(main, environment_dict=environment_dict)
Thein
02/25/2020, 2:01 PM
`save_chunk.py`. It is just `joblib.dump` for all chunks.
def save_obj(obj, name, name_dir="data"):
    """
    Save an object to ``<cwd>/<name_dir>/<name>.pkl`` with ``joblib.dump``.

    Parameters
    ----------
    obj : any object
        This can be a dictionary or ndarray.
    name : str
        The name for the object to be saved (``.pkl`` is appended).
    name_dir : str, default 'data'
        Name of the directory, created under the current working
        directory if it does not exist yet.

    Returns
    -------
    None
        The pickle file is written to the local file system.
    """
    data_dir_path = os.path.join(os.getcwd(), name_dir)
    # exist_ok avoids the check-then-create race of isdir() + makedirs().
    os.makedirs(data_dir_path, exist_ok=True)
    data_path = os.path.join(data_dir_path, name + ".pkl")
    with open(data_path, "wb") as f:
        joblib.dump(obj, f)
def save_chunk(reader, name):
    """Save every chunk yielded by ``reader`` as its own pickle file.

    Chunks are numbered from 0 in iteration order and written through
    ``save_obj`` under the name ``<name>_<index>``.

    Parameters
    ----------
    reader : iterable
        Source of chunks.
    name : str
        Common prefix for the saved objects.
    """
    for index, frame in enumerate(reader):
        chunk_name = "{name}_{Idx}".format(name=name, Idx=index)
        save_obj(frame, chunk_name)
abhi
02/25/2020, 2:20 PMThein
02/25/2020, 3:55 PMThein
02/25/2020, 3:55 PM2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - PIPELINE_START - Started execution of pipeline "main".
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - ENGINE_EVENT - Executing steps in process (pid: 4676)
event_specific_data = {"error": null, "metadata_entries": [["pid", null, ["4676"]], ["step_keys", null, ["['savedb.connect_db.compute', 'savedb.read_sql_file.compute', 'savedb.read_db.compute', 'write_db.compute']"]]]}
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_START - Started execution of step "savedb.connect_db.compute".
solid = "connect_db"
solid_definition = "connectdb"
step_key = "savedb.connect_db.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_INPUT - Got input "crd_path" of type "Any". (Type check passed).
event_specific_data = {"input_name": "crd_path", "type_check_data": [true, "crd_path", null, []]}
solid = "connect_db"
solid_definition = "connectdb"
step_key = "savedb.connect_db.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["savedb.connect_db.compute", "result"], "type_check_data": [true, "result", null, []]}
solid = "connect_db"
solid_definition = "connectdb"
step_key = "savedb.connect_db.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_SUCCESS - Finished execution of step "savedb.connect_db.compute" in 38ms.
event_specific_data = {"duration_ms": 38.864100000000064}
solid = "connect_db"
solid_definition = "connectdb"
step_key = "savedb.connect_db.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_START - Started execution of step "savedb.read_sql_file.compute".
solid = "read_sql_file"
solid_definition = "read_sqlfile"
step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_INPUT - Got input "sqlfile" of type "Any". (Type check passed).
event_specific_data = {"input_name": "sqlfile", "type_check_data": [true, "sqlfile", null, []]}
solid = "read_sql_file"
solid_definition = "read_sqlfile"
step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - INFO - system - b7263398-7221-47b2-8e93-6f7fc27de3cb - sql file path is D:\Projects\LHLOS\sql_scripts\test.sql.
solid = "read_sql_file"
solid_definition = "read_sqlfile"
step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - INFO - system - b7263398-7221-47b2-8e93-6f7fc27de3cb - current dir is D:\Projects\LHLOS
solid = "read_sql_file"
solid_definition = "read_sqlfile"
step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["savedb.read_sql_file.compute", "result"], "type_check_data": [true, "result", null, []]}
solid = "read_sql_file"
solid_definition = "read_sqlfile"
step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:53 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_SUCCESS - Finished execution of step "savedb.read_sql_file.compute" in 56ms.
event_specific_data = {"duration_ms": 56.736900000000205}
solid = "read_sql_file"
solid_definition = "read_sqlfile"
step_key = "savedb.read_sql_file.compute"
2020-02-25 10:51:54 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_START - Started execution of step "savedb.read_db.compute".
solid = "read_db"
solid_definition = "read_db"
step_key = "savedb.read_db.compute"
2020-02-25 10:51:54 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_INPUT - Got input "query" of type "Any". (Type check passed).
event_specific_data = {"input_name": "query", "type_check_data": [true, "query", null, []]}
solid = "read_db"
solid_definition = "read_db"
step_key = "savedb.read_db.compute"
2020-02-25 10:51:54 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_INPUT - Got input "engine" of type "Any". (Type check passed).
event_specific_data = {"input_name": "engine", "type_check_data": [true, "engine", null, []]}
solid = "read_db"
solid_definition = "read_db"
step_key = "savedb.read_db.compute"
2020-02-25 10:51:54 - dagster - ERROR - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_FAILURE - Execution of step "savedb.read_db.compute" failed.
cls_name = "DagsterInvariantViolationError"
solid = "read_db"
solid_definition = "read_db"
step_key = "savedb.read_db.compute"
File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 274, in dagster_event_sequence_for_step
for step_event in check.generator(_core_dagster_event_sequence_for_step(step_context)):
File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 552, in _core_dagster_event_sequence_for_step
_step_output_error_checked_user_event_sequence(step_context, user_event_sequence)
File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 322, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 680, in _user_event_sequence_for_step_compute_fn
for event in gen:
File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 87, in _execute_core_compute
for step_output in _yield_compute_results(compute_context, inputs, compute_fn):
File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 72, in _yield_compute_results
).format(event=repr(event), solid_name=str(step.solid_handle))
2020-02-25 10:51:55 - dagster - INFO - system - b7263398-7221-47b2-8e93-6f7fc27de3cb - Dependencies for step write_db.compute failed: ['savedb.read_db.compute']. Not executing.
solid = "write_db"
solid_definition = "write_db"
step_key = "write_db.compute"
2020-02-25 10:51:55 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - STEP_SKIPPED - Skipped execution of step "write_db.compute".
solid = "write_db"
solid_definition = "write_db"
step_key = "write_db.compute"
2020-02-25 10:51:55 - dagster - DEBUG - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - ENGINE_EVENT - Finished steps in process (pid: 4676) in 1.89s
event_specific_data = {"error": null, "metadata_entries": [["pid", null, ["4676"]], ["step_keys", null, ["['savedb.connect_db.compute', 'savedb.read_sql_file.compute', 'savedb.read_db.compute', 'write_db.compute']"]]]}
2020-02-25 10:51:55 - dagster - ERROR - main - b7263398-7221-47b2-8e93-6f7fc27de3cb - PIPELINE_FAILURE - Execution of pipeline "main" failed.
Thein
02/25/2020, 3:57 PMdagster.core.errors.DagsterInvariantViolationError: Compute function for solid savedb.read_db yielded hospital ...... [9 rows x 52 columns] rather than an instance of the Output or Materialization class.
File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 274, in dagster_event_sequence_for_step
for step_event in check.generator(_core_dagster_event_sequence_for_step(step_context)):
File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 552, in _core_dagster_event_sequence_for_step
_step_output_error_checked_user_event_sequence(step_context, user_event_sequence)
File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 322, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\engine\engine_inprocess.py", line 680, in _user_event_sequence_for_step_compute_fn
for event in gen:
File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 87, in _execute_core_compute
for step_output in _yield_compute_results(compute_context, inputs, compute_fn):
File "c:\users\jd\appdata\local\continuum\anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 72, in _yield_compute_results
).format(event=repr(event), solid_name=str(step.solid_handle))
alex
02/25/2020, 5:27 PM
`read_db` you have `return df` which, as Abhi mentioned above, will be an iterator instead of a dataframe when you supply the `chunksize` argument.
max
02/25/2020, 5:44 PMThein
02/25/2020, 5:50 PM
`read_db` instead of passing it to another one.
alex
02/25/2020, 5:54 PM
`return list(df)` but you would need to update the other solids accordingly
Eric
02/25/2020, 6:09 PMabhi
02/25/2020, 6:13 PMmax
02/25/2020, 6:18 PMmax
02/25/2020, 6:18 PMdagster.core.errors.DagsterInvariantViolationError: Compute function for solid return_iterator yielded value '1' of type <class 'int'> rather than an instance of Output, Materialization, or ExpectationResult. Values yielded by solids must be wrapped in one of these types. If your solid has a single output and yields no other events, you may want to use `return` instead of `yield` in the body of your solid compute function. If you are already using `return`, you may be inadvertantly returning a generator rather than the value you expect.
max
02/25/2020, 6:18 PMmax
02/25/2020, 6:19 PMmax
02/25/2020, 6:19 PMmax
02/25/2020, 6:19 PMThein
02/25/2020, 6:24 PM
`iterator` in dagit web console, I knew it was due to the iterator, but I was also thinking this is the first time I am using dagster so I might be doing something wrong. 🙂