# -*- coding:utf-8 -*- if __name__ == '__main__': import os os.chdir("..") import requests import random import time from tools.loglog import logger, simple_logger from tools.new_mysql import MySQLUploader m = MySQLUploader() def insert_ip_token(ip, demo_name, gpt_content, prompt_tokens, completion_tokens, total_tokens): sql = "insert into consumer_token (ip,demo_name,gpt_content,prompt_tokens,completion_tokens,total_tokens) values (%s,%s,%s,%s,%s,%s)" m.execute_(sql, (ip, demo_name, str(gpt_content), prompt_tokens, completion_tokens, total_tokens)) def get_answer_from_gpt(question, real_ip="localhost", demo_name="无", model="gpt-4o", max_tokens=3500, temperature: float = 0, json_resp=False, n=1, sys_prompt=None): if "3.5" in model or "3.5-turbo" in model or "3.5turbo" in model: model = "gpt-3.5-turbo" elif "4o" in model or "gpt4o" in model: model = "gpt-4o" elif "4turbo" in model or "4-turbo" in model: model = "gpt-4-turbo" d2 = { "model": model, "messages": [], "max_tokens": max_tokens, "temperature": temperature, 'n': n} if sys_prompt: d2['messages'].append({"role": "system", "content": sys_prompt}) d2['messages'].append({"role": "user", "content": question}) if json_resp is True: d2["response_format"] = {"type": "json_object"} elif json_resp is False: pass else: d2["response_format"] = json_resp for _ in range(3): try: response = requests.post(f'http://170.106.108.95/v1/chat/completions', json=d2) r_json = response.json() if r2 := r_json.get("choices", None): if n > 1: gpt_res = [] for i in r2: gpt_res.append(i["message"]["content"]) else: gpt_res = r2[0]["message"]["content"] gpt_content = str(gpt_res) prompt_tokens = r_json["usage"]["prompt_tokens"] completion_tokens = r_json["usage"]["completion_tokens"] total_tokens = r_json["usage"]["total_tokens"] insert_ip_token(real_ip, demo_name, gpt_content, prompt_tokens, completion_tokens, total_tokens) simple_logger.info(f"问题日志:\n{question}\n回答日志:\n{gpt_res}") return gpt_res elif r_json.get("message") == "IP address blocked": print("IP address blocked") raise Exception("IP address blocked") else: print(f"小错误:{question[:10]}") logger.error(response.text) except Exception as e: logger.info(f"小报错忽略{e}") time.sleep(10) logger.critical("get_answer_from_gpt 严重错误,3次后都失败了") def parse_gpt_phon_to_tuplelist(text: str) -> list: """解析gpt返回的音标数据""" result = [] if not text: return [] for i in text.split("\n"): ii = i.split("***") if len(ii) >= 3: result.append((ii[0].strip(), ii[1].strip(), ii[2].strip())) return result if __name__ == '__main__': pass resp = get_answer_from_gpt("hello", temperature=0.8, model='gpt-4o') print(resp)