# -*- coding: utf-8 -*- from gpt.chatgpt import get_answer_from_gpt,get_article_gpt_pydantic from gpt.gpt_check import CheckGptAnswer, CheckArticleResult from tools.new_mysql import MySQLUploader from tools.loglog import logger, log_err_e from tools.thread_pool_manager import pool_executor from common.common_data import all_exchange_words from common.split_text import split_text_to_word from pydantic import BaseModel from cachetools import TTLCache from concurrent.futures import wait from random import randint, shuffle import json import requests from openpyxl import load_workbook from tenacity import retry, stop_after_attempt, wait_fixed def get_article_difficulty(article) -> int: """获取文章的难度值""" url = "http://qbank.yunzhixue.cn/api/article/analysis" data = {"body": article, "question": ""} try: response = requests.post(url, json=data) except Exception as e: log_err_e(e, msg="获取文章难度值;") return 0 if response.status_code == 200: difficult_value = response.json()['data']['difficult'] return difficult_value else: logger.error(f"错误状态码{response.status_code}") def find_interval(number) -> int: """ 判断一个数字属于哪个难度等级区间。31级是例外情况,需要排查 :param number: 要检查的数字。 :return: 返回包含该数字的区间,如果没有找到,则返回 None。 """ intervals = [(1, 200), (201, 250), (251, 300), (301, 350), (351, 400), (401, 450), (451, 550), (551, 650), (651, 750), (751, 850), (851, 950), (951, 1100), (1101, 1250), (1251, 1400), (1401, 1550), (1551, 1700), (1701, 1900), (1901, 2100), (2101, 2300), (2301, 2600), (2601, 2900), (2901, 3200), (3201, 3500), (3501, 3900), (3901, 4300), (4301, 4700), (4701, 5100), (5101, 5500), (5501, 5900), (5901, 6500), (6501, 99999)] for index, (start, end) in enumerate(intervals, start=1): if start <= number <= end: return index logger.error(f"文章难度判断不对:{number}") return 0 def merge_and_split(list1, list2): combined = list1 + list2 import random random.shuffle(combined) two_thirds = [] one_third = [] total_length = len(combined) if total_length > 15: two_thirds = combined[:15] one_third = combined[15:] else: two_thirds = combined one_third = [] return two_thirds, one_third class GetArticle: def __init__(self): self.m = MySQLUploader() self.callback_url_dict = {} self.real_ip_dict = {} self.demo_name = {} self.article_result = {} self.punctuation = [",", ".", "!", "?", ":", ";", '"', "–", "_", "-", "...", "......"] all_exchange_words.update(self.punctuation) self.exchange_data: dict[str, list] = {} self.read_spring_bamboo_exchange_table() def read_spring_bamboo_exchange_table(self): """变形是键,原型是值""" wb = load_workbook(r"data/春笋单词对照变形.xlsx", read_only=True, data_only=True) ws = wb.active for row in ws.values: prototype = row[0] exchange = row[1] if prototype not in self.exchange_data: self.exchange_data[prototype] = [exchange] else: self.exchange_data[prototype].append(exchange) wb.close() def parser_insert_to_mysql(self, resp_result): for single_article in resp_result['articles']: article = single_article['body'] article_json = json.dumps(single_article) difficult_value = find_interval(get_article_difficulty(article)) if not difficult_value: logger.error("文章难度等级为0;") sql = "INSERT INTO spring_bamboo_article (article_json,difficult_level) VALUES (%s,%s)" self.m.execute_(sql, (article_json, difficult_value)) def submit_task(self, core_words: list, extend_words: list, take_count: int, student_stage: int, real_ip: str, demo_name: str, article_difficulty: int): """ words_meaning_list: 词义id 包含词义ID的数组集合,用于生成文章。- 示例:[110, 111, 112, 113, 114] take_count: 取文章数量 (int类型,正常是2篇,最大8篇) student_stage: 学段(int类型:1.小学;2.初中;3.高中;) demo_name: 项目名称 article_difficulty:文章难度值1-4200模糊范围 """ task_id = randint(10000000, 99999999) logger.info(f"生成文章id。task_id:{task_id}") self.real_ip_dict[task_id] = real_ip self.demo_name[task_id] = demo_name try: resp_result = self.run_task(core_words, extend_words, task_id, take_count, student_stage, article_difficulty) self.parser_insert_to_mysql(resp_result) return resp_result except Exception as e: err_msg = f"GetArticle提交任务失败{type(e).__name__},{e}" log_err_e(e, msg="GetArticle提交任务失败;") return err_msg @retry(stop=stop_after_attempt(2), wait=wait_fixed(3), reraise=True) def get_article(self, core_words: list, extend_words: list, student_stage: int, task_id: int, take_count: int, article_difficulty) -> dict: article_grade = find_interval(article_difficulty) if 0 < article_grade <= 10: article_word_count = 50 + 10 * article_grade elif 10 < article_grade <= 20: article_word_count = 150 + 30 * (article_grade - 10) else: article_word_count = 450 + 20 * (article_grade - 20) diffculty_control = { 1: {"grade": "小学", "article_word_count": article_word_count, "desc_difficulty": "最简单最容易没有难度", "paragraph_count": "1-2", "desc2": "文章整体非常简洁,通俗易懂,适合初学者,刚入门,单词全是最常见的,语句通顺即可。", "choice_desc": "选择题难度尽可能简单,但是不要让所有选择题让其直接在文中找到答案,允许1-2个选择题很简单,参考中国小学生水平"}, 2: {"grade": "初中", "article_word_count": article_word_count, "desc_difficulty": "简单、常见、难度低", "paragraph_count": "2-3", "desc2": "文章整体难度适中,大约和中国初中生,中国CET-3,雅思4分这样的难度标准。", "choice_desc": "选择题难度适中,但是不要所有选择题让其直接在文中找到答案,参考中国初中生水平,中考标准。"}, 3: {"grade": "初中", "article_word_count": article_word_count, "desc_difficulty": "简单、常见、难度低", "paragraph_count": "2-3", "desc2": "文章整体难度适中,大约和中国初中生,中国CET-3,雅思4分这样的难度标准。", "choice_desc": "选择题难度适中,但是不要所有选择题让其直接在文中找到答案,参考中国初中生水平,中考标准。"}, 4: {"grade": "高中", "article_word_count": article_word_count, "desc_difficulty": "常见、高中难度的", "paragraph_count": "3-5", "desc2": "文章整体难度适中,大约和中国的高中生,中国CET-6,雅思6分这样的难度标准。", "choice_desc": "选择题难度偏难,要有迷惑性混淆性,答案不要出现直接在文中,4个选项要学生推理或逻辑判断,参考中国高中生水平,高考标准。"} } take_count_dict = {0: "", 1: "一", 2: "二", 3: "三", 4: "四", 5: "五", 6: "六", 7: "七", 8: "八", 9: "九"} different_cou = take_count_dict.get(take_count, "") grade = diffculty_control[student_stage]["grade"] select_word_count = diffculty_control[student_stage]["article_word_count"] select_diffculty = diffculty_control[student_stage]["desc_difficulty"] select_paragraph_count = diffculty_control[student_stage]["paragraph_count"] desc2 = diffculty_control[student_stage]["desc2"] choice_desc = diffculty_control[student_stage]["choice_desc"] shuffle(core_words) core_words_meaning_str = ";".join([str(i['meaning_id']) + ' ' + i["spell"] + ":" + i["meaning"] for i in core_words]) extend_words_meaning_str = ";".join([str(i['meaning_id']) + ' ' + i["spell"] + ":" + i["meaning"] for i in extend_words]) no_escape_code = r"\\n\\n" sys_prompt = "你是一个专业的英语老师,擅长根据用户提供的词汇生成对应的英语文章和中文翻译和4个配套选择题。" q = f"""下面我会为你提供两组数据,[单词组1]和[单词组2](里面包含词义id,英语单词,中文词义),优先使用[单词组1]内的单词,请根据这些单词的中文词义,\ 生成一篇带中文翻译的考场英语文章,英语文章和中文翻译要有[标题]。注意这个单词有多个词义时,生成的英语文章一定要用提供的中文词义。并挑选一句复杂的句子和其中文翻译,放入difficultSentences。\ 英语文章,放入"englishArticle"中。中文翻译,放入"chineseArticle"中。最终文中使用到的单词id放入"usedMeanIds"中。\ 4个选择题,放入questions字段。questions结构下有4个选择题对象,其中trunk是[英语]问题文本,analysis是[中文]的问题分析,candidates是4个ABCD选项,内部有label是指选项序号A B C D ,text是[英语]选项文本,isRight是否正确答案1是正确0是错误。 要求: 1.必须用提供的这个词义的单词,其他单词使用{select_diffculty}的单词。{desc2}{choice_desc} 2.优先保证文章语句通顺,意思不要太生硬。不要为了使用特定的单词,造成文章语义前后不搭,允许不使用个别词义。 3.文章中使用提供单词,一定要和提供单词的中文词义匹配,尤其是一词多义时,务必使用提供单词的词义。必须要用提供单词的词义。如果用到的词义与提供单词词义不一致,请不要使用这个单词。 4.生成的文章要求{select_word_count}词左右,可以用{no_escape_code}字符分段,一般{select_paragraph_count}个段落左右。第一段是文章标题。 5.生成文章优先使用[单词组1]的词义,其次可以挑选使用[单词组2]的词义。允许不使用[单词组1]的个别单词,优先保证文章整体意思通顺连贯和故事完整。 提供[单词组1]:{core_words_meaning_str}; 提供[单词组2]:{extend_words_meaning_str}; """ try: real_ip = self.real_ip_dict[task_id] demo_name = self.demo_name[task_id] r_json = json.loads(get_article_gpt_pydantic(q, temperature=0.9, real_ip=real_ip, demo_name=demo_name, model='gpt-4.1', max_tokens=4000, sys_prompt=sys_prompt)) allWordAmount = 0 allWordAmount += len(split_text_to_word(r_json["englishArticle"])) for i in r_json["questions"]: count_trunk = len(split_text_to_word(i["trunk"])) count_candidates = sum([len(split_text_to_word(ii["text"])) for ii in i["candidates"]]) allWordAmount += count_trunk allWordAmount += count_candidates usedMeanIds: list = r_json['usedMeanIds'] article_words = split_text_to_word(r_json['englishArticle']) for i in core_words + extend_words: meaning_id = i.get('meaning_id', 0) if not meaning_id: continue word = i["spell"] if meaning_id not in usedMeanIds and word in self.exchange_data: words_exchanges_list = self.exchange_data[word] for exchange_word in words_exchanges_list: if exchange_word in article_words: usedMeanIds.append(meaning_id) break r_json["body"] = r_json.pop("englishArticle") r_json["chinese"] = r_json.pop("chineseArticle") return {**r_json, "allWordAmount": allWordAmount} except json.decoder.JSONDecodeError: logger.error("gpt生成文章回复json格式化错误") raise except Exception as e: logger.error(f"gpt生成文章回复其他错误.{type(e).__name__} {e}") raise def run_get_article_task(self, core_words, extend_words, task_id, take_count, student_stage, article_difficulty) -> dict: """ :param core_words: 核心单词数据,优先级1;可能为空 :param extend_words: 扩展单词数据,优先级2;可能为空 :param task_id: 任务id :param take_count: 文章数量 :param student_stage: 学段标识,整型,123 :param article_difficulty:文章难度1-4200模糊范围 :return: """ futures = [] for i in range(take_count): futures.append(pool_executor.submit(self.get_article, core_words, extend_words, student_stage, task_id, take_count, article_difficulty)) wait(futures) return_json = {"articles": []} for t in futures: return_json["articles"].append(t.result()) return return_json def run_task(self, core_words, extend_words, task_id, take_count, student_stage, article_difficulty): try: outside_json = self.run_get_article_task(core_words, extend_words, task_id, take_count, student_stage, article_difficulty) logger.success(f"文章2任务完成。taskid:{task_id}\n{outside_json}") return outside_json except Exception as e: logger.error(f"{type(e).__name__} {e}") finally: self.real_ip_dict.pop(task_id) self.demo_name.pop(task_id)