Alejandro A
09/03/2021, 1:18 AM
dagster.core.errors.DagsterInvalidDefinitionError: In @pipeline pipe_2, received invalid type <class 'str'> for input "name" (at position 1) in solid invocation "hello2". Must pass the output from previous solid invocations or inputs to the composition function as inputs when invoking solids during composition.
Drew Sonne
09/03/2021, 11:36 AM
@pipeline function. The functions and variables within a @pipeline function describe a DAG; they're not "real" functions. So any variables need to be passed through the solids.
One pattern to achieve what you're doing is what I've seen referred to as "factories", e.g.
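from dagster import InputDefinition, Output, OutputDefinition, pipeline, solid

@solid(output_defs=[OutputDefinition(int)])
def hello():
    yield Output(1)

def hello2_factory(name: str, age: int):
    # name and age are captured by the closure, so they are not solid inputs;
    # the only input is the upstream output, and it must match the parameter name.
    @solid(input_defs=[InputDefinition("hello", int)])
    def hello2(hello: int):
        print(f"Hello {name}, {age + hello}")
    return hello2

@pipeline
def pipe_2():
    # Build the solid with the literals baked in, then wire it into the DAG.
    hello2 = hello2_factory("Marcus", 20)
    x = hello()
    y = hello2(x)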
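To see why the factory works, here is a minimal sketch in plain Python with no Dagster involved. greeter_factory and greet are hypothetical names standing in for hello2_factory and hello2. The point is that name and age are fixed when the factory runs, so the only thing left to pass at call time is the upstream value.

```python
from typing import Callable


def greeter_factory(name: str, age: int) -> Callable[[int], str]:
    """Hypothetical stand-in for hello2_factory; not part of Dagster."""

    def greet(hello: int) -> str:
        # name and age come from the enclosing scope (fixed when the factory ran);
        # hello is the only value supplied at call time.
        return f"Hello {name}, {age + hello}"

    return greet


# Each call to the factory yields an independent function with its own values.
greet_marcus = greeter_factory("Marcus", 20)
greet_ada = greeter_factory("Ada", 36)

print(greet_marcus(1))
print(greet_ada(1))
```

If you build several solids from one factory in Dagster, you'd likely also want to give each one a distinct name (for example via the name argument of @solid), since they would otherwise all be called hello2.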
Darren Ng
09/03/2021, 1:37 PM
from typing import List

from dagster import Output, OutputDefinition, pipeline, solid

@solid(
    description="Read from google sheet for data pipeline.",
    output_defs=[
        OutputDefinition(name="row_data"),
        OutputDefinition(name="gspread_object"),
    ],
)
def read_from_gsheet() -> List:
    print("Instantiate gspread sheet object and read a row...")
    # sheet is the gspread sheet object; its instantiation is not shown here
    row = sheet.row_values(2)
    yield Output(row, output_name="row_data")
    yield Output(sheet, output_name="gspread_object")

@solid(description="Write to cells in Gsheet")
def write_to_gsheet(sheet, cell: str, value: int):
    sheet.update(cell, value)

@solid(description="Read from google sheet and query data from NLP")
def query_NLP_data(context, row):
    received_row = row
    print("Sending request to query NLP graph and return metrics")
    some_bol_metrics = 1000
    context.log.info(f"Returned NLP metrics: {some_bol_metrics}")
    return some_bol_metrics

@pipeline
def gather_pipeline():
    row, sheet = read_from_gsheet()
    some_bol_metrics = query_NLP_data(row)
    # The literal 'H2' below is what triggers the error that follows
    write_to_gsheet(sheet, 'H2', some_bol_metrics)
In this example I'm reading a row from a gspread sheet object and sending it to another solid, which should query the company DB and return some metrics (here 1000). Those metrics are then passed into a third solid, which writes them to cell 'H2' of the same sheet object. But I'm getting:
/home/dazza/anaconda3/envs/versedai/lib/python3.8/site-packages/dagster/core/definitions/composition.py:65: UserWarning: While in @pipeline context 'gather_pipeline', received an uninvoked solid 'write_to_gsheet'.
warnings.warn(warning_message.strip())
/home/dazza/anaconda3/envs/versedai/lib/python3.8/site-packages/dagster/core/workspace/context.py:504: UserWarning: Error loading repository location gather_pipeline.py:dagster.core.errors.DagsterInvalidDefinitionError: In @pipeline gather_pipeline, received invalid type <class 'str'> for input "cell" (at position 1) in solid invocation "write_to_gsheet". Must pass the output from previous solid invocations or inputs to the composition function as inputs when invoking solids during composition.
prha
09/03/2021, 3:53 PM