123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- # -*- coding:utf-8 -*-
- import os
- from tools.loglog import logger
- import pymysql
- from dbutils.pooled_db import PooledDB
- import time
- class MySQLUploader:
- _instance = None
- _pool = None
- _initialized = False
- def __new__(cls, *args, **kwargs):
- if not cls._instance:
- cls._instance = super(MySQLUploader, cls).__new__(cls, *args, **kwargs)
- return cls._instance
- def __init__(self, database='qbank_db'):
- if not self._initialized:
- self.host = 'rm-uf6881jgyy065rdxdoo.rwlb.rds.aliyuncs.com'
- self.user = 'qingti_user'
- self.__password = 'qingti@2024'
- self.database = database
- self.attempts = 0
- self.max_attempts = 3
- self.start_time = None
- self.connect()
- self._initialized = True
- def __del__(self):
- if self._pool:
- self._pool.close()
- self._pool = None
- def connect(self):
- if not self._pool:
- self._pool = PooledDB(
- creator=pymysql,
- maxconnections=20,
- mincached=2,
- maxcached=5,
- maxshared=5,
- blocking=True,
- host=self.host,
- user=self.user,
- password=self.__password,
- database=self.database,
- port=3306,
- charset='utf8mb4',
- )
- print("connect mysql succeed")
- def execute_(self, query, params=None):
- for _ in range(3):
- conn = self._pool.connection()
- cursor = conn.cursor()
- try:
- if params:
- cursor.execute(query, params)
- else:
- cursor.execute(query)
- conn.commit()
- return True
- except pymysql.MySQLError as e:
- logger.warning(f"可忽略的错误 {type(e).__name__},{e}")
- conn.rollback()
- time.sleep(0.5)
- finally:
- cursor.close()
- conn.close()
- logger.critical(f"execute_严重错误,3次提交没成功.{query} {params}")
- return False
- def bulk_insert(self, query, data_list):
- """执行批量插入"""
- for _ in range(3):
- conn = self._pool.connection()
- cursor = conn.cursor()
- try:
- cursor.executemany(query, data_list)
- conn.commit()
- return True
- except pymysql.MySQLError as e:
- logger.warning(f"可忽略的错误 bulk_insert数据库批量插入错误{type(e).__name__}:{e}")
- conn.rollback()
- time.sleep(0.5)
- finally:
- cursor.close()
- conn.close()
- logger.critical(f"bulk_insert,3次提交没成功.{query} {data_list}")
- return False
- def query_data(self, query, params=None):
- """执行查询并返回结果"""
- for _ in range(3):
- conn = self._pool.connection()
- cursor = conn.cursor()
- try:
- if params:
- cursor.execute(query, params)
- else:
- cursor.execute(query)
-
- results = cursor.fetchall()
- return results
- except pymysql.MySQLError as e:
- logger.warning(f"可忽略query_data错误类型{type(e).__name__}:{e}")
- logger.warning(f"可忽略query_data错误:{query},{params}")
- time.sleep(0.5)
- finally:
- cursor.close()
- conn.close()
- logger.critical(f"query_data 3次没成功.{query} {params}")
- return False
- def execute_sql_file(self,script_file_path):
- """执行sql脚本:传入路径或者sql路径都可以"""
- def execute_file(path):
-
- with open(path, 'r', encoding='utf-8') as file:
- sql_script = file.read()
- conn = self._pool.connection()
- cursor = conn.cursor()
-
-
- cursor.execute(sql_script)
-
- conn.commit()
- cursor.close()
- conn.close()
-
- if os.path.isdir(script_file_path):
- for file in os.listdir(script_file_path):
- execute_file(script_file_path + "\\" + file)
- else:
- if script_file_path.endswith(".sql"):
- execute_file(script_file_path)
- def close_connection(self):...
- if __name__ == '__main__':
- m = MySQLUploader()
- s = "select Id,BritishPronunciation from dictionary_word where wordspelling = %s"
- r = m.query_data(s, ("sky",))
- print(r)
- input()
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- #
-
-
|