I'm having many issues setting up the configuratio...
# ask-community
a
I'm having many issues setting up the configuration for the pipeline. I'm trying to set up the resources for my jobs, but somehow they don't get passed along to the actual ops and jobs.
Copy code
@config_mapping(
    config_schema={"raw_table": str, "azure_stage": str, "file_format": str}
)
def upload_data_config_mapping(config):
    """
    Translate the simplified job config into run config for ``copy_into_op``.

    :param config: Mapping holding the ``raw_table``, ``azure_stage`` and
        ``file_format`` values declared in the config schema above.
    :return: Run-config dict that places those three values under
        ``ops -> copy_into_op -> config``.
    """
    # Forward each key unchanged; the op config uses the same key names.
    forwarded_keys = ("raw_table", "azure_stage", "file_format")
    op_settings = {key: config[key] for key in forwarded_keys}
    return {"ops": {"copy_into_op": {"config": op_settings}}}


@job(config=upload_data_config_mapping)
def upload_data():
    """
    Upload data from Azure Blob Storage to Snowflake.

    The body of a job function is evaluated once, when the job is defined, to
    wire ops together; it is not re-run for each execution. Run-time logging
    therefore belongs inside the ops themselves, so this body only invokes
    ``copy_into_op``.
    """
    copy_into_op()

@op(
    config_schema={
        "raw_table": Field(
            String,
            is_required=True,
            description="The Snowflake table to copy the file into.",
        ),
        "azure_stage": Field(
            String,
            is_required=True,
            description="The Azure stage name to copy the file into.",
        ),
        "file_format": Field(
            String, is_required=True, description="The Snowflake file format to use."
        ),
    },
    out=Out(dagster_type=bool),
)
def copy_into_op(context, db_conn: DatabaseResource) -> bool:
    """
    Copy the file into Snowflake.

    Builds a ``COPY INTO`` statement from the op config (``raw_table``,
    ``azure_stage``, ``file_format``) and the ``AZURE_SAS_TOKEN`` environment
    variable, then runs it through ``db_conn.execute``.

    :param context: Op execution context; provides ``op_config`` and ``log``.
    :param db_conn: Resource used to execute the statement.
    :return: ``True`` when the statement ran without raising, ``False`` when
        the SAS token is missing or execution raised. Returning ``False`` does
        not by itself mark the op as failed.
    """
    # Log through context.log (not the stdlib logging module) so the messages
    # are attached to the run.
    context.log.info("Copying file into Snowflake")

    sas_token = os.getenv("AZURE_SAS_TOKEN")
    if sas_token is None:
        # Without this guard the literal text 'None' would be sent as the token.
        context.log.error(
            "Error while copying file into Snowflake: AZURE_SAS_TOKEN is not set"
        )
        return False

    op_config = context.op_config
    query = f"""
            COPY INTO {op_config['raw_table']}
            FROM @{op_config['azure_stage']}
            CREDENTIALS = (
                AZURE_SAS_TOKEN='{sas_token}'
                )
            FILE_FORMAT = (FORMAT_NAME = {op_config['file_format']})
        """
    try:
        db_conn.execute(query)
    except Exception as e:
        # Best-effort contract: report the failure and signal it via the
        # boolean result instead of propagating.
        context.log.error(f"Error while copying file into Snowflake: {e}")
        return False
    context.log.info("File copied into Snowflake")
    return True

class DatabaseResource(ConfigurableResource):
    """
    Snowflake connection settings exposed as a Dagster resource.

    The four fields are passed as keyword arguments to ``connector.connect``.
    The resource keeps no connection state: each ``execute`` call opens its own
    connection and closes it when done.
    """

    # Credentials and session defaults forwarded to connector.connect().
    user: str
    password: str
    account: str
    warehouse: str

    def _connect(self):
        """
        Create a Snowflake connection using the credentials provided in the config file.

        :return: The connection object returned by ``connector.connect``.
        """
        # The connection is returned instead of being stored on ``self``:
        # ConfigurableResource instances are frozen, so assigning new
        # attributes such as ``self.conn`` raises at run time.
        # NOTE(review): no validate_conn() is defined on this class, so no
        # extra validation happens here; connector.connect presumably raises
        # on bad credentials -- confirm.
        return connector.connect(
            user=self.user,
            password=self.password,
            account=self.account,
            warehouse=self.warehouse,
        )

    def execute(self, query: str):
        """
        Execute a query

        Selects the warehouse, database, schema and role named by the
        ``SNOWFLAKE_*_NAME`` environment variables, then runs ``query`` on a
        fresh connection that is closed afterwards.

        :param query: The query to execute
        :raises ValueError: If one of the required environment variables is
            unset or empty.
        """
        import os  # local import keeps this block self-contained

        # (USE target, environment variable holding its name), in run order.
        session_settings = (
            ("WAREHOUSE", "SNOWFLAKE_WAREHOUSE_NAME"),
            ("DATABASE", "SNOWFLAKE_DATABASE_NAME"),
            ("SCHEMA", "SNOWFLAKE_SCHEMA_NAME"),
            ("ROLE", "SNOWFLAKE_ROLE_NAME"),
        )

        # Read the values with os.getenv: formatting an EnvVar object into a
        # string does not yield the variable's value. Resolve everything
        # before connecting so a missing variable fails fast.
        statements = []
        for target, env_name in session_settings:
            value = os.getenv(env_name)
            if not value:
                raise ValueError(f"Environment variable {env_name} is not set")
            statements.append(f"USE {target} {value}")

        conn = self._connect()
        try:
            cursor = conn.cursor()
            for statement in statements:
                cursor.execute(statement)
            cursor.execute(query)
        finally:
            # Always release the connection, including when a statement fails.
            conn.close()

And here's the init

# Definitions for this code location: sensors, schedules, jobs and the
# resources shared by their ops.
defs = Definitions(
    sensors=all_sensors,
    schedules=[upload_data_cerpass_mac],
    jobs=[jobs.upload_data],
    resources={
        # The key must match the ``db_conn`` parameter name that
        # ``copy_into_op`` declares.
        "db_conn": DatabaseResource(
            account=SHARED_SNOWFLAKE_CONFIG["account"],
            user=SHARED_SNOWFLAKE_CONFIG["user"],
            password=SHARED_SNOWFLAKE_CONFIG["password"],
            warehouse=SHARED_SNOWFLAKE_CONFIG["warehouse"],
        ),
    },
)
I'm trying to implement the Pythonic configuration, but it has been extremely problematic and error-prone. My task should be extremely easy, but it became complicated when I tried to transition from the config schema / resources approach to assigning the resources from the init as a db_conn. I don't even know how to test the resources locally. It seems like I need to initialize them as a @resource to access them, even though I'm adding them in the Definitions.