1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- # -*- coding:utf-8 -*-
- if __name__ == '__main__':
- import os
- os.chdir("..")
- import requests
- import random
- import time
- from tools.loglog import logger, simple_logger
- from tools.new_mysql import MySQLUploader
- m = MySQLUploader()
- def insert_ip_token(ip, demo_name, gpt_content, prompt_tokens, completion_tokens, total_tokens):
- sql = "insert into consumer_token (ip,demo_name,gpt_content,prompt_tokens,completion_tokens,total_tokens) values (%s,%s,%s,%s,%s,%s)"
- m.execute_(sql, (ip, demo_name, str(gpt_content), prompt_tokens, completion_tokens, total_tokens))
- def get_answer_from_gpt(question, real_ip="localhost", demo_name="无", model="gpt-4o", max_tokens=3500, temperature: float = 0,
- json_resp=False, n=1, sys_prompt=None):
- if "3.5" in model or "3.5-turbo" in model or "3.5turbo" in model:
- model = "gpt-3.5-turbo"
- elif "4o" in model or "gpt4o" in model:
- model = "gpt-4o"
- elif "4turbo" in model or "4-turbo" in model:
- model = "gpt-4-turbo"
- d2 = {
- "model": model,
- "messages": [],
- "max_tokens": max_tokens,
- "temperature": temperature,
- 'n': n}
- if sys_prompt:
- d2['messages'].append({"role": "system", "content": sys_prompt})
- d2['messages'].append({"role": "user", "content": question})
- if json_resp is True:
- d2["response_format"] = {"type": "json_object"}
- elif json_resp is False:
- pass
- else:
- d2["response_format"] = json_resp
- for _ in range(3):
- try:
- response = requests.post(f'http://170.106.108.95/v1/chat/completions', json=d2)
- r_json = response.json()
- if r2 := r_json.get("choices", None):
- if n > 1:
- gpt_res = []
- for i in r2:
- gpt_res.append(i["message"]["content"])
- else:
- gpt_res = r2[0]["message"]["content"]
- gpt_content = str(gpt_res)
- prompt_tokens = r_json["usage"]["prompt_tokens"]
- completion_tokens = r_json["usage"]["completion_tokens"]
- total_tokens = r_json["usage"]["total_tokens"]
- insert_ip_token(real_ip, demo_name, gpt_content, prompt_tokens, completion_tokens, total_tokens)
- simple_logger.info(f"问题日志:\n{question}\n回答日志:\n{gpt_res}")
- return gpt_res
- elif r_json.get("message") == "IP address blocked":
- print("IP address blocked")
- raise Exception("IP address blocked")
- else:
- print(f"小错误:{question[:10]}")
- logger.error(response.text)
- except Exception as e:
- logger.info(f"小报错忽略{e}")
- time.sleep(10)
- logger.critical("get_answer_from_gpt 严重错误,3次后都失败了")
- def parse_gpt_phon_to_tuplelist(text: str) -> list:
- """解析gpt返回的音标数据"""
- result = []
- if not text:
- return []
- for i in text.split("\n"):
- ii = i.split("***")
- if len(ii) >= 3:
- result.append((ii[0].strip(), ii[1].strip(), ii[2].strip()))
- return result
- if __name__ == '__main__':
- pass
- resp = get_answer_from_gpt("hello", temperature=0.8, model='gpt-4o')
- print(resp)
|