123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- # -*- coding: utf-8 -*-
- import re
- import json
- from deepseek.ds_api import DS
- from tools.new_mysql import MySQLUploader
- from tools.loglog import logger, log_err_e
- from tools.thread_pool_manager import pool_executor
- from common.common_data import all_exchange_words
- from common.split_text import split_text_to_word
- from pydantic import BaseModel
- from cachetools import TTLCache
- from concurrent.futures import wait
- from random import randint, shuffle
- import json
- import requests
- def get_article_difficulty(article) -> int:
- """获取文章的难度值"""
- url = "http://qbank.yunzhixue.cn/api/article/analysis"
- data = {"body": article, "question": ""}
- try:
- response = requests.post(url, json=data)
- except Exception as e:
- log_err_e(e, msg="获取文章难度值;")
- return 0
- if response.status_code == 200:
- difficult_value = response.json()['data']['difficult']
- return difficult_value
- else:
- logger.error(f"错误状态码{response.status_code}")
- def find_interval(number):
- """
- 判断一个数字属于哪个难度等级区间。31级是例外情况,需要排查
- :param number: 要检查的数字。
- :return: 返回包含该数字的区间,如果没有找到,则返回 None。
- """
- intervals = [(1, 200), (201, 250), (251, 300), (301, 350), (351, 400), (401, 450), (451, 550), (551, 650), (651, 750), (751, 850),
- (851, 950),
- (951, 1100),
- (1101, 1250), (1251, 1400), (1401, 1550), (1551, 1700), (1701, 1900), (1901, 2100), (2101, 2300), (2301, 2600),
- (2601, 2900),
- (2901, 3200),
- (3201, 3500), (3501, 3900), (3901, 4300), (4301, 4700), (4701, 5100), (5101, 5500), (5501, 5900), (5901, 6500),
- (6501, 99999)]
- for index, (start, end) in enumerate(intervals, start=1):
- if start <= number <= end:
- return index
- logger.error(f"文章难度判断不对:{number}")
- return 0
- def parse_question(question_block):
- question_info = {}
- question_match = re.search(r'问题:\s*(.*)', question_block)
- if question_match:
- question_info['trunk'] = question_match.group(1).strip()
- analysis_match = re.search(r'解析:\s*(.*)', question_block)
- if analysis_match:
- question_info['analysis'] = analysis_match.group(1).strip()
- options_match = re.search(r'选项:(.*)', question_block)
- if options_match:
- options_text = options_match.group(1).strip()
- options_list = re.split(r'\s*[BCDA]\.\s*', options_text)[1:]
- candidates = []
- for i, option_text in enumerate(options_list, start=65):
- label = chr(i)
- text = option_text.strip()
- candidates.append({
- "label": label,
- "text": text,
- "isRight": 0
- })
- question_info['candidates'] = candidates
- answer_match = re.search(r'答案:([ABCD])', question_block)
- if answer_match and 'candidates' in question_info:
- correct_label = answer_match.group(1)
- for candidate in question_info['candidates']:
- if candidate['label'] == correct_label:
- candidate['isRight'] = 1
- return question_info
- class GetArticle:
- def __init__(self):
- self.m = MySQLUploader()
- self.ds = DS()
- self.callback_url_dict = {}
- self.real_ip_dict = {}
- self.demo_name = {}
- self.punctuation = [",", ".", "!", "?", ":", ";", '"', "–", "_", "-", "...", "......"]
- all_exchange_words.update(self.punctuation)
- def parser_insert_to_mysql(self, resp_result):
- for single_article in resp_result['articles']:
- article = single_article['body']
- article_json = json.dumps(single_article)
- difficult_value = find_interval(get_article_difficulty(article))
- if not difficult_value:
- logger.error("文章难度等级为0;")
- sql = "INSERT INTO spring_bamboo_article (article_json,difficult_level) VALUES (%s,%s)"
- self.m.execute_(sql, (article_json, difficult_value))
- def submit_task(self, words_meaning_list: list, take_count: int, student_stage: int, real_ip: str, demo_name: str):
- """
- words_meaning_ids: 词义id 包含词义ID的数组集合,用于生成文章。- 示例:[110, 111, 112, 113, 114]
- take_count: 取文章数量 (int类型,正常是2篇,最大8篇)
- student_stage: 学段(int类型:1.小学;2.初中;3.高中;)
- demo_name: 项目名称
- """
- task_id = randint(10000000, 99999999)
- words_meaning_str = ";".join([i["spell"] + ":" + i["meaning"] for i in words_meaning_list])
- logger.info(f"生成文章id。task_id:{task_id}。词义组:{words_meaning_str}.")
- self.real_ip_dict[task_id] = real_ip
- self.demo_name[task_id] = demo_name
- try:
- resp_result = self.run_task(words_meaning_list, task_id, take_count, student_stage)
- self.parser_insert_to_mysql(resp_result)
- return resp_result
- except Exception as e:
- err_msg = f"GetArticle提交任务失败{type(e).__name__},{e}"
- log_err_e(e, msg="GetArticle提交任务失败;")
- return err_msg
- def get_article(self, words_meaning_list, student_stage, task_id, take_count) -> dict:
- diffculty_control = {
- 1: {"grade": "小学", "article_word_count": 60, "desc_difficulty": "最简单最容易没有难度", "paragraph_count": 1,
- "desc2": "文章整体非常简洁,通俗易懂,适合初学者,刚入门,单词全是最常见的,语句通顺即可。",
- "choice_desc": "选择题难度尽可能简单,但是不要让所有选择题让其直接在文中找到答案,允许1-2个选择题很简单,参考中国小学生水平"},
- 2: {"grade": "初中", "article_word_count": 200, "desc_difficulty": "简单、常见、难度低", "paragraph_count": 3,
- "desc2": "文章整体难度适中,大约和中国初中生,中国CET-3,雅思4分这样的难度标准。",
- "choice_desc": "选择题难度适中,但是不要所有选择题让其直接在文中找到答案,参考中国初中生水平,中考标准。"},
- 3: {"grade": "高中", "article_word_count": 300, "desc_difficulty": "常见、高中难度的", "paragraph_count": 3,
- "desc2": "文章整体难度适中,大约和中国的高中生,中国CET-4,雅思5分这样的难度标准。",
- "choice_desc": "选择题难度偏难,要有迷惑性,不要出现直接在文中找到答案,参考中国高中生水平,高考标准。"}
- }
- take_count_dict = {0: "", 1: "一", 2: "二", 3: "三", 4: "四", 5: "五", 6: "六", 7: "七", 8: "八"}
- different_cou = take_count_dict.get(take_count, "")
- grade = diffculty_control[student_stage]["grade"]
- select_word_count = diffculty_control[student_stage]["article_word_count"]
- select_diffculty = diffculty_control[student_stage]["desc_difficulty"]
- select_paragraph_count = diffculty_control[student_stage]["paragraph_count"]
- desc2 = diffculty_control[student_stage]["desc2"]
- choice_desc = diffculty_control[student_stage]["choice_desc"]
- shuffle(words_meaning_list)
- words_meaning_str = ";".join([i["spell"] + ":" + i["meaning"] for i in words_meaning_list])
- q = f"""不要与前面{different_cou}篇一样,要不同的场景。你是一名在中国的英语教师,下面我会为你提供一些带中文词义的英语单词,请根据这些单词的中文词义,\
- 生成带英文标题和中文翻译的考场英语文章,注意这个单词有多个词义时,生成的英语文章一定要用提供的中文词义。并挑选一句复杂的句子和其中文翻译,放入difficultSentences。
- 提供单词:{words_meaning_str}
- 要求:
- 1.必须用提供的这个词义的单词,其他单词使用{select_diffculty}的单词。{desc2}
- 2.文章中使用提供单词,一定要和提供单词的中文词义匹配,尤其是一词多义时,务必使用提供单词的词义。必须要用提供单词的词义。如果用到的词义与提供单词词义不一致,请不要使用这个单词。
- 3.生成的文章要求{select_word_count}词左右,可以用\\n\\n字符分段,一般{select_paragraph_count}个段落左右。
- 4.优先保证文章语句通顺,意思不要太生硬。不要为了使用特定的单词,造成文章语义前后不搭,允许不使用个别词义。
- 5.回复json,格式:{{"title":英文标题,"english":英语文章,"chinese":中文翻译,"difficultSentences": [
- {{
- "english": "",
- "chinese": ""
- }}
- ]}}
- """
- try:
- real_ip = self.real_ip_dict[task_id]
- demo_name = self.demo_name[task_id]
- r_json = json.loads(
- self.ds.get_article(q, temperature=1, json_resp=True, real_ip=real_ip, demo_name=demo_name, max_tokens=8000))
- r_json["body"] = r_json["title"] + "\n\n" + r_json["english"]
- del r_json["title"]
- q_choice_question = f"""你是一名在中国的{grade}英语教师,下面我会为你提供一篇英语短文,请根据短文设计4个选择题.
- 短文:{r_json["english"]}
- ###要求:
- 1. 生成的选择题不要让学生从短文中直接找到答案,可以混淆,最好让学生推理或排除获得正确答案。用词可以{select_diffculty},出题要参考中国的中考高考。
- 2.{choice_desc}
- 3. 每个选择题之间间隔两行,回复格式如下:
- 问题: 英语的选择题问题1文本;
- 解析: 中文的选择题答案解析;
- 选项:A. 选项1 B. 选项2 C. 选项3 D. 选项4
- 答案:B
- 其他几个选择题依次类推
- """
- resp_text = self.ds.get_article(q_choice_question, temperature=1, real_ip=real_ip, demo_name=demo_name, max_tokens=8000)
- questions = resp_text.strip().split('\n\n')
- parsed_questions = [parse_question(q) for q in questions]
- json_data = {"questions": parsed_questions}
- allWordAmount = 0
- allWordAmount += len(split_text_to_word(r_json["english"]))
- for i in json_data["questions"]:
- count_trunk = len(split_text_to_word(i["trunk"]))
- count_candidates = sum([len(split_text_to_word(ii["text"])) for ii in i["candidates"]])
- allWordAmount += count_trunk
- allWordAmount += count_candidates
- return {**r_json, **json_data, "allWordAmount": allWordAmount}
- except json.decoder.JSONDecodeError:
- logger.error("gpt生成文章回复json格式化错误")
- except Exception as e:
- logger.error(f"gpt生成文章回复其他错误.{type(e).__name__} {e}")
- def run_get_article_task(self, words_meaning_list, task_id, take_count, student_stage) -> dict:
- """
- :param words_meaning_list: 数据库内查出来的单词和词义的列表
- :param task_id: 任务id
- :param take_count: 文章数量
- :param student_stage: 学段标识,整型
- :return:
- """
- futures = []
- for i in range(take_count):
- futures.append(pool_executor.submit(self.get_article, words_meaning_list, student_stage, task_id, take_count))
- wait(futures)
- return_json = {"articles": []}
- for t in futures:
- return_json["articles"].append(t.result())
- return return_json
- def run_task(self, words_meaning_list, task_id, take_count, student_stage):
- try:
- outside_json = self.run_get_article_task(words_meaning_list, task_id, take_count, student_stage)
- logger.success(f"文章3-DeepSeek文章任务完成。taskid:{task_id}")
- return outside_json
- except Exception as e:
- logger.error(f"{type(e).__name__} {e}")
- finally:
- self.real_ip_dict.pop(task_id)
- self.demo_name.pop(task_id)
|