Hi everyone! I'm trying to build a simple pipeline...
# ask-community
Hi everyone! I'm trying to build a simple pipeline but struggling to get it working. The goal is very simple: load two @assets from GCP, perform some ops on them to generate a new asset via @graph_asset, and store it in a database. The problem is that even though I'm following the tutorial here: https://docs.dagster.io/concepts/assets/graph-backed-assets#defining-basic-dependencies-for-graph-backed-assets I'm getting the error:
Input 'cohortA' of graph 'patients' has no way of being resolved. Must provide a resolution to this input via another op/graph, or via a direct input value mapped from the top-level graph.
Where of course cohortA is one of my assets and patients is the name of the @graph_asset function. This should be a pretty common pipeline, but I cannot make it work. Any suggestions?
Hey Stefano - would you be able to share a code snippet I could use to reproduce your issue?
Can you reproduce this with this code?
from dagster import AssetsDefinition, Definitions, Out, graph_asset, op, asset, job
from ioManagers import db_manager, gcp_connection
import pandas as pd



@asset(group_name='raw_data', required_resource_keys={"gcp_connection"})
def cohortA(_context) -> pd.DataFrame:
    print("Reading data from gcp.")

@asset(group_name='raw_data', required_resource_keys={"gcp_connection"})
def cohortB(_context) -> pd.DataFrame:
    print("Reading data from gcp...")

'''
Op graph for generating a clean patient table
'''
@op()
def getSizeCohortB(cohortB: pd.DataFrame) -> int:
    return cohortB.shape[0]

@op()
def mergedf(cohortA: pd.DataFrame, cohortB: pd.DataFrame, getSizeCohortB: int) -> pd.DataFrame:
    ...  # merge logic omitted in the original post

@op(out=Out(io_manager_key='db_io_manager', metadata={"schema": "rwe", "table": "patients"}))
def final_patients(mergedf: pd.DataFrame) -> pd.DataFrame:
    # Save to the database (assumed: db_io_manager persists the returned frame)
    return mergedf

@graph_asset()
def patients(cohortA: pd.DataFrame, cohortB: pd.DataFrame) -> pd.DataFrame:
    return final_patients(
        mergedf(cohortA, cohortB, getSizeCohortB(cohortB))
    )

@job(resource_defs={"gcp_connection": gcp_connection, "db_io_manager": db_manager})
def prepare_patients_job():
    patients()