Andres Crucetta
# 04/10/2023, 9:20 PM
@config_mapping(
    config_schema={"raw_table": str, "azure_stage": str, "file_format": str}
)
def upload_data_config_mapping(config):
    """
    Translate the job-level config into run config for ``copy_into_op``.

    The three job-level values are forwarded unchanged under
    ``ops -> copy_into_op -> config``.

    :param config: Mapping with the keys ``raw_table``, ``azure_stage`` and
        ``file_format``.
    :return: Run-config dictionary targeting ``copy_into_op``.
    """
    forwarded_keys = ("raw_table", "azure_stage", "file_format")
    op_config = {key: config[key] for key in forwarded_keys}
    return {"ops": {"copy_into_op": {"config": op_config}}}
@job(config=upload_data_config_mapping)
def upload_data():
    """
    Upload data from Azure Blob Storage to Snowflake.

    The job consists of a single op, ``copy_into_op``; its config is supplied
    through ``upload_data_config_mapping``.
    """
    logger = get_dagster_logger()
    # NOTE(review): Dagster evaluates a @job body while it builds the op
    # graph, so these two messages are presumably emitted when the job is
    # defined rather than around each run -- confirm, and move them into the
    # op if per-run logging is what is wanted.
    logger.info("Starting upload_data job")
    copy_into_op()
    logger.info("Finished upload_data job")
@op(
    config_schema={
        "raw_table": Field(
            String,
            is_required=True,
            description="The Snowflake table to copy the file into.",
        ),
        "azure_stage": Field(
            String,
            is_required=True,
            description="The Azure stage name to copy the file into.",
        ),
        "file_format": Field(
            String, is_required=True, description="The Snowflake file format to use."
        ),
    },
    out=Out(dagster_type=bool),
)
def copy_into_op(context, db_conn: DatabaseResource) -> bool:
    """
    Copy the file into Snowflake

    Builds a ``COPY INTO`` statement from the op config (``raw_table``,
    ``azure_stage``, ``file_format``) and the ``AZURE_SAS_TOKEN`` environment
    variable, then runs it through ``db_conn.execute``.

    :param context: Op execution context; only ``context.op_config`` is read.
    :param db_conn: Resource used to execute the statement.
    :return: ``True`` when the statement ran without raising, ``False`` when
        the SAS token is missing or the execution raised.
    """
    logging.info("Copying file into Snowflake")

    # Without this guard a missing variable would be interpolated as the
    # literal text "None" and only surface as an opaque SQL/credential error.
    sas_token = os.getenv("AZURE_SAS_TOKEN")
    if not sas_token:
        logging.error(
            "Error while copying file into Snowflake: "
            "AZURE_SAS_TOKEN environment variable is not set"
        )
        return False

    op_config = context.op_config
    # NOTE(review): table, stage and file-format names are interpolated as SQL
    # identifiers (they cannot be bound as parameters); this assumes the run
    # config comes from a trusted source -- confirm.
    query = f"""
    COPY INTO {op_config['raw_table']}
    FROM @{op_config['azure_stage']}
    CREDENTIALS = (
        AZURE_SAS_TOKEN='{sas_token}'
    )
    FILE_FORMAT = (FORMAT_NAME = {op_config['file_format']})
    """

    try:
        db_conn.execute(query)
    except Exception as e:
        # Deliberate best-effort: failure is reported through the boolean
        # output instead of failing the op.
        logging.error("Error while copying file into Snowflake: %s", e)
        return False

    logging.info("File copied into Snowflake")
    return True
class DatabaseResource(ConfigurableResource):
    """
    Dagster resource that executes a statement against Snowflake.

    Each call to ``execute`` opens its own connection, selects the session
    context named by the ``SNOWFLAKE_*_NAME`` environment variables, runs the
    statement and closes the connection again.
    """

    # Credentials passed straight to ``connector.connect``.
    user: str
    password: str
    account: str
    warehouse: str

    def _connect(self):
        """
        Create a Snowflake connection using the credentials configured on
        this resource.

        The connection is returned rather than stored on ``self``:
        ConfigurableResource instances are frozen pydantic models, so
        assigning ``self.conn`` / ``self.cursor`` is rejected at runtime.

        :return: The object returned by ``connector.connect``.
        """
        # NOTE(review): an earlier version also called self.validate_conn()
        # here, but no such method is defined on this class in the visible
        # code; add an explicit check on the returned connection if needed.
        credentials = {
            "user": self.user,
            "password": self.password,
            "account": self.account,
            "warehouse": self.warehouse,
        }
        return connector.connect(**credentials)

    def execute(self, query: str):
        """
        Execute a query

        :param query: The query to execute
        :raises ValueError: If one of the SNOWFLAKE_*_NAME environment
            variables is not set.
        """
        import os

        # EnvVar(...) is a deferred placeholder that Dagster resolves only when
        # it is supplied as a config/resource field value; formatting it into
        # an f-string does not produce the variable's value, so the
        # environment is read directly here.
        session_context = (
            ("WAREHOUSE", "SNOWFLAKE_WAREHOUSE_NAME"),
            ("DATABASE", "SNOWFLAKE_DATABASE_NAME"),
            ("SCHEMA", "SNOWFLAKE_SCHEMA_NAME"),
            ("ROLE", "SNOWFLAKE_ROLE_NAME"),
        )
        statements = []
        for keyword, env_name in session_context:
            value = os.getenv(env_name)
            if not value:
                # Fail before a connection is opened.
                raise ValueError(f"Environment variable {env_name} is not set")
            statements.append(f"USE {keyword} {value}")

        conn = self._connect()
        try:
            cursor = conn.cursor()
            try:
                for statement in statements:
                    cursor.execute(statement)
                cursor.execute(query)
            finally:
                cursor.close()
        finally:
            # Always release the connection, including when a statement fails.
            conn.close()
And here's the init
defs = Definitions(
sensors=all_sensors,
schedules=[upload_data_cerpass_mac],
jobs=[jobs.upload_data],
resources={"db_conn": DatabaseResource(
account=SHARED_SNOWFLAKE_CONFIG["account"],
user=SHARED_SNOWFLAKE_CONFIG["user"],
password=SHARED_SNOWFLAKE_CONFIG["password"],
warehouse=SHARED_SNOWFLAKE_CONFIG["warehouse"],
)}