new_mysql.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. # -*- coding:utf-8 -*-
  2. import os
  3. from tools.loglog import logger
  4. import pymysql
  5. from dbutils.pooled_db import PooledDB
  6. import time
  7. class MySQLUploader:
  8. _instance = None
  9. _pool = None
  10. _initialized = False
  11. def __new__(cls, *args, **kwargs):
  12. if not cls._instance:
  13. cls._instance = super(MySQLUploader, cls).__new__(cls, *args, **kwargs)
  14. return cls._instance
  15. def __init__(self, database='qbank_db'):
  16. if not self._initialized:
  17. self.host = 'rm-uf6881jgyy065rdxdoo.rwlb.rds.aliyuncs.com'
  18. self.user = 'qingti_user'
  19. self.__password = 'qingti@2024'
  20. self.database = database
  21. self.attempts = 0
  22. self.max_attempts = 3
  23. self.start_time = None
  24. self.connect()
  25. self._initialized = True
  26. def __del__(self):
  27. if self._pool:
  28. self._pool.close()
  29. self._pool = None
  30. def connect(self):
  31. if not self._pool:
  32. self._pool = PooledDB(
  33. creator=pymysql,
  34. maxconnections=20,
  35. mincached=2,
  36. maxcached=5,
  37. maxshared=5,
  38. blocking=True,
  39. host=self.host,
  40. user=self.user,
  41. password=self.__password,
  42. database=self.database,
  43. port=3306,
  44. charset='utf8mb4',
  45. )
  46. print("connect mysql succeed")
  47. def execute_(self, query, params=None):
  48. for _ in range(3):
  49. conn = self._pool.connection()
  50. cursor = conn.cursor()
  51. try:
  52. if params:
  53. cursor.execute(query, params)
  54. else:
  55. cursor.execute(query)
  56. conn.commit()
  57. return True
  58. except pymysql.MySQLError as e:
  59. logger.warning(f"可忽略的错误 {type(e).__name__},{e}")
  60. conn.rollback()
  61. time.sleep(0.5)
  62. finally:
  63. cursor.close()
  64. conn.close()
  65. logger.critical(f"execute_严重错误,3次提交没成功.{query} {params}")
  66. return False
  67. def bulk_insert(self, query, data_list):
  68. """执行批量插入"""
  69. for _ in range(3):
  70. conn = self._pool.connection()
  71. cursor = conn.cursor()
  72. try:
  73. cursor.executemany(query, data_list)
  74. conn.commit()
  75. return True
  76. except pymysql.MySQLError as e:
  77. logger.warning(f"可忽略的错误 bulk_insert数据库批量插入错误{type(e).__name__}:{e}")
  78. conn.rollback()
  79. time.sleep(0.5)
  80. finally:
  81. cursor.close()
  82. conn.close()
  83. logger.critical(f"bulk_insert,3次提交没成功.{query} {data_list}")
  84. return False
  85. def query_data(self, query, params=None):
  86. """执行查询并返回结果"""
  87. for _ in range(3):
  88. conn = self._pool.connection()
  89. cursor = conn.cursor()
  90. try:
  91. if params:
  92. cursor.execute(query, params)
  93. else:
  94. cursor.execute(query)
  95. results = cursor.fetchall()
  96. return results
  97. except pymysql.MySQLError as e:
  98. logger.warning(f"可忽略query_data错误类型{type(e).__name__}:{e}")
  99. logger.warning(f"可忽略query_data错误:{query},{params}")
  100. time.sleep(0.5)
  101. finally:
  102. cursor.close()
  103. conn.close()
  104. logger.critical(f"query_data 3次没成功.{query} {params}")
  105. return False
  106. def execute_sql_file(self,script_file_path):
  107. """执行sql脚本:传入路径或者sql路径都可以"""
  108. def execute_file(path):
  109. with open(path, 'r', encoding='utf-8') as file:
  110. sql_script = file.read()
  111. conn = self._pool.connection()
  112. cursor = conn.cursor()
  113. cursor.execute(sql_script)
  114. conn.commit()
  115. cursor.close()
  116. conn.close()
  117. if os.path.isdir(script_file_path):
  118. for file in os.listdir(script_file_path):
  119. execute_file(script_file_path + "\\" + file)
  120. else:
  121. if script_file_path.endswith(".sql"):
  122. execute_file(script_file_path)
  123. def close_connection(self):...
  124. if __name__ == '__main__':
  125. m = MySQLUploader()
  126. s = "select Id,BritishPronunciation from dictionary_word where wordspelling = %s"
  127. r = m.query_data(s, ("sky",))
  128. print(r)
  129. input()
  130. #