# -*- coding: utf-8 -*-
from gpt.chatgpt import get_answer_from_gpt, get_article_gpt_pydantic
from gpt.gpt_check import CheckGptAnswer, CheckArticleResult
from tools.new_mysql import MySQLUploader
from tools.loglog import logger, log_err_e
from tools.thread_pool_manager import pool_executor
from common.common_data import all_exchange_words
from common.split_text import split_text_to_word, get_article_words_count
from pydantic import BaseModel
from cachetools import TTLCache
from concurrent.futures import wait
from random import randint, shuffle, sample
import json
import requests
from openpyxl import load_workbook
from tenacity import retry, stop_after_attempt, wait_fixed
import httpx
import asyncio

# Inclusive (low, high) score bounds; the 1-based position is the difficulty level.
_DIFFICULTY_INTERVALS = (
    (1, 200), (201, 250), (251, 300), (301, 350), (351, 400), (401, 450),
    (451, 550), (551, 650), (651, 750), (751, 850), (851, 950), (951, 1100),
    (1101, 1250), (1251, 1400), (1401, 1550), (1551, 1700), (1701, 1900),
    (1901, 2100), (2101, 2300), (2301, 2600), (2601, 2900), (2901, 3200),
    (3201, 3500), (3501, 3900), (3901, 4300), (4301, 4700), (4701, 5100),
    (5101, 5500), (5501, 5900), (5901, 6500), (6501, 99999),
)


def get_article_difficulty(article, timeout: float = 30.0) -> int:
    """Fetch the difficulty score of an article from the analysis service.

    :param article: article body sent as the ``body`` field of the request.
    :param timeout: seconds to wait for the HTTP call before giving up.
    :return: the ``data.difficult`` value of the response, or 0 when the
        request fails, the status is not 200, or the payload lacks the field.
    """
    url = "http://qbank.yunzhixue.cn/api/article/analysis"
    data = {"body": article, "question": ""}
    try:
        response = requests.post(url, json=data, timeout=timeout)
    except Exception as e:
        log_err_e(e, msg="获取文章难度值;")
        return 0

    if response.status_code != 200:
        logger.error(f"错误状态码{response.status_code}")
        # Return 0 explicitly: an implicit None would crash find_interval().
        return 0

    try:
        return response.json()['data']['difficult']
    except (ValueError, KeyError, TypeError) as e:
        # Body was not JSON, or did not have the expected data.difficult field.
        log_err_e(e, msg="获取文章难度值;")
        return 0


def find_interval(number) -> int:
    """Map a difficulty score to its 1-based difficulty level.

    Level 31 is an exceptional case that needs investigating.

    :param number: the score to classify.
    :return: the level whose inclusive interval contains ``number``; 0 (with
        an error log) when no interval matches or the value is not comparable.
    """
    try:
        for index, (start, end) in enumerate(_DIFFICULTY_INTERVALS, start=1):
            if start <= number <= end:
                return index
    except TypeError:
        # None or another non-numeric value: treat as "no interval found".
        pass
    logger.error(f"文章难度判断不对:{number}")
    return 0


def merge_and_split(list1, list2):
    """Shuffle the concatenation of two lists and split it after 15 items.

    :return: ``(first, rest)`` where ``first`` holds at most 15 shuffled items
        and ``rest`` holds whatever remains (empty when there are 15 or fewer).
    """
    combined = list1 + list2
    shuffle(combined)
    # Slicing already copes with short lists, so no length branch is needed.
    return combined[:15], combined[15:]


class GetArticle:
    """Generate reading-comprehension articles through GPT and store them in MySQL."""

    # Reading-level ranges (inclusive) for difficulty stages 1..4.
    _STAGE_RANGES = ((1, 8), (9, 16), (17, 24), (25, 30))
    # Stage used when the reading level falls outside every range.
    _DEFAULT_STAGE = 2
    # Prompt fragments for each difficulty stage.
    _DIFFICULTY_CONTROL = {
        1: {"grade": "小学", "desc_difficulty": "最简单最容易没有难度", "paragraph_count": "1-2",
            "desc2": "文章整体非常简洁,通俗易懂,适合初学者,刚入门,单词全是最常见的,语句通顺即可。",
            "choice_desc": "选择题难度尽可能简单,参考中国小学生水平"},
        2: {"grade": "初中", "desc_difficulty": "简单、常见、难度低", "paragraph_count": "2-3",
            "desc2": "文章整体难度适中,大约和中国初中生,中国CET-3,雅思4分这样的难度标准。",
            "choice_desc": "选择题难度适中,但是不要所有选择题让其直接在文中找到答案,参考中国初中生水平,中考标准。"},
        3: {"grade": "初中", "desc_difficulty": "简单、常见、难度低", "paragraph_count": "2-3",
            "desc2": "文章整体难度适中,大约和中国初中生,中国CET-3,雅思4分这样的难度标准。",
            "choice_desc": "选择题难度适中,但是不要所有选择题让其直接在文中找到答案,参考中国初中生水平,中考标准。"},
        4: {"grade": "高中", "desc_difficulty": "常见、高中难度的", "paragraph_count": "3-5",
            "desc2": "文章整体难度适中,大约和中国的高中生,中国CET-6,雅思6分这样的难度标准。",
            "choice_desc": "选择题难度偏难,要有迷惑性混淆性,答案不要出现直接在文中,4个选项要学生推理或逻辑判断,参考中国高中生水平,高考标准。"},
    }
    _CANDIDATE_LABELS = ('A', 'B', 'C', 'D')

    def __init__(self):
        self.m = MySQLUploader()
        # One shared async HTTP client, passed to the GPT helper on every call.
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(180.0),
            limits=httpx.Limits(
                max_keepalive_connections=100,
                max_connections=1000,
                keepalive_expiry=90.0
            )
        )
        # callback_url_dict and article_result are only cleared in cleanup();
        # nothing else in this file reads or writes them.
        self.callback_url_dict = {}
        # Per-task context keyed by task_id; filled in submit_task, removed in run_task.
        self.real_ip_dict = {}
        self.demo_name = {}
        self.article_result = {}
        self.punctuation = [",", ".", "!", "?", ":", ";", '"', "–", "_", "-", "...", "......"]
        # NOTE: this mutates the module-level all_exchange_words collection.
        all_exchange_words.update(self.punctuation)
        # Base word (prototype) -> list of its inflected forms.
        self.exchange_data: dict[str, list] = {}
        self.read_spring_bamboo_exchange_table()

    def read_spring_bamboo_exchange_table(self):
        """Load the word-inflection table into ``self.exchange_data``.

        The first column is the base word (prototype) and becomes the key; the
        second column is one inflected form and is appended to that key's list.
        """
        wb = load_workbook(r"data/春笋单词对照变形.xlsx", read_only=True, data_only=True)
        try:
            ws = wb.active
            for row in ws.values:
                # Skip rows that do not carry both columns.
                if len(row) < 2 or row[0] is None or row[1] is None:
                    continue
                self.exchange_data.setdefault(row[0], []).append(row[1])
        finally:
            # Release the file handle even when iteration fails.
            wb.close()

    async def parser_insert_to_mysql(self, resp_result):
        """Insert every article of ``resp_result['articles']`` with its difficulty level.

        The difficulty lookup is a synchronous HTTP call, so it blocks the
        event loop while it runs. An article whose level resolves to 0 is
        logged but still inserted.

        :raises Exception: whatever the lookup or the insert raised, after logging.
        """
        try:
            for single_article in resp_result['articles']:
                article = single_article['body']
                article_json = json.dumps(single_article)
                difficult_value = find_interval(get_article_difficulty(article))
                if not difficult_value:
                    logger.error("文章难度等级为0;")
                sql = "INSERT INTO spring_bamboo_article (article_json,difficult_level) VALUES (%s,%s)"
                self.m.execute_(sql, (article_json, difficult_value))
        except Exception as e:
            logger.error(f"插入数据库时发生错误: {str(e)}")
            raise

    async def submit_task(self, real_ip: str, core_words: list, take_count: int,
                          demo_name: str, reading_level: int, article_length: int, exercise_id: int):
        """Generate ``take_count`` articles, store them and return the result.

        :param real_ip: forwarded to the GPT helper.
        :param core_words: word-meaning records (``meaning_id``, ``spell``, ``meaning``).
        :param take_count: number of articles to generate (normally 2, at most 8).
        :param demo_name: project name, forwarded to the GPT helper.
        :param reading_level: reading level used to pick length and difficulty.
        :param article_length: target word count; a falsy value derives it from the level.
        :param exercise_id: study-plan id, used for logging only.
        :return: ``{"articles": [...]}`` on success, otherwise an error message string.
        """
        task_id = randint(10000000, 99999999)
        # Never reuse the id of a task that is still in flight.
        while task_id in self.real_ip_dict:
            task_id = randint(10000000, 99999999)
        logger.info(f"reading-comprehension 生成文章id。学案id:{exercise_id},task_id:{task_id}")

        try:
            self.real_ip_dict[task_id] = real_ip
            self.demo_name[task_id] = demo_name

            resp_result = await self.run_task(core_words, task_id, take_count, reading_level, article_length)
            if resp_result is None:
                # run_task already logged the cause; fail with a clear message
                # instead of a TypeError from indexing None.
                raise RuntimeError("文章生成失败,未返回任何结果")
            await self.parser_insert_to_mysql(resp_result)
            logger.success(f"reading-comprehension 文章2任务完成。学案id:{exercise_id},taskid:{task_id}\n{resp_result}")
            return resp_result
        except Exception as e:
            err_msg = f"GetArticle提交任务失败{type(e).__name__},{e}"
            log_err_e(e, msg="GetArticle提交任务失败;")
            return err_msg

    @staticmethod
    def _resolve_article_length(reading_level, article_length):
        """Return ``article_length``, or derive one from the level when it is falsy."""
        if article_length:
            return article_length
        if 0 < reading_level <= 10:
            return 50 + 10 * reading_level
        if 10 < reading_level <= 20:
            return 150 + 30 * (reading_level - 10)
        return 450 + 20 * (reading_level - 20)

    @classmethod
    def _difficulty_profile(cls, reading_level) -> dict:
        """Return the prompt fragments of the stage that contains ``reading_level``."""
        for stage, (start, end) in enumerate(cls._STAGE_RANGES, start=1):
            if start <= reading_level <= end:
                return cls._DIFFICULTY_CONTROL[stage]
        return cls._DIFFICULTY_CONTROL[cls._DEFAULT_STAGE]

    @staticmethod
    def _build_prompt(core_words, article_length, profile) -> str:
        """Build the user prompt from the word list, target length and stage profile."""
        select_diffculty = profile["desc_difficulty"]
        select_paragraph_count = profile["paragraph_count"]
        desc2 = profile["desc2"]
        choice_desc = profile["choice_desc"]

        # Randomise the word order on a copy so the caller's list, which is
        # shared by concurrently running tasks, is left untouched.
        shuffled_words = sample(core_words, len(core_words))
        core_words_meaning_str = ";".join(
            [f"{i['meaning_id']} {i['spell']}:{i['meaning']}" for i in shuffled_words])

        # Literal backslash-n text shown to the model as the paragraph separator.
        no_escape_code = r"\\n\\n"

        return f"""下面我会为你提供一组数据,[单词组](里面包含词义id,英语单词,中文词义),请根据这些单词的中文词义,\
生成一篇带中文翻译的考场英语文章,英语文章和中文翻译要有[标题]。注意这个单词有多个词义时,生成的英语文章一定要用提供的中文词义。并挑选一句复杂的句子和其中文翻译,放入difficultSentences。\
英语文章,放入"englishArticle"中。中文翻译,放入"chineseArticle"中。最终文中使用到的单词id放入"usedMeanIds"中。\
4个选择题,放入questions字段。questions结构下有4个选择题对象,其中trunk是[英语]问题文本,analysis是[中文]的问题分析,candidates是4个ABCD选项,内部有label是指选项序号A B C D ,text是[英语]选项文本,isRight是否正确答案1是正确0是错误。
要求:
1.必须用提供的这个词义的单词,其他单词使用{select_diffculty}的单词。{desc2}{choice_desc}
2.优先保证文章语句通顺,意思不要太生硬。不要为了使用特定的单词,造成文章语义前后不搭,允许不使用个别词义。
3.文章中使用提供单词,一定要和提供单词的中文词义匹配,尤其是一词多义时,务必使用提供单词的词义。必须要用提供单词的词义。如果用到的词义与提供单词词义不一致,请不要使用这个单词。
4.生成的文章要求{article_length}词左右,可以用{no_escape_code}字符分段,一般{select_paragraph_count}个段落左右。第一段是文章标题。
5.允许不使用[单词组]的个别单词,优先保证文章整体意思通顺连贯和故事完整。
6.注意回复字段的中英文,englishArticle是英文,chineseArticle是中文,其中trunk是英文,analysis是中文,text是英文。
提供[单词组]:{core_words_meaning_str};
"""

    @staticmethod
    def _count_words(r_json):
        """Return ``(all_words, article_words)`` for the article and its questions."""
        article_word_amount = get_article_words_count(r_json["englishArticle"])
        all_word_amount = article_word_amount
        for question in r_json["questions"]:
            all_word_amount += get_article_words_count(question["trunk"])
            all_word_amount += sum(
                get_article_words_count(candidate["text"]) for candidate in question["candidates"])
        return all_word_amount, article_word_amount

    def _backfill_used_meaning_ids(self, r_json, core_words):
        """Append meaning ids whose word appears in the article only as an inflected form."""
        used_mean_ids: list = r_json['usedMeanIds']
        article_words = split_text_to_word(r_json['englishArticle'])
        for item in core_words:
            meaning_id = item.get('meaning_id', 0)
            if not meaning_id:
                continue
            word = item["spell"]
            if meaning_id in used_mean_ids or word not in self.exchange_data:
                continue
            if any(variant in article_words for variant in self.exchange_data[word]):
                used_mean_ids.append(meaning_id)

    @classmethod
    def _shuffle_candidates(cls, questions):
        """Randomise the option order of every question and relabel the options A-D."""
        for question in questions:
            options = question['candidates']
            shuffled_candidates = sample(options, len(options))
            for index, candidate in enumerate(shuffled_candidates):
                candidate['label'] = cls._CANDIDATE_LABELS[index]
            question['candidates'] = shuffled_candidates

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2), reraise=True)
    async def get_article(self, core_words: list, task_id: int, reading_level, article_length) -> dict:
        """Ask GPT for one article with questions and post-process the reply.

        Retried up to 3 times, 2 seconds apart; the last error is re-raised.

        :param core_words: word-meaning records (``meaning_id``, ``spell``, ``meaning``).
        :param task_id: key into the per-task context stored by ``submit_task``.
        :param reading_level: selects the difficulty stage and the default length.
        :param article_length: target word count; falsy derives it from the level.
        :return: the parsed reply with ``englishArticle``/``chineseArticle``
            renamed to ``body``/``chinese``, plus ``allWordAmount`` and
            ``articleWordAmount``.
        """
        article_length = self._resolve_article_length(reading_level, article_length)
        profile = self._difficulty_profile(reading_level)

        sys_prompt = "你是一个专业的英语老师,擅长根据用户提供的词汇生成对应的英语文章和中文翻译和4个配套选择题。"
        prompt = self._build_prompt(core_words, article_length, profile)

        try:
            real_ip = self.real_ip_dict[task_id]
            demo_name = self.demo_name[task_id]

            r_json = json.loads(
                await get_article_gpt_pydantic(prompt, temperature=0.9, real_ip=real_ip, demo_name=demo_name,
                                               model='gpt-4.1', check_fucn=CheckArticleResult.get_article_1,
                                               max_tokens=4000, sys_prompt=sys_prompt, client=self.client))

            # Both steps read "englishArticle", so they run before the rename below.
            allWordAmount, articleWordAmount = self._count_words(r_json)
            self._backfill_used_meaning_ids(r_json, core_words)

            r_json["body"] = r_json.pop("englishArticle")
            r_json["chinese"] = r_json.pop("chineseArticle")

            self._shuffle_candidates(r_json['questions'])

            return {**r_json, "allWordAmount": allWordAmount, "articleWordAmount": articleWordAmount}
        except httpx.HTTPError as e:
            logger.error(f"HTTP请求错误: {str(e)}")
            raise
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析错误: {str(e)}")
            raise
        except Exception as e:
            log_err_e(e, "gpt生成文章回复其他错误.")
            raise

    async def run_get_article_task(self, core_words, task_id, take_count, reading_level, article_length) -> dict:
        """Generate ``take_count`` articles concurrently.

        :param core_words: core word records; may be empty.
        :param task_id: task id.
        :param take_count: number of articles.
        :param reading_level: reading level.
        :param article_length: article length.
        :return: ``{"articles": [...]}`` holding only the articles that succeeded.
        :raises RuntimeError: when every requested article failed.
        """
        try:
            tasks = [
                self.get_article(core_words, task_id, reading_level, article_length)
                for _ in range(take_count)
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
        except Exception as e:
            logger.error(f"运行文章任务时发生错误: {str(e)}")
            raise

        articles = []
        first_failure = None
        for result in results:
            if isinstance(result, BaseException):
                # gather() hands failures back as values; keep them out of the
                # article list so later steps never index an exception object.
                logger.error(f"运行文章任务时发生错误: {str(result)}")
                if first_failure is None:
                    first_failure = result
                continue
            articles.append(result)

        if first_failure is not None and not articles:
            raise RuntimeError("所有文章生成任务均失败") from first_failure
        return {"articles": articles}

    async def run_task(self, core_words, task_id, take_count, reading_level, article_length):
        """Run the generation task and always drop the per-task context.

        :return: ``{"articles": [...]}``, or ``None`` when generation failed
            (the error is logged, not raised).
        """
        try:
            outside_json = await self.run_get_article_task(core_words, task_id, take_count, reading_level, article_length)
            return outside_json
        except Exception as e:
            log_err_e(e, msg="外层总任务捕获错误")
            return None
        finally:
            # A default keeps a missing key from raising out of finally.
            self.real_ip_dict.pop(task_id, None)
            self.demo_name.pop(task_id, None)

    async def cleanup(self):
        """Close the HTTP client and clear all per-task dictionaries."""
        try:
            if hasattr(self, 'client'):
                await self.client.aclose()
            self.real_ip_dict.clear()
            self.demo_name.clear()
            self.callback_url_dict.clear()
            self.article_result.clear()
        except Exception as e:
            logger.error(f"清理资源时发生错误: {str(e)}")
            raise

    def __del__(self):
        """Schedule ``cleanup`` on the running event loop, if there is one."""
        if not hasattr(self, 'client'):
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: the coroutine cannot be scheduled from here, so
            # callers must await cleanup() explicitly before dropping the object.
            return
        loop.create_task(self.cleanup())