Stefano
02/27/2023, 9:48 PM
Input 'cohortA' of graph 'patients' has no way of being resolved. Must provide a resolution to this input via another op/graph, or via a direct input value mapped from the top-level graph.
Where, of course, cohortA is one of my assets and patients is the name of the @graph_asset function.
This should be a pretty common pipeline, but I cannot make it work. Any suggestions?
sandy
02/27/2023, 10:29 PM
Stefano
03/01/2023, 1:26 PM
from dagster import AssetsDefinition, Definitions, Out, graph_asset, op, asset, job
from ioManagers import db_manager, gcp_connection
import pandas as pd
@asset(group_name='raw_data', required_resource_keys={"gcp_connection"})
def cohortA(_context) -> pd.DataFrame:
    """Asset in the ``raw_data`` group; declares the ``gcp_connection`` resource.

    NOTE(review): only the log line below is visible in this snippet. The code
    that produces the annotated ``pd.DataFrame`` return value is not shown
    (presumably elided from the paste) -- confirm against the full file.
    """
    print("Reading data from gcp.")
@asset(group_name='raw_data', required_resource_keys={"gcp_connection"})
def cohortB(_context) -> pd.DataFrame:
    """Asset in the ``raw_data`` group; declares the ``gcp_connection`` resource.

    NOTE(review): only the log line below is visible in this snippet. The code
    that produces the annotated ``pd.DataFrame`` return value is not shown
    (presumably elided from the paste) -- confirm against the full file.
    """
    print("Reading data from gcp...")
'''
Op graph for generating a clean patient table
'''
@op()
def getSizeCohortB(cohortB: pd.DataFrame) -> int:
    """Return the number of rows in the ``cohortB`` table."""
    # ``shape`` of a DataFrame is (rows, columns); only the row count is needed.
    row_count, _column_count = cohortB.shape
    return row_count
@op()
def mergedf(cohortA: pd.DataFrame, cohortB:pd.DataFrame , getSizeCohortB: int) -> pd.DataFrame:
    # Takes both cohort tables plus the integer produced by getSizeCohortB and is
    # annotated to return a single DataFrame.
    # NOTE(review): the body of this op is missing from the snippet; how the two
    # tables are combined cannot be determined from here.
@op(out=Out(io_manager_key='db_io_manager',metadata={"schema": "rwe", "table": "patients"}))
def final_patients(mergedf: pd.DataFrame) -> pd.DataFrame:
    # Save to the database: the op's output is declared with the io manager key
    # 'db_io_manager' and carries "schema": "rwe" / "table": "patients" metadata.
    # NOTE(review): no statements are visible in this snippet; presumably the op
    # returns the merged frame so the io manager can persist it -- confirm.
@graph_asset()
def patients(cohortA: pd.DataFrame, cohortB: pd.DataFrame) -> pd.DataFrame:
    """Graph-backed asset that wires the cohort inputs through the patient ops.

    The size of ``cohortB`` is computed first, passed together with both cohort
    tables to ``mergedf``, and the merged result is handed to
    ``final_patients``, whose output is the asset's value.
    """
    cohort_b_size = getSizeCohortB(cohortB)
    merged = mergedf(cohortA, cohortB, cohort_b_size)
    return final_patients(merged)
# A graph-backed asset cannot be invoked with no arguments inside an ``@job``:
# its inputs (``cohortA``/``cohortB``) are upstream *assets*, not op outputs, so
# the job has nothing to bind them to and Dagster raises
# "Input 'cohortA' of graph 'patients' has no way of being resolved".
# Build an asset job instead, so the inputs are resolved from the asset graph.
from dagster import define_asset_job

# Materializes both raw cohorts and the derived ``patients`` asset in one run.
prepare_patients_job = define_asset_job(
    name="prepare_patients_job",
    selection=["cohortA", "cohortB", "patients"],
)

# Asset jobs receive their resources from the enclosing Definitions object, so
# the resource mapping that used to live on ``@job(resource_defs=...)`` moves here.
# NOTE(review): if this module already declares a Definitions object elsewhere,
# merge these entries into it rather than keeping two.
defs = Definitions(
    assets=[cohortA, cohortB, patients],
    jobs=[prepare_patients_job],
    resources={"gcp_connection": gcp_connection, "db_io_manager": db_manager},
)