# get_article2.py
# -*- coding: utf-8 -*-
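"""Reading-comprehension article generation.

Builds GPT prompts from a word-meaning list, generates English articles with
Chinese translations and four multiple-choice questions, scores the articles
via the qbank difficulty-analysis API, and stores the results in MySQL.
"""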
from gpt.chatgpt import get_answer_from_gpt, get_article_gpt_pydantic
from gpt.gpt_check import CheckGptAnswer, CheckArticleResult
from tools.new_mysql import MySQLUploader
from tools.loglog import logger, log_err_e
from tools.thread_pool_manager import pool_executor
from common.common_data import all_exchange_words
from common.split_text import split_text_to_word, get_article_words_count

from pydantic import BaseModel
from cachetools import TTLCache
from concurrent.futures import wait
from random import randint, shuffle, sample
import json
import requests
from openpyxl import load_workbook
from tenacity import retry, stop_after_attempt, wait_fixed
import httpx
import asyncio


def get_article_difficulty(article) -> int:
    """Return the article's difficulty score from the qbank analysis API, or 0 on failure."""
    url = "http://qbank.yunzhixue.cn/api/article/analysis"
    data = {"body": article, "question": ""}
    try:
        response = requests.post(url, json=data)
    except Exception as e:
        log_err_e(e, msg="Failed to fetch article difficulty;")
        return 0

    if response.status_code == 200:
        return response.json()['data']['difficult']
    logger.error(f"Unexpected status code {response.status_code} from difficulty API")
    return 0


def find_interval(number) -> int:
    """
    Determine which difficulty-level interval a score falls into. Level 31 is the
    catch-all bucket and should be double-checked.
    :param number: the difficulty score to classify.
    :return: the 1-based index of the interval containing the score, or 0 if none matches.
    """
    intervals = [(1, 200), (201, 250), (251, 300), (301, 350), (351, 400), (401, 450), (451, 550), (551, 650), (651, 750), (751, 850),
                 (851, 950), (951, 1100), (1101, 1250), (1251, 1400), (1401, 1550), (1551, 1700), (1701, 1900), (1901, 2100),
                 (2101, 2300), (2301, 2600), (2601, 2900), (2901, 3200), (3201, 3500), (3501, 3900), (3901, 4300), (4301, 4700),
                 (4701, 5100), (5101, 5500), (5501, 5900), (5901, 6500), (6501, 99999)]
    for index, (start, end) in enumerate(intervals, start=1):
        if start <= number <= end:
            return index
    logger.error(f"Difficulty score outside all intervals: {number}")
    return 0
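
# Example with hypothetical numbers: a difficulty score of 980 from the
# analysis API falls in the (951, 1100) interval, so find_interval(980) == 12
# and the article would be stored with difficult_level 12.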


def merge_and_split(list1, list2):
    """Merge the two lists, shuffle the result, and split it into the first 15 items and the remainder."""
    combined = list1 + list2
    shuffle(combined)
    return combined[:15], combined[15:]
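
# For example, merge_and_split([1, 2, 3], [4, 5]) returns all five items
# (shuffled) as the first part and an empty list as the second, because the
# combined length does not exceed 15.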


class GetArticle:
    def __init__(self):
        self.m = MySQLUploader()

        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(180.0),
            limits=httpx.Limits(
                max_keepalive_connections=100,
                max_connections=1000,
                keepalive_expiry=90.0
            )
        )

        self.callback_url_dict = {}
        self.real_ip_dict = {}
        self.demo_name = {}
        self.article_result = {}

        self.punctuation = [",", ".", "!", "?", ":", ";", '"', "–", "_", "-", "...", "......"]
        all_exchange_words.update(self.punctuation)

        self.exchange_data: dict[str, list] = {}
        self.read_spring_bamboo_exchange_table()

    def read_spring_bamboo_exchange_table(self):
        """Load the word-inflection table: the prototype (headword) is the key, its inflected forms are the values."""
        wb = load_workbook(r"data/春笋单词对照变形.xlsx", read_only=True, data_only=True)
        ws = wb.active
        for row in ws.values:
            prototype, exchange = row[0], row[1]
            self.exchange_data.setdefault(prototype, []).append(exchange)
        wb.close()
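
    # Illustrative shape of the resulting mapping (the real spreadsheet content
    # is not shown in this module):
    #   {"run": ["runs", "running", "ran"], "go": ["goes", "went", "gone"], ...}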

    async def parser_insert_to_mysql(self, resp_result):
        """Score each generated article, map it to a difficulty level, and insert it into MySQL."""
        try:
            for single_article in resp_result['articles']:
                article = single_article['body']
                article_json = json.dumps(single_article)
                difficult_value = find_interval(get_article_difficulty(article))
                if not difficult_value:
                    logger.error("Article difficulty level resolved to 0;")
                sql = "INSERT INTO spring_bamboo_article (article_json,difficult_level) VALUES (%s,%s)"
                self.m.execute_(sql, (article_json, difficult_value))
        except Exception as e:
            logger.error(f"Error while inserting into the database: {str(e)}")
            raise
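
    # Assumed backing table, inferred from the INSERT above (the real DDL is
    # not part of this module):
    #   CREATE TABLE spring_bamboo_article (
    #       id              INT AUTO_INCREMENT PRIMARY KEY,
    #       article_json    JSON,
    #       difficult_level INT
    #   );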

    async def submit_task(self, real_ip: str, core_words: list, take_count: int,
                          demo_name: str, reading_level: int, article_length: int, exercise_id: int):
        """
        :param real_ip: caller's real IP, forwarded to the GPT helper
        :param core_words: word-meaning items (each with meaning_id, spell, meaning)
        :param take_count: number of articles to generate (normally 2, at most 8)
        :param demo_name: project name
        :param reading_level: reading level
        :param article_length: target article length in words (falsy values let get_article derive it)
        :param exercise_id: exercise (lesson plan) id
        """
        task_id = randint(10000000, 99999999)
        logger.info(f"reading-comprehension article generation. exercise_id:{exercise_id}, task_id:{task_id}")
        try:
            self.real_ip_dict[task_id] = real_ip
            self.demo_name[task_id] = demo_name

            resp_result = await self.run_task(core_words, task_id, take_count, reading_level, article_length)
            await self.parser_insert_to_mysql(resp_result)
            logger.success(f"reading-comprehension article2 task finished. exercise_id:{exercise_id}, task_id:{task_id}\n{resp_result}")
            return resp_result
        except Exception as e:
            err_msg = f"GetArticle submit_task failed: {type(e).__name__}, {e}"
            log_err_e(e, msg="GetArticle submit_task failed;")
            return err_msg

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2), reraise=True)
    async def get_article(self, core_words: list, task_id: int, reading_level, article_length) -> dict:
        # Derive a target length from the reading level when none was supplied.
        if not article_length:
            if 0 < reading_level <= 10:
                article_length = 50 + 10 * reading_level
            elif 10 < reading_level <= 20:
                article_length = 150 + 30 * (reading_level - 10)
            else:
                article_length = 450 + 20 * (reading_level - 20)

        # Map the reading level (1-30) to one of four difficulty-control stages.
        for index, (start, end) in enumerate([(1, 8), (9, 16), (17, 24), (25, 30)], start=1):
            if start <= reading_level <= end:
                difficulty_control_stage = index
                break
        else:
            difficulty_control_stage = 2

        # Prompt fragments per difficulty stage (kept in Chinese: they are spliced
        # verbatim into the Chinese generation prompt below).
        difficulty_control = {
            1: {"grade": "小学", "desc_difficulty": "最简单最容易没有难度", "paragraph_count": "1-2",
                "desc2": "文章整体非常简洁,通俗易懂,适合初学者,刚入门,单词全是最常见的,语句通顺即可。",
                "choice_desc": "选择题难度尽可能简单,参考中国小学生水平"},
            2: {"grade": "初中", "desc_difficulty": "简单、常见、难度低", "paragraph_count": "2-3",
                "desc2": "文章整体难度适中,大约和中国初中生,中国CET-3,雅思4分这样的难度标准。",
                "choice_desc": "选择题难度适中,但是不要所有选择题让其直接在文中找到答案,参考中国初中生水平,中考标准。"},
            3: {"grade": "初中", "desc_difficulty": "简单、常见、难度低", "paragraph_count": "2-3",
                "desc2": "文章整体难度适中,大约和中国初中生,中国CET-3,雅思4分这样的难度标准。",
                "choice_desc": "选择题难度适中,但是不要所有选择题让其直接在文中找到答案,参考中国初中生水平,中考标准。"},
            4: {"grade": "高中", "desc_difficulty": "常见、高中难度的", "paragraph_count": "3-5",
                "desc2": "文章整体难度适中,大约和中国的高中生,中国CET-6,雅思6分这样的难度标准。",
                "choice_desc": "选择题难度偏难,要有迷惑性混淆性,答案不要出现直接在文中,4个选项要学生推理或逻辑判断,参考中国高中生水平,高考标准。"}
        }
        grade = difficulty_control[difficulty_control_stage]["grade"]
        select_difficulty = difficulty_control[difficulty_control_stage]["desc_difficulty"]
        select_paragraph_count = difficulty_control[difficulty_control_stage]["paragraph_count"]
        desc2 = difficulty_control[difficulty_control_stage]["desc2"]
        choice_desc = difficulty_control[difficulty_control_stage]["choice_desc"]

        shuffle(core_words)
        core_words_meaning_str = ";".join([str(i['meaning_id']) + ' ' + i["spell"] + ":" + i["meaning"] for i in core_words])

        no_escape_code = r"\\n\\n"

        # The system/user prompts are deliberately kept in Chinese: the model is
        # asked to produce an English article plus a Chinese translation and
        # Chinese question analysis.
        sys_prompt = "你是一个专业的英语老师,擅长根据用户提供的词汇生成对应的英语文章和中文翻译和4个配套选择题。"
        q = f"""下面我会为你提供一组数据,[单词组](里面包含词义id,英语单词,中文词义),请根据这些单词的中文词义,\
生成一篇带中文翻译的考场英语文章,英语文章和中文翻译要有[标题]。注意这个单词有多个词义时,生成的英语文章一定要用提供的中文词义。并挑选一句复杂的句子和其中文翻译,放入difficultSentences。\
英语文章,放入"englishArticle"中。中文翻译,放入"chineseArticle"中。最终文中使用到的单词id放入"usedMeanIds"中。\
4个选择题,放入questions字段。questions结构下有4个选择题对象,其中trunk是[英语]问题文本,analysis是[中文]的问题分析,candidates是4个ABCD选项,内部有label是指选项序号A B C D ,text是[英语]选项文本,isRight是否正确答案1是正确0是错误。
要求:
1.必须用提供的这个词义的单词,其他单词使用{select_difficulty}的单词。{desc2}{choice_desc}
2.优先保证文章语句通顺,意思不要太生硬。不要为了使用特定的单词,造成文章语义前后不搭,允许不使用个别词义。
3.文章中使用提供单词,一定要和提供单词的中文词义匹配,尤其是一词多义时,务必使用提供单词的词义。必须要用提供单词的词义。如果用到的词义与提供单词词义不一致,请不要使用这个单词。
4.生成的文章要求{article_length}词左右,可以用{no_escape_code}字符分段,一般{select_paragraph_count}个段落左右。第一段是文章标题。
5.允许不使用[单词组]的个别单词,优先保证文章整体意思通顺连贯和故事完整。
6.注意回复字段的中英文,englishArticle是英文,chineseArticle是中文,其中trunk是英文,analysis是中文,text是英文。
提供[单词组]:{core_words_meaning_str};
"""

        try:
            real_ip = self.real_ip_dict[task_id]
            demo_name = self.demo_name[task_id]
            r_json = json.loads(await get_article_gpt_pydantic(q, temperature=0.9, real_ip=real_ip, demo_name=demo_name, model='gpt-4.1',
                                                               check_fucn=CheckArticleResult.get_article_1, max_tokens=4000,
                                                               sys_prompt=sys_prompt, client=self.client))

            # Count the words in the article plus all question stems and options.
            allWordAmount = 0
            articleWordAmount = get_article_words_count(r_json["englishArticle"])
            allWordAmount += articleWordAmount
            for i in r_json["questions"]:
                count_trunk = get_article_words_count(i["trunk"])
                count_candidates = sum([get_article_words_count(ii["text"]) for ii in i["candidates"]])
                allWordAmount += count_trunk
                allWordAmount += count_candidates

            # If GPT did not report a meaning id as used, but an inflected form of
            # the word appears in the article, count the meaning as used anyway.
            usedMeanIds: list = r_json['usedMeanIds']
            article_words = split_text_to_word(r_json['englishArticle'])
            for i in core_words:
                meaning_id = i.get('meaning_id', 0)
                if not meaning_id:
                    continue
                word = i["spell"]
                if meaning_id not in usedMeanIds and word in self.exchange_data:
                    words_exchanges_list = self.exchange_data[word]
                    for exchange_word in words_exchanges_list:
                        if exchange_word in article_words:
                            usedMeanIds.append(meaning_id)
                            break

            r_json["body"] = r_json.pop("englishArticle")
            r_json["chinese"] = r_json.pop("chineseArticle")

            # Re-shuffle the four candidates of every question and relabel them A-D.
            for question in r_json['questions']:
                data = question['candidates']
                shuffled_candidates = sample(data, len(data))
                labels = ['A', 'B', 'C', 'D']
                for index, candidate in enumerate(shuffled_candidates):
                    candidate['label'] = labels[index]
                question['candidates'] = shuffled_candidates

            return {**r_json, "allWordAmount": allWordAmount, "articleWordAmount": articleWordAmount}

        except httpx.HTTPError as e:
            logger.error(f"HTTP request error: {str(e)}")
            raise
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {str(e)}")
            raise
        except Exception as e:
            log_err_e(e, "Other error in the GPT article-generation response.")
            raise

    async def run_get_article_task(self, core_words, task_id, take_count, reading_level, article_length) -> dict:
        """
        :param core_words: core word data, priority 1; may be empty
        :param task_id: task id
        :param take_count: number of articles
        :param reading_level: reading level
        :param article_length: article length
        :return: {"articles": [...]} with failed generations filtered out
        """
        try:
            tasks = [self.get_article(core_words, task_id, reading_level, article_length) for _ in range(take_count)]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Drop generations that still failed after the retry decorator.
            articles = []
            for result in results:
                if isinstance(result, Exception):
                    logger.error(f"Single article generation failed: {result}")
                    continue
                articles.append(result)
            return {"articles": articles}
        except Exception as e:
            logger.error(f"Error while running the article tasks: {str(e)}")
            raise

    async def run_task(self, core_words, task_id, take_count, reading_level, article_length):
        try:
            outside_json = await self.run_get_article_task(core_words, task_id, take_count, reading_level, article_length)
            return outside_json
        except Exception as e:
            log_err_e(e, msg="Outer task wrapper caught an error")
        finally:
            # Always release the per-task bookkeeping, even if the task failed.
            self.real_ip_dict.pop(task_id, None)
            self.demo_name.pop(task_id, None)

    async def cleanup(self):
        """Release all resources held by this instance."""
        try:
            if hasattr(self, 'client'):
                await self.client.aclose()
            self.real_ip_dict.clear()
            self.demo_name.clear()
            self.callback_url_dict.clear()
            self.article_result.clear()
        except Exception as e:
            logger.error(f"Error while cleaning up resources: {str(e)}")
            raise

    def __del__(self):
        """Destructor: best-effort release of resources."""
        if hasattr(self, 'client'):
            try:
                # create_task needs a running event loop; skip the async cleanup if there is none.
                asyncio.get_running_loop().create_task(self.cleanup())
            except RuntimeError:
                pass
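

# ---------------------------------------------------------------------------
# Minimal usage sketch, not part of the original service code. The word items,
# IP, project name and exercise id below are made up purely for illustration;
# in production submit_task is expected to be driven by the web layer.
if __name__ == "__main__":
    async def _demo():
        ga = GetArticle()
        try:
            sample_words = [
                {"meaning_id": 1001, "spell": "bamboo", "meaning": "竹子"},
                {"meaning_id": 1002, "spell": "spring", "meaning": "春天"},
            ]
            result = await ga.submit_task(
                real_ip="127.0.0.1",
                core_words=sample_words,
                take_count=2,
                demo_name="demo",
                reading_level=12,
                article_length=0,
                exercise_id=12345678,
            )
            print(result)
        finally:
            await ga.cleanup()

    asyncio.run(_demo())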