
Andrew Brown

09/24/2021, 8:40 PM
We want to build new pipelines using the @op and @graph decorators to future-proof them. I’m having trouble including a new graph in an existing repo, because the repository expects a PipelineDefinition to be returned:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNKNOWN details = "Exception iterating responses: Invariant failed. Description: Bad constructor for pipeline gpdc_identification_roster__to__snowflake__graph: must return PipelineDefinition, got value of type <class 'dagster.core.definitions.graph.GraphDefinition'>" debug_error_string = "{"created":"@1632515755.844435000","description":"Error received from peer ipv4:172.22.0.5:4003","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Exception iterating responses: Invariant failed. Description: Bad constructor for pipeline gpdc_identification_roster__to__snowflake__graph: must return PipelineDefinition, got value of type <class 'dagster.core.definitions.graph.GraphDefinition'>","grpc_status":2}" >
@repository
def miscellaneous_repository():
    return {
        "pipelines": {
            "gpdc_identification_roster__to__snowflake__graph": lambda: gpdc_identification_roster__to__snowflake__graph,
            "snowflake__information_schema__query_history__to__s3__snowflake_logs__pipeline": lambda: snowflake__information_schema__query_history__to__s3__snowflake_logs__pipeline,
            "materialization_deployment__pipeline": lambda: materialization_deployment__pipeline,
            "daily_tests__pipeline": lambda: daily_tests__pipeline,
        },
        "schedules": {
            "schedule__snowflake__information_schema__query_history__to__s3__snowflake_logs__pipeline": lambda: schedule__snowflake__information_schema__query_history__to__s3__snowflake_logs__pipeline,
            "schedule__daily_tests__pipeline": lambda: schedule__daily_tests__pipeline,
        },
        "sensors": {"slack_on_pipeline_failure": lambda: slack_on_pipeline_failure},
    }
^^^ that’s the repository code in question
Oh, you know what, I just answered my own question. I need to use the jobs key in the dictionary. 🤦
Friday afternoons ftw

daniel

09/24/2021, 8:44 PM
That's right - and then they also need to be jobs, not graphs (you can create a job from a graph with the to_job() function on GraphDefinition: https://docs.dagster.io/_apidocs/experimental#job)

Andrew Brown

09/24/2021, 8:45 PM
👍 thanks for the additional color