- # -*- coding: utf-8 -*-
- import random
- from gpt.chatgpt import get_answer_from_gpt
- from tools.new_mysql import MySQLUploader
- from tools.loglog import logger
- from tools.thread_pool_manager import pool_executor
- from common.common_data import all_exchange_words
- from common.split_text import *
- from data.get_all_exchange_words import get_word_exchange_list, word_to_prototype
- import requests
- import oss2
- from oss2.credentials import EnvironmentVariableCredentialsProvider
- from collections import OrderedDict
- from cachetools import TTLCache
- from concurrent.futures import wait
- from random import randint
- import re
- import json
- import time
- import traceback
- class OtherBaseFunction:
- def __init__(self):
- self.m = MySQLUploader()
- self.fake_meaningid = {}
- self.callback_url_dict = {}
- self.real_ip_dict = {}
- self.demo_name = {}
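- # 24h TTL caches: word spelling -> word id, and meaning id -> meaning text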
- self.query_cache_wordspelling = TTLCache(maxsize=2000, ttl=86400)
- self.query_cache_meaningid = TTLCache(maxsize=2000, ttl=86400)
- @staticmethod
- def _difficulty_control(student_stage, vocabulary) -> dict:
- """
- Return difficulty-control settings based on the student's stage or vocabulary size.
- :param student_stage: school stage; 1/2/3 = primary / junior high / senior high
- :param vocabulary: vocabulary size; roughly 1200 primary, 2400 junior high, 4800 senior high
- :return: dict with a difficulty description, a paragraph count and a paragraph prompt
- """
- if vocabulary <= 1200:
- difficult_control = {"difficult_desc": "最简单最基础的入门的初级的幼儿园的毫无难度的", "paragraph_count": 1,
- "student_stage_str": "小学",
- "paragraph_desc": "生成的文章要求100词左右,三个段落以上。允许有简单句式的出现。"}
- elif 1200 < vocabulary <= 2400:
- difficult_control = {"difficult_desc": "简单的容易的常见的难度低的", "paragraph_count": 3, "student_stage_str": "初中",
- "paragraph_desc": r"生成的文章要求150词左右,三个段落以上。用\n\n分段。"}
- else:
- difficult_control = {"difficult_desc": "常见的初级的中国高考的", "paragraph_count": 5, "student_stage_str": "高中",
- "paragraph_desc": r"生成的文章要求250词左右,允许有3-5个段落。用\n\n分段。"}
- return difficult_control
- def _get_article_chinese_dict(self, title, r_article_sentences, task_id):
- """
- 获取文章的中文翻译。注意:这里切割的方法要与后面的split_article_make_json一致
- :param title: 标题
- :param r_article_sentences: 通过生词检验的文章句子列表
- :return:
- """
- def get_chinese_from_gpt(whole_article_sentences: list):
- q = f"""你是一名在中国的英语教师,下面我会为你提供一个英语句子的列表,请按列表顺序将每个句子翻译成中文,结果按列表顺序放在chinese为键的json数组内。
- 英语句子列表:{whole_article_sentences}
- 要求:
- 1.中文翻译的结果要按列表的顺序,依次放入chinese数组。回复的中文数量要与英语句子列表的数量一样,不要漏下。
- 2.回复json,格式:{{"chinese":[sentence1,sentence2...]}}
- """
- real_ip = self.real_ip_dict[task_id]
- demo_name = self.demo_name[task_id]
- for cou in range(3):
- try:
- r_json = json.loads(get_answer_from_gpt(q, temperature=0.8, json_resp=True, real_ip=real_ip, demo_name=demo_name))
- r_article_chinese_list = r_json.get("chinese")
- if len(r_article_chinese_list) == len(whole_article_sentences):
- # strip whitespace/\n\n so keys match the sentences regenerated later in split_article_make_json
- r_article_chinese_dict = {str(k).strip(): str(v) for k, v in zip(whole_article_sentences, r_article_chinese_list)}
- return r_article_chinese_dict
- logger.warning(f"警告:第{cou + 1}次,中文翻译与原句数量不一致")
- except json.decoder.JSONDecodeError:
- logger.error("gpt生成文章中文翻译,回复json格式化错误")
- except Exception as e:
- logger.error(f"gpt生成文章中文翻译回复其他错误.{type(e).__name__} {e}")
- logger.critical("严重错误:gpt生成文章中文翻译三次全错,请管理员检查")
- article_list = [title + "\n\n"] + r_article_sentences
- r_article_chinese_dict = get_chinese_from_gpt(whole_article_sentences=article_list)
- if r_article_chinese_dict:
- return r_article_chinese_dict
- @staticmethod
- def _calculate_new_word_rate(r_article_sentences):
- article = "".join(r_article_sentences)
- new_words = set()
- # match words, including apostrophe forms such as "don't"
- test_article = re.findall(r'\b\w+\'?\w*\b', article)
- for word in test_article:
- word2: str = word.split("'")[0] if "'" in word else word
- if len(word) <= 2:
- continue
- is_in_12000words = any([word2.lower() in all_exchange_words, word2.title() in all_exchange_words])
- if not is_in_12000words:
- new_words.add(word)
- # new-word rate = unknown words / total word count (guard against an empty article)
- new_word_rate = round(len(new_words) / max(len(test_article), 1), 3)
- logger.info(f"开发调试生词率{new_word_rate}.生词{new_words}")
- new_words = list(new_words)
- return new_word_rate, new_words
- def insert_article_to_mysql(self, title, article, chinese, task_id, code=0):
- self.m.execute_("INSERT INTO new_word_article (title,article,chinese, taskId,code) VALUES (%s, %s,%s,%s,%s)",
- (title, article, chinese, task_id, code))
- def get_wordid_by_wordspelling(self, wordspelling: str):
- """加一个功能。大字典内没有这个单词就自动插入,返回id"""
- if wordspelling in self.query_cache_meaningid:
- return self.query_cache_wordspelling[wordspelling]
- s = "select Id from dictionary_word where wordspelling = %s"
- prototype_word = word_to_prototype(wordspelling)
- r = self.m.query_data(s, (prototype_word,))
- if r:
- wordid = r[0][0]
- else:
- wordid = 0
- self.query_cache_wordspelling[wordspelling] = wordid
- return wordid
- def get_meaning_by_meaningid(self, meaningid: int):
- """加一个功能。大字典内没有这个单词就自动插入,返回id"""
- if meaningid in self.query_cache_meaningid:
- return self.query_cache_meaningid[meaningid]
- s = "select WordMeaning from dictionary_meaningitem where Id = %s"
- r = self.m.query_data(s, (meaningid,))
- meaning = r[0][0] if r else ""
- self.query_cache_meaningid[meaningid] = meaning
- return meaning
- def _get_fake_meaningid(self, word):
- """获得假词义id。但是保证同一个单词是一个id"""
- if word in self.fake_meaningid:
- return self.fake_meaningid[word]
- s = "select Id from dictionary_meaningitem where WordSpelling = %s"
- r = self.m.query_data(s, (word,))
- if r:
- fake_meaningid = r[0][0]
- else:
- fake_meaningid = random.randint(10000, 99999)
- self.fake_meaningid[word] = fake_meaningid
- return fake_meaningid
- @staticmethod
- def _clean_gpt_res(single_sentence: str, gpt_text: str, split_words: list) -> list:
- """# 解析成 键是句子+单词拼写,值是词义id"""
- return_data = []
- if not gpt_text:
- return []
- row_data = [i for i in gpt_text.split("\n") if "**" in i]
- already_spelling = set()
- for row in row_data:
- one_row_data_list = row.split("**")
- if len(one_row_data_list) < 1:
- continue
- one_row_data_list = [i.strip() for i in one_row_data_list]
- spelling, meaning_id = one_row_data_list[0:2]
- already_spelling.add(spelling)
- return_data.append([single_sentence, spelling, int(meaning_id)])
- for remaining_word in set(split_words).difference(already_spelling):
- return_data.append([single_sentence, remaining_word, 0])
- return return_data
- class GetArticle(OtherBaseFunction):
- def __init__(self):
- super().__init__()
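- # OSS credentials come from environment variables (OSS_ACCESS_KEY_ID / OSS_ACCESS_KEY_SECRET)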
- self.auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
- self.bucket = oss2.Bucket(self.auth, 'oss-cn-hangzhou.aliyuncs.com', 'qingti-private')
- self.article_result = {}
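- # count punctuation as "known" tokens so it never inflates the new-word rate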
- self.punctuation = [",", ".", "!", "?", ":", ";", '"', "–", "_", "-", "...", "......"]
- all_exchange_words.update(self.punctuation)
- def __del__(self):
- ...
- def submit_task(self, words_meaning_ids: list[int], callback_url: str, real_ip: str, demo_name: str,
- student_stage: int, vocabulary: int, class_id: int):
- """
- words_meaning_ids: meaning ids used to generate the article, e.g. [110, 111, 112, 113, 114]
- callback_url: callback URL notified when the task finishes
- real_ip: caller ip, forwarded to the GPT service
- demo_name: project name
- student_stage: school stage, 1/2/3
- vocabulary: student vocabulary size, e.g. 500
- class_id: class id passed back to the teaching system
- """
- task_id = randint(10000000, 99999999)
- logger.info(f"生成文章id。task_id:{task_id}。词义id:{words_meaning_ids}.")
- self.callback_url_dict[task_id] = callback_url
- self.real_ip_dict[task_id] = real_ip
- self.demo_name[task_id] = demo_name
- words_meaning_str = ""
- for wordmeaning_id in words_meaning_ids:
- r = self.m.query_data("select WordSpelling,WordMeaning from dictionary_meaningitem where Id = %s", (wordmeaning_id,))
- try:
- words_meaning_str += str(r[0])
- except IndexError:
- err_msg = f"文章生成任务提交失败。task_id:{task_id},词义表内没有这个词义id:{wordmeaning_id}"
- logger.error(err_msg)
- return err_msg
- try:
- pool_executor.submit(self.run_task, words_meaning_str, task_id, student_stage, vocabulary, class_id)
- resp_result = {"id": task_id, "key": f"study/article/{task_id}"}
- logger.success(f"文章生成任务提交成功:{resp_result}")
- return resp_result
- except Exception as e:
- err_msg = f"GetArticle提交任务失败{type(e).__name__},{e}"
- logger.error(err_msg)
- return err_msg
- def __get_article(self, words_meaning_str, task_id, student_stage, vocabulary) -> tuple:
- dc = self._difficulty_control(student_stage, vocabulary)
- q = f"""你是一名在中国的英语教师,下面我会为你提供一些带中文词义的英语种子单词,请根据这些种子单词的词义,生成一篇带标题的英语文章。
- 提供种子单词:{words_meaning_str}
- 要求:
- 1.必须使用提供的这些词义的单词,文章的其他单词使用{dc["difficult_desc"]}单词。
- 2.文章应以自然、母语水平的英语撰写。请仅使用与种子单词难度相同或更简单的词汇,避免使用更高级的词汇和复杂的句子结构。请使用常用的高频英语词汇,避免使用不常见或专业的词汇。种子单词可以在文章中任意位置出现,不限制顺序。
- 3.{dc["paragraph_desc"]}为确保词汇难度符合要求,请仅使用 **中国教育部{dc['student_stage_str']}英语词汇表** 中的单词。
- 4.请将文章拆成一个一个带标点的句子,放在article_sentences数组里。如果有分段,必须在句子后面加\\n\\n。
- 5.回复json,格式:{{"title":标题,"article_sentences":[句子1,句子2]}}
- """
- try:
- real_ip = self.real_ip_dict[task_id]
- demo_name = self.demo_name[task_id]
- r_json = json.loads(get_answer_from_gpt(q, temperature=0.8, json_resp=True, real_ip=real_ip, demo_name=demo_name))
- r_article_sentences = r_json.get("article_sentences")
- r_title = r_json.get("title")
- return r_title, r_article_sentences
- except json.decoder.JSONDecodeError:
- logger.error("gpt生成文章回复json格式化错误")
- except Exception as e:
- logger.error(f"gpt生成文章回复其他错误.{type(e).__name__} {e}")
- def __replace_new_word(self, old_article: str, new_words: list, task_id: int):
- new_words_str = ",".join(new_words)
- q = f"""你是一名在中国的英语教师,下面我会为你提供一篇英语文章和一些生词,请用其他单词使用简单、常见、难度低的单词将英语文章中的生词进行替换。
- 缩写引号用单引号'。最终回复替换后的英语文章。
- 英语文章:{old_article}
- 生词:{new_words_str}
- 要求:
- 1.替换掉所有生词,替换单词使用简单、常见、难度低的单词。
- 2.生成的文章要求150词左右,可以分段。
- 3.回复json,格式:{{"title":标题,"article":英语文章}}
- """
- try:
- real_ip = self.real_ip_dict[task_id]
- demo_name = self.demo_name[task_id]
- r_json = json.loads(get_answer_from_gpt(q, temperature=0.8, json_resp=True, real_ip=real_ip, demo_name=demo_name))
- print(f"调试信息2 {r_json}")
- r_article = r_json.get("article")
- r_title = r_json.get("title")
- return r_title, r_article
- except json.decoder.JSONDecodeError:
- logger.error("gpt替换生词文章回复json格式化错误")
- except Exception as e:
- logger.error(f"gpt替换生词文章回复其他错误.{type(e).__name__} {e}")
- def run_get_article_task(self, words_meaning_str, task_id, student_stage, vocabulary) -> tuple:
- """
- :param words_meaning_str: concatenated word/meaning string queried from the database
- :param task_id: article task id
- :param student_stage: school stage, 1/2/3
- :param vocabulary: student vocabulary size
- :return: title, article sentence list, sentence-to-Chinese dict
- """
- def get_article_chinese(title, r_article_sentences, task_id, code=0) -> tuple:
- # the translation dict may be None if GPT failed three times; fall back to an empty dict
- r_article_chinese_dict = self._get_article_chinese_dict(title, r_article_sentences, task_id) or {}
- chinese_str = "\n".join(r_article_chinese_dict.values())
- r_article = "".join(r_article_sentences)
- self.insert_article_to_mysql(title=title, article=r_article, chinese=chinese_str, task_id=task_id, code=code)
- return title, r_article_sentences, r_article_chinese_dict
- gpt_result = self.__get_article(words_meaning_str, task_id, student_stage, vocabulary)
- if not gpt_result:
- raise RuntimeError(f"gpt生成文章失败。task_id:{task_id}")
- r_title, r_article_sentences = gpt_result
- new_word_rate, new_words = self._calculate_new_word_rate(r_article_sentences)
- if new_word_rate < 0.03:
- return get_article_chinese(title=r_title, r_article_sentences=r_article_sentences, task_id=task_id)
- replace_article_gpt = "".join(r_article_sentences)
- for _ in range(3):
- tuple_data = self.__replace_new_word(old_article=replace_article_gpt, new_words=new_words, task_id=task_id)
- if not tuple_data:
- continue
- r_title, replace_article_gpt = tuple_data
- new_word_rate, new_words = self._calculate_new_word_rate(replace_article_gpt)
- if new_word_rate < 0.03:
- break
- else:
- logger.warning(f"3次后生词率未到3%以下。task_id:{task_id}")
- # re-split the replaced article into sentences so the replacement actually reaches downstream processing
- r_article_sentences = []
- for para in replace_article_gpt.split("\n\n"):
- para_sentences = split_text_to_sentences(para)
- if para_sentences:
- para_sentences[-1] += "\n\n"
- r_article_sentences.extend(para_sentences)
- return get_article_chinese(title=r_title, r_article_sentences=r_article_sentences, task_id=task_id)
- def split_article_make_json(self, task_id: int, title: str, r_article_sentences: list, r_article_chinese_dict: dict):
- article = "".join(r_article_sentences)
- article = title + "\n\n" + article
- all_sentence_word_meaningid_dict = self.run_query_word_meaning(article, task_id)
- # article already starts with the title, so don't add it again when counting words
- word_count = get_article_words_count(article)
- create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
- outside_json_dict = {"id": task_id, "body": article, "wordCount": word_count, "paragraphs": [],
- "createTime": create_time}
- article_paragraphs = article.split("\n\n")
- article_sentence_count = 0
- for paragraph in article_paragraphs:
- sentences = split_text_to_sentences(paragraph)
- p = {"sentences": []}
- for single_sentence in sentences:
- article_sentence_count += 1
- single_sentence_chinese = r_article_chinese_dict.get(single_sentence, "")
- w = {"words": [], "chinese": single_sentence_chinese}
- split_words: list[str] = re.findall(r'\b[-\'\w]+\b|[^\w\s]', single_sentence)
- for original_word in split_words:
- single_word = original_word
- if not original_word:
- continue
- if not re.search(r'[a-zA-Z]', original_word):
- w["words"].append({"spell": original_word, "type": "punctuation"})
- continue
- word_id = self.get_wordid_by_wordspelling(original_word)
- # three-stage meaning lookup: exact form, then lowercase, then prototype
- x_data, type_ = all_sentence_word_meaningid_dict.get(single_sentence + original_word, [0, 0])
- if type_ == 0:
- single_word = original_word.lower()
- x_data, type_ = all_sentence_word_meaningid_dict.get(single_sentence + single_word, [0, 0])
- if type_ == 0:
- single_word = word_to_prototype(single_word)
- x_data, type_ = all_sentence_word_meaningid_dict.get(single_sentence + single_word, [0, 0])
- if type_ == 0:
- logger.warning(f"警告:type_还是0,那就是二次查询时,也没有给词义。有漏下的单词{original_word}")
- continue
- # type_ 1: real meaning id from the dictionary; type_ 2: GPT-supplied Chinese meaning with a fake id
- if type_ == 1:
- meaning_id = x_data
- meaning = self.get_meaning_by_meaningid(x_data)
- elif type_ == 2:
- meaning_id = self._get_fake_meaningid(single_word)
- meaning = x_data
- else:
- logger.error(f"出错:未知的type_:{type_}")
- meaning_id = 9999999
- meaning = '无'
- word_prototype = word_to_prototype(original_word)
- word_json = {"id": word_id, "meaningId": meaning_id, "meaning": meaning, "spell": original_word,
- "exchanges": get_word_exchange_list(word=single_word), "prototype": word_prototype}
- w["words"].append(word_json)
- p["sentences"].append(w)
- outside_json_dict["paragraphs"].append(p)
- outside_json_dict["articleSentenceCount"] = article_sentence_count
- return outside_json_dict, word_count, article_sentence_count
- def run_query_word_meaning(self, article, task_id):
- futures = []
- article_paragraphs = article.split("\n\n")
- for paragraph in article_paragraphs:
- sentences = split_text_to_sentences(paragraph)
- for single_sentence in sentences:
- f = pool_executor.submit(self.query_word_meaning_from_gpt, single_sentence, task_id)
- futures.append(f)
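- # block until all sentence-level lookups complete, then merge the per-sentence results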
- wait(futures)
- all_sentence_word_meaningid_dict = {}
- for f in futures:
- f_result = f.result()
- all_sentence_word_meaningid_dict.update(f_result)
- return all_sentence_word_meaningid_dict
- def query_word_meaning_from_gpt(self, single_sentence, task_id) -> dict:
- """Submit one sentence and resolve a meaning id for each of its words."""
- split_words = split_text_to_word(single_sentence)
- split_words = [word_to_prototype(w) for w in split_words if w]
- if not split_words:
- # nothing to look up; an empty IN () clause would be a SQL syntax error
- return {}
- placeholders = ', '.join(['%s'] * len(split_words))
- sql = f"SELECT WordSpelling, Id, WordMeaning FROM dictionary_meaningitem WHERE WordSpelling IN ({placeholders})"
- r = self.m.query_data(sql, split_words)
- list_of_tuples = list(r)
- # keep rows in sentence order; spellings not in split_words (e.g. case differences) sort last instead of raising
- sorted_list_of_tuples = sorted(list_of_tuples, key=lambda x: split_words.index(x[0]) if x[0] in split_words else len(split_words))
- insert_question_data = OrderedDict()
- for spelling, meaning_id, word_meaning in sorted_list_of_tuples:
- if spelling not in insert_question_data:
- insert_question_data[spelling] = [(meaning_id, word_meaning)]
- else:
- insert_question_data[spelling].append((meaning_id, word_meaning))
- insert_question_data_list = [f"{spelling} 词义组:{data}" for spelling, data in insert_question_data.items()]
- insert_question_data_str = "\n".join(insert_question_data_list)
- q = f"""我会给你一个[英语句子]和[数据组],[数据组]由句子中的每个[固定单词]和[词义组]两部分组成,[词义组]又由多个(词义id,词义)组成。
- 我需要你帮我根据[英语句子]的语境,挑选这个[固定单词]的词义,在对应的在词义组内词义最贴近的id。按示例回复。
- 要求:
- 1.不用考虑词性,只要和英语句子中的词义相近就行。一个固定单词只对应一个词义id。
- 2.如果提供的[词义组]内没有句子对应的词义,返回id为0,例如:[固定单词] ** 0
- 3.回复的每行由固定单词,id两个部分组成,每个部分中间用**分隔。
- 4.所有固定单词都要回复,不要漏下。
- 英语句子:{single_sentence}.
- 数据组:\n{insert_question_data_str}
- 回复示例:
- beauty ** 302816
- apple ** 234567
- """
- real_ip = self.real_ip_dict[task_id]
- demo_name = self.demo_name[task_id]
- r_gpt = get_answer_from_gpt(q, real_ip=real_ip, demo_name=demo_name)
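- # first pass: GPT picks a meaning id for each word; id 0 means "no suitable meaning" and triggers the second pass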
- already_data, need_twice_data = {}, []
- three_list = self._clean_gpt_res(single_sentence, r_gpt, split_words)
- for sentence, spelling, meaning_id in three_list:
- if meaning_id == 0:
- need_twice_data.append([sentence, spelling, meaning_id])
- else:
- already_data[sentence + spelling] = [meaning_id, 1]
- if need_twice_data:
- # second pass: one batched query for every word that got id 0
- need_twice_words = ",".join([spelling for _, spelling, _ in need_twice_data])
- q2 = f"""我会给你一个英语句子,和句子中的几个单词。请给我这几个单词在句子中的中文词义。按示例回复json数据。
- 英语句子:{single_sentence}
- 单词:{need_twice_words}
- 
- 要求:
- 1.给到的单词都要回复其中文词义。
- 2.回复的json,以单词为键,它的中文词义为值。
- 
- 回复示例:
- {{"单词":"中文词义",...}}
- """
- r2 = get_answer_from_gpt(q2, real_ip=real_ip, demo_name=demo_name, json_resp=True)
- r2_json: dict = json.loads(r2)
- for w_spelling, chinese_meaning in r2_json.items():
- already_data[single_sentence + w_spelling] = [chinese_meaning, 2]
- return already_data
- def upload_json_file_to_oss(self, article_id: int, data_dict: dict):
- json_data = json.dumps(data_dict, ensure_ascii=False)
- object_name = f'study/article/{article_id}'
- content = json_data.encode('utf-8')
- for _ in range(2):
- try:
- r = self.bucket.put_object(object_name, content)
- if r.resp.status == 200:
- logger.success(f"上传oss成功 {article_id}")
- return True
- logger.error(f"上传oss返回状态码{r.resp.status},taskid:{article_id}")
- except Exception as e:
- logger.error(f"上传文件错误{type(e).__name__} {e},taskid:{article_id}")
- logger.critical(f"2次上传oss错误,taskid:{article_id}")
- return False
- def notice_teach_system(self, article_id: int, class_id: int, word_count: int, article_sentence_count: int):
- url = self.callback_url_dict.get(article_id)
- if not url or "localhost/callback" in url:
- return False
- json_data = {"classId": class_id, "articleId": article_id, "articleWordCount": word_count,
- "articleSentenceCount": article_sentence_count}
- for _ in range(3):
- try:
- # a timeout keeps a dead callback endpoint from blocking the worker thread indefinitely
- r = requests.post(url, json=json_data, timeout=10)
- r.raise_for_status()
- self.callback_url_dict.pop(article_id, '')
- logger.success(f"通知成功{r.text}")
- return True
- except Exception as e:
- logger.warning(f"{type(e).__name__} {e}")
- logger.critical(f"通知接口失败,三次全错. article_id:{article_id} callback_url:{url}")
- return False
- def clean_source(self, article_id):
- """Release per-task state so these dicts don't grow without bound."""
- self.callback_url_dict.pop(article_id, '')
- self.real_ip_dict.pop(article_id, '')
- self.demo_name.pop(article_id, '')
- def run_task(self, words_meaning_str, task_id, student_stage, vocabulary, class_id):
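- # full pipeline: generate article -> translate -> per-word meaning lookup -> build JSON -> upload to OSS -> notify -> clean up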
- try:
- title, r_article_sentences, r_article_chinese_dict = self.run_get_article_task(words_meaning_str, task_id, student_stage,
- vocabulary)
- outside_json_dict, word_count, article_sentence_count = self.split_article_make_json(task_id, title, r_article_sentences,
- r_article_chinese_dict)
- self.upload_json_file_to_oss(article_id=task_id, data_dict=outside_json_dict)
- self.notice_teach_system(article_id=task_id, class_id=class_id, word_count=word_count,
- article_sentence_count=article_sentence_count)
- self.clean_source(article_id=task_id)
- logger.success(f"文章任务完成。taskid:{task_id}")
- except Exception as e:
- logger.error(f"{type(e).__name__} {e}")
- traceback_str = traceback.format_exc()
- logger.error(f"外围错误追溯:{traceback_str}")