# -*- coding:utf-8 -*-
"""Helpers for calling an OpenAI-compatible chat-completions gateway.

Contains a plain-text/JSON helper with retry and token accounting, two
structured-output helpers whose replies are validated with pydantic models,
and a small parser for ``***``-separated phonetic lines.
"""
import json
import time
from typing import Dict, Any, Union

import requests
from pydantic import ValidationError

from gpt.gpt_check import Article, Annotation
from tools.loglog import logger, simple_logger, log_err_e, temp_logger
from tools.new_mysql import MySQLUploader

m = MySQLUploader()

# Endpoint shared by every helper in this module.
_CHAT_COMPLETIONS_URL = 'http://170.106.108.95/v1/chat/completions'
# (connect, read) timeout in seconds; without it a stalled gateway would block the caller forever.
_REQUEST_TIMEOUT = (10, 300)
# Number of attempts each helper makes before giving up.
_MAX_ATTEMPTS = 3

# JSON-schema "response_format" describing the Article structure requested from the model.
_ARTICLE_RESPONSE_FORMAT = {
    'type': 'json_schema',
    'json_schema': {
        'name': 'Article',
        'schema': {
            '$defs': {
                'Candidate': {
                    'properties': {
                        'label': {'allOf': [{'$ref': '#/$defs/Options'}], 'description': 'ABCD序号的一种',
                                  'title': '序号'},
                        'text': {'description': '英文,ABCD选项的文本', 'title': '选项文本', 'type': 'string'},
                        'isRight': {'allOf': [{'$ref': '#/$defs/IsRight'}], 'description': '1是正确,0是错误',
                                    'title': '是否是正确答案'},
                    },
                    'required': ['label', 'text', 'isRight'],
                    'title': 'Candidate',
                    'type': 'object',
                },
                'DifficultSentence': {
                    'properties': {
                        'english': {'description': '文章中的一句难句', 'title': '英语难句', 'type': 'string'},
                        'chinese': {'description': '对英语难句的翻译', 'title': '中文难句', 'type': 'string'},
                    },
                    'required': ['english', 'chinese'],
                    'title': 'DifficultSentence',
                    'type': 'object',
                },
                'IsRight': {'enum': [1, 0], 'title': 'IsRight', 'type': 'integer'},
                'Options': {'enum': ['A', 'B', 'C', 'D'], 'title': 'Options', 'type': 'string'},
                'Question': {
                    'properties': {
                        'trunk': {'description': '用英语给出的选择题题目', 'title': '选择题题目', 'type': 'string'},
                        'analysis': {'description': '中文,选择题的分析思路;不要给出答案的ABCD序号',
                                     'title': '选择题分析', 'type': 'string'},
                        'candidates': {'description': '一共4个选择题', 'items': {'$ref': '#/$defs/Candidate'},
                                       'title': '选项对象', 'type': 'array'},
                    },
                    'required': ['trunk', 'analysis', 'candidates'],
                    'title': 'Question',
                    'type': 'object',
                },
            },
            'properties': {
                'difficultSentences': {'description': '挑选一句难句对象',
                                       'items': {'$ref': '#/$defs/DifficultSentence'},
                                       'title': '难句对象', 'type': 'array'},
                'usedMeanIds': {'items': {'type': 'integer'}, 'title': '用到的词义id', 'type': 'array'},
                'questions': {'description': '针对英语文章的选择题', 'items': {'$ref': '#/$defs/Question'},
                              'title': '问题对象', 'type': 'array'},
                'englishArticle': {'description': '', 'title': '英语文章', 'type': 'string'},
                'chineseArticle': {'description': '', 'title': '中文翻译', 'type': 'string'},
            },
            'required': ['difficultSentences', 'usedMeanIds', 'questions', 'englishArticle', 'chineseArticle'],
            'title': 'Article',
            'type': 'object',
        },
    },
}

# JSON-schema "response_format" describing the Annotation structure requested from the model.
_ANNOTATION_RESPONSE_FORMAT = {
    'type': 'json_schema',
    'json_schema': {
        'name': 'Annotation',
        'schema': {
            'properties': {
                'annotation_text': {'description': '对句子或文章的每个单词进行词义id的标注',
                                    'examples': ['an[33] apple[123]'],
                                    'title': '标注文本', 'type': 'string'},
            },
            'required': ['annotation_text'],
            'title': 'Annotation',
            'type': 'object',
        },
    },
}


def _build_chat_payload(model, question, sys_prompt, max_tokens, temperature, n):
    """Build the base chat-completions request body (optional system message, then the user message)."""
    messages = []
    if sys_prompt:
        messages.append({"role": "system", "content": sys_prompt})
    messages.append({"role": "user", "content": question})
    return {"model": model, "messages": messages, "max_tokens": max_tokens, "temperature": temperature, "n": n}


def get_openai_model(model_text: str):
    """Map a loosely written model name onto one of the supported model ids.

    :param model_text: free-form model name, matched by substring
    :return: "gpt-3.5-turbo", "gpt-4o" or "gpt-4-turbo"; anything unrecognised falls back to "gpt-4o"
    """
    # Checks run in this order on purpose: a text containing both "4o" and "4-turbo" resolves to gpt-4o.
    if "3.5" in model_text:
        return "gpt-3.5-turbo"
    if "4o" in model_text:
        return "gpt-4o"
    if "4turbo" in model_text or "4-turbo" in model_text:
        return "gpt-4-turbo"
    return "gpt-4o"


def insert_ip_token(ip, demo_name, gpt_content, prompt_tokens, completion_tokens, total_tokens):
    """Insert one token-usage row into the ``consumer_token`` table.

    :param ip: caller IP stored with the record
    :param demo_name: project/demo name stored with the record
    :param gpt_content: model reply; stored as ``str(gpt_content)``
    :param prompt_tokens: prompt token count
    :param completion_tokens: completion token count
    :param total_tokens: total token count
    """
    sql = "insert into consumer_token (ip,demo_name,gpt_content,prompt_tokens,completion_tokens,total_tokens) values (%s,%s,%s,%s,%s,%s)"
    m.execute_(sql, (ip, demo_name, str(gpt_content), prompt_tokens, completion_tokens, total_tokens))


def get_answer_from_gpt(question, real_ip="localhost", demo_name="无", model="gpt-4o", max_tokens=3500,
                        temperature: float = 0, json_resp: Union[Dict[Any, Any], bool] = False, n=1,
                        check_fucn=None, sys_prompt=None):
    """Ask the chat-completions endpoint a question, retrying up to three times.

    :param question: user message content
    :param real_ip: caller IP recorded with the token usage
    :param demo_name: project name recorded with the token usage
    :param model: loose model name, normalised through ``get_openai_model``
    :param max_tokens: completion token limit sent to the API
    :param temperature: sampling temperature sent to the API
    :param json_resp: ``True`` requests ``{"type": "json_object"}``; ``False`` sends no
        ``response_format``; any other value is sent as the ``response_format`` itself
    :param n: number of choices requested
    :param check_fucn: optional callable given ``str(answer)``; a falsy result counts as a failed attempt
    :param sys_prompt: optional system message
    :return: the reply content (a list of contents when ``n > 1``), or ``None`` when every attempt failed
    """
    model = get_openai_model(model)
    d2 = _build_chat_payload(model, question, sys_prompt, max_tokens, temperature, n)
    if json_resp is True:
        d2["response_format"] = {"type": "json_object"}
    elif json_resp is not False:
        d2["response_format"] = json_resp

    for num_count in range(_MAX_ATTEMPTS):
        try:
            response = requests.post(_CHAT_COMPLETIONS_URL, json=d2, timeout=_REQUEST_TIMEOUT)
            r_json = response.json()
            if r2 := r_json.get("choices", None):
                if n > 1:
                    gpt_res = [i["message"]["content"] for i in r2]
                else:
                    gpt_res = r2[0]["message"]["content"]
                gpt_content = str(gpt_res)
                usage = r_json["usage"]
                insert_ip_token(real_ip, demo_name, gpt_content, usage["prompt_tokens"],
                                usage["completion_tokens"], usage["total_tokens"])
                simple_logger.info(f"问题日志:\n{question}\n回答日志:\n{gpt_res}")

                if not check_fucn:
                    return gpt_res
                if check_fucn(str(gpt_res)):
                    return gpt_res
                # Raised on purpose: the handler below logs it and moves on to the next attempt.
                raise Exception(f"第{num_count + 1}次共3次,GPT的校验没有通过,校验函数:{check_fucn.__name__}")
            elif r_json.get("message") == "IP address blocked":
                print("IP address blocked")
                raise Exception("IP address blocked")
            else:
                print(f"小错误:{question[:10]}")
                logger.error(response.text)
        except Exception as e:
            logger.info(f"小报错忽略{e}")
            # Back off before the next attempt; sleeping after the final one would only delay the failure.
            if num_count < _MAX_ATTEMPTS - 1:
                time.sleep(10)

    logger.critical("get_answer_from_gpt 严重错误,3次后都失败了")
    return None


def get_article_gpt_pydantic(question, real_ip="localhost", demo_name="无", model="gpt-4.1", max_tokens=3500,
                             temperature: float = 0, n=1, check_fucn=None, sys_prompt=None, task_id=0,
                             exercise_id=0):
    """
    Request an article as structured JSON and validate every choice against ``Article``.

    The call is synchronous and is attempted up to three times.

    :param question: user message content
    :param real_ip: real IP (accepted for signature compatibility; not used here)
    :param demo_name: project name (accepted for signature compatibility; not used here)
    :param model: model name, sent to the API unchanged
    :param max_tokens: completion token limit
    :param temperature: sampling temperature
    :param n: number of choices requested
    :param check_fucn: check function (accepted for signature compatibility; not used here)
    :param sys_prompt: optional system message
    :param task_id: task id, used only in log messages
    :param exercise_id: exercise id, used only in log messages
    :return: the full decoded API response dict when every choice validates, otherwise ``None``
    """
    d2 = _build_chat_payload(model, question, sys_prompt, max_tokens, temperature, n)
    d2["response_format"] = _ARTICLE_RESPONSE_FORMAT

    for _ in range(_MAX_ATTEMPTS):
        response = None
        try:
            response = requests.post(_CHAT_COMPLETIONS_URL, json=d2, timeout=_REQUEST_TIMEOUT)
            r_json = response.json()
            for choice in r_json["choices"]:
                Article.model_validate_json(choice["message"]["content"])
            simple_logger.info(f"问题日志task_id:{task_id},exercise_id:{exercise_id}\n回答日志:\n{r_json}")
            return r_json
        except ValidationError as e:
            # Include the validation details; without them the log line is not actionable.
            logger.error(f"gpt回复校验失败task_id:{task_id},exercise_id:{exercise_id}:{e}")
        except json.decoder.JSONDecodeError:
            # Must precede RequestException: since requests 2.27 the error raised by response.json()
            # subclasses both, and this branch is the one that logs the offending body.
            if response is not None:
                logger.error(f"json格式化错误task_id:{task_id},exercise_id:{exercise_id}:{response.text}")
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP请求错误task_id:{task_id},exercise_id:{exercise_id}: {str(e)}")
            time.sleep(1)
        except Exception as e:
            log_err_e(e, f"其他错误task_id:{task_id},exercise_id:{exercise_id}")
    return None


def get_annotation_gpt_pydantic(question, real_ip="localhost", demo_name="无", model="gpt-4.1", max_tokens=3500,
                                temperature: float = 0, n=1, check_fucn=None, sys_prompt=None, task_id=0,
                                exercise_id=0):
    """
    Request a word-sense annotation as structured JSON and validate every choice against ``Annotation``.

    The call is synchronous and is attempted up to three times.

    :param question: user message content
    :param real_ip: real IP (accepted for signature compatibility; not used here)
    :param demo_name: project name (accepted for signature compatibility; not used here)
    :param model: model name, sent to the API unchanged
    :param max_tokens: completion token limit
    :param temperature: sampling temperature
    :param n: number of choices requested
    :param check_fucn: check function (accepted for signature compatibility; not used here)
    :param sys_prompt: optional system message
    :param task_id: task id, used only in log messages
    :param exercise_id: exercise id, used only in log messages
    :return: the full decoded API response dict when every choice validates, otherwise ``None``
    """
    d2 = _build_chat_payload(model, question, sys_prompt, max_tokens, temperature, n)
    d2["response_format"] = _ANNOTATION_RESPONSE_FORMAT

    for _ in range(_MAX_ATTEMPTS):
        response = None
        try:
            response = requests.post(_CHAT_COMPLETIONS_URL, json=d2, timeout=_REQUEST_TIMEOUT)
            r_json = response.json()
            for choice in r_json["choices"]:
                Annotation.model_validate_json(choice["message"]["content"])
            temp_logger.info(f"日志task_id:{task_id},exercise_id:{exercise_id}:\n问题日志:\n{question}")
            simple_logger.info(f"日志task_id:{task_id},exercise_id:{exercise_id}:\n回答日志:\n{r_json}")
            return r_json
        except ValidationError as e:
            # Include the validation details; without them the log line is not actionable.
            logger.error(f"gpt回复校验失败task_id:{task_id},exercise_id:{exercise_id}:{e}")
        except json.decoder.JSONDecodeError:
            # Must precede RequestException: since requests 2.27 the error raised by response.json()
            # subclasses both, and this branch is the one that logs the offending body.
            if response is not None:
                logger.error(f"json格式化错误task_id:{task_id},exercise_id:{exercise_id}:{response.text}")
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP请求错误task_id:{task_id},exercise_id:{exercise_id}: {str(e)}")
            time.sleep(1)
        except Exception as e:
            log_err_e(e, f"其他错误task_id:{task_id},exercise_id:{exercise_id}")
    return None


def parse_gpt_phon_to_tuplelist(text: str) -> list:
    """Parse phonetic data returned by GPT into a list of 3-tuples.

    Each line is split on ``***``; lines with at least three fields contribute a tuple of the
    first three fields, each stripped of surrounding whitespace. Other lines are skipped.

    :param text: raw reply text, one record per line
    :return: list of ``(field0, field1, field2)`` tuples; empty list for empty/``None`` input
    """
    if not text:
        return []
    result = []
    for line in text.split("\n"):
        fields = line.split("***")
        if len(fields) >= 3:
            result.append((fields[0].strip(), fields[1].strip(), fields[2].strip()))
    return result