# -*- coding:utf-8 -*-
"""Process-wide MySQL helper backed by a DBUtils ``PooledDB`` connection pool."""
import os
import time
from contextlib import contextmanager

import pymysql
from dbutils.pooled_db import PooledDB

from tools.loglog import logger


class MySQLUploader:
    """Singleton wrapper around a pooled MySQL connection.

    Every ``MySQLUploader(...)`` call returns the same instance; the
    constructor arguments are only honoured the first time, so a later call
    with a different ``database`` silently reuses the existing pool.

    The write/read helpers retry a fixed number of times on
    ``pymysql.MySQLError`` and report failure through their return value
    instead of raising.
    """

    _instance = None
    _pool = None
    _initialized = False

    # Retry policy shared by execute_/bulk_insert/query_data.
    _RETRIES = 3
    _RETRY_DELAY = 0.5  # seconds

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if not cls._instance:
            cls._instance = super(MySQLUploader, cls).__new__(cls)
        return cls._instance

    def __init__(self, database='qbank_db'):
        """Configure the connection settings and open the pool (first call only).

        Args:
            database: Name of the schema to connect to.
        """
        if not self._initialized:
            # NOTE(review): host, user and password are hard-coded in source.
            # They should be moved to configuration / a secret store and the
            # committed password rotated.
            self.host = 'rm-uf6881jgyy065rdxdoo.rwlb.rds.aliyuncs.com'
            self.user = 'qingti_user'
            self.__password = 'qingti@2024'
            self.database = database
            # The three attributes below are not read anywhere in this file;
            # they are kept because external code may rely on them.
            self.attempts = 0
            self.max_attempts = 3
            self.start_time = None
            self.connect()
            # Only mark as initialized once connect() succeeded, so a failed
            # first construction is retried by the next one.
            self._initialized = True

    def __del__(self):
        """Close the pool when the instance is garbage-collected."""
        if self._pool:
            self._pool.close()
            self._pool = None

    def connect(self):
        """Create the connection pool if it does not exist yet."""
        if not self._pool:
            self._pool = PooledDB(
                creator=pymysql,
                maxconnections=20,
                mincached=2,
                maxcached=5,
                maxshared=5,
                blocking=True,
                host=self.host,
                user=self.user,
                password=self.__password,
                database=self.database,
                port=3306,
                charset='utf8mb4',
            )
            print("connect mysql succeed")

    @contextmanager
    def _open_cursor(self):
        """Yield ``(connection, cursor)`` and always release both afterwards.

        Errors raised while acquiring the connection or the cursor propagate
        to the caller; the connection is still handed back to the pool if
        creating the cursor fails.
        """
        conn = self._pool.connection()
        try:
            cursor = conn.cursor()
            try:
                yield conn, cursor
            finally:
                cursor.close()
        finally:
            # For a pooled connection, close() returns it to the pool.
            conn.close()

    @staticmethod
    def _rollback_quietly(conn):
        """Roll back ``conn``; log instead of raising if the rollback fails.

        A rollback on a dropped connection raises as well, which would
        otherwise escape the ``except`` handler and abort the retry loop.
        """
        try:
            conn.rollback()
        except pymysql.MySQLError as e:
            logger.warning(f"rollback failed {type(e).__name__}:{e}")

    def _pause_before_retry(self, attempt):
        """Sleep between attempts, but not after the last one."""
        if attempt < self._RETRIES - 1:
            time.sleep(self._RETRY_DELAY)

    def execute_(self, query, params=None):
        """Execute a single write statement and commit it.

        Args:
            query: SQL text, optionally with ``%s`` placeholders.
            params: Values for the placeholders; a falsy value means the
                query is executed without parameters.

        Returns:
            ``True`` once a commit succeeds, ``False`` if every attempt
            failed with ``pymysql.MySQLError``.
        """
        for attempt in range(self._RETRIES):
            with self._open_cursor() as (conn, cursor):
                try:
                    if params:
                        cursor.execute(query, params)
                    else:
                        cursor.execute(query)
                    conn.commit()
                    return True
                except pymysql.MySQLError as e:
                    logger.warning(f"可忽略的错误 {type(e).__name__},{e}")
                    self._rollback_quietly(conn)
            # The connection is already back in the pool while we wait.
            self._pause_before_retry(attempt)
        logger.critical(f"execute_严重错误,3次提交没成功.{query} {params}")
        return False

    def bulk_insert(self, query, data_list):
        """Run a batch insert with ``executemany`` and commit it.

        Args:
            query: Parameterized INSERT statement.
            data_list: Sequence of parameter rows.

        Returns:
            ``True`` once a commit succeeds, ``False`` if every attempt
            failed with ``pymysql.MySQLError``.
        """
        for attempt in range(self._RETRIES):
            with self._open_cursor() as (conn, cursor):
                try:
                    cursor.executemany(query, data_list)
                    conn.commit()
                    return True
                except pymysql.MySQLError as e:
                    logger.warning(f"可忽略的错误 bulk_insert数据库批量插入错误{type(e).__name__}:{e}")
                    self._rollback_quietly(conn)
            self._pause_before_retry(attempt)
        logger.critical(f"bulk_insert,3次提交没成功.{query} {data_list}")
        return False

    def query_data(self, query, params=None):
        """Run a query and return all fetched rows.

        Args:
            query: SQL text, optionally with ``%s`` placeholders.
            params: Values for the placeholders; a falsy value means the
                query is executed without parameters.

        Returns:
            The result of ``cursor.fetchall()``, or ``False`` if every
            attempt failed with ``pymysql.MySQLError``. Callers must
            therefore distinguish ``False`` from an empty result.
        """
        for attempt in range(self._RETRIES):
            with self._open_cursor() as (_conn, cursor):
                try:
                    if params:
                        cursor.execute(query, params)
                    else:
                        cursor.execute(query)
                    return cursor.fetchall()
                except pymysql.MySQLError as e:
                    logger.warning(f"可忽略query_data错误类型{type(e).__name__}:{e}")
                    logger.warning(f"可忽略query_data错误:{query},{params}")
            self._pause_before_retry(attempt)
        logger.critical(f"query_data 3次没成功.{query} {params}")
        return False

    def execute_sql_file(self, script_file_path):
        """Execute a SQL script; accepts either a directory or a ``.sql`` file.

        For a directory, every entry in it is executed (in sorted name
        order). For a single path, the file is executed only when its name
        ends with ``.sql``; anything else is ignored. Errors are not caught
        here and propagate to the caller.

        Args:
            script_file_path: Path to a directory of scripts or to one script.
        """

        def execute_file(path):
            with open(path, 'r', encoding='utf-8') as file:
                sql_script = file.read()
            # NOTE(review): the whole file is sent as one execute() call;
            # whether multi-statement scripts work depends on the driver's
            # connection flags -- confirm before relying on it.
            with self._open_cursor() as (conn, cursor):
                cursor.execute(sql_script)
                conn.commit()

        if os.path.isdir(script_file_path):
            # os.listdir() order is arbitrary; sort so scripts run in a
            # deterministic order. os.path.join keeps this portable instead
            # of hard-coding the Windows separator.
            # NOTE(review): entries are not filtered by extension here,
            # unlike the single-file branch -- confirm that is intended.
            for entry in sorted(os.listdir(script_file_path)):
                execute_file(os.path.join(script_file_path, entry))
        else:
            if script_file_path.endswith(".sql"):
                execute_file(script_file_path)

    def close_connection(self):
        """Placeholder kept for API compatibility; currently does nothing."""
        ...


if __name__ == '__main__':
    # Manual smoke test: look up one word and keep the console open.
    m = MySQLUploader()
    s = "select Id,BritishPronunciation from dictionary_word where wordspelling = %s"
    r = m.query_data(s, ("sky",))
    print(r)
    input()