# -*- coding: utf-8 -*-
"""Spoken-language assessment helpers built on the ``spoken_language`` SDK.

The module reads credentials via ``read_config()`` at import time, defines a
listener that stores final assessment responses in ``spoken_result`` keyed by
task id, and exposes ``make_spoken`` as the main entry point.
"""
import os
import time
import threading
from datetime import datetime
import json

import requests

from spoken_language.common import credential
from spoken_language.soe import speaking_assessment
from spoken_language.read_config import read_config

config_data = read_config()
app_id, secret_id, secret_key = config_data['appId'], config_data['SecretId'], config_data['SecretKey']

APPID = app_id
SECRET_ID = secret_id
SECRET_KEY = secret_key
TOKEN = ""
ENGINE_MODEL_TYPE = "16k_en"
SLICE_SIZE = 32000

# Sample audio file used by ``process`` and by the ``__main__`` demo.
DEFAULT_AUDIO_PATH = r"C:\Users\86131\Desktop\音频\output_16k_mono.mp3"
# Seconds to wait for the audio download before giving up.
DOWNLOAD_TIMEOUT = 30
# ``make_spoken`` polls POLL_ATTEMPTS times, sleeping POLL_INTERVAL seconds
# between checks (60 * 0.5 s = about 30 s in total).
POLL_ATTEMPTS = 60
POLL_INTERVAL = 0.5

# Final responses keyed by task id; written by the listener callback and
# consumed by ``make_spoken``.
spoken_result = {}

# Sentinel distinguishing "no result yet" from any stored response value.
_MISSING = object()


class MySpeechRecognitionListener(speaking_assessment.SpeakingAssessmentListener):
    """Listener that records the final response of one task in ``spoken_result``."""

    def __init__(self, id):
        # ``id`` is the key under which the final response is stored.
        self.id = id

    def on_recognition_start(self, response):
        """Ignore the start notification."""
        pass

    def on_intermediate_result(self, response):
        """Ignore intermediate results; only the final response is kept."""
        pass

    def on_recognition_complete(self, response):
        """Store the final response under this listener's task id."""
        spoken_result[self.id] = response

    def on_fail(self, response):
        """Report a failed task instead of silently discarding the response."""
        try:
            rsp_str = json.dumps(response, ensure_ascii=False)
        except (TypeError, ValueError):
            # Fall back to repr() when the response is not JSON-serializable.
            rsp_str = repr(response)
        print(f"speaking assessment failed for task {self.id}: {rsp_str}")


def process(id, audio=DEFAULT_AUDIO_PATH, ref_text="anyway"):
    """Send an audio file to the assessment service in SLICE_SIZE-byte chunks.

    Args:
        id: Key under which the listener stores the final response.
        audio: Path of the audio file to send; defaults to the sample file.
        ref_text: Reference text passed to ``set_ref_text``.

    Any exception raised while starting or sending is printed, not re-raised;
    the recognizer is always stopped.
    """
    listener = MySpeechRecognitionListener(id)
    credential_var = credential.Credential(SECRET_ID, SECRET_KEY)
    recognizer = speaking_assessment.SpeakingAssessment(
        APPID, credential_var, ENGINE_MODEL_TYPE, listener)
    # NOTE(review): the numeric option values below are kept from the original
    # code; confirm their meaning against the SDK documentation.
    recognizer.set_text_mode(0)
    recognizer.set_ref_text(ref_text)
    recognizer.set_eval_mode(0)
    recognizer.set_keyword("")
    recognizer.set_sentence_info_enabled(0)
    recognizer.set_voice_format(1)
    try:
        recognizer.start()
        with open(audio, 'rb') as f:
            # iter() with a sentinel stops at the first empty read (EOF).
            for content in iter(lambda: f.read(SLICE_SIZE), b""):
                recognizer.write(content)
                # Sleep to simulate the send interval of real-time speech.
                time.sleep(0.2)
    except Exception as e:
        print(e)
    finally:
        recognizer.stop()


def process_rec(task_id, audio_path, audio_text, audio_binary=None):
    """Send one complete recording to the assessment service in a single write.

    Args:
        task_id: Key under which the listener stores the final response.
        audio_path: Path of the audio file, used when ``audio_binary`` is
            empty and the path names an existing file.
        audio_text: Reference text passed to ``set_ref_text``.
        audio_binary: Audio bytes; when non-empty they are sent directly and
            no file is read.

    When neither ``audio_binary`` nor a usable ``audio_path`` is given, the
    file ``"<task_id>.mp3"`` in the current directory is read (the original
    fallback). Exceptions are printed, not re-raised; the recognizer is
    always stopped.
    """
    listener = MySpeechRecognitionListener(task_id)
    credential_var = credential.Credential(SECRET_ID, SECRET_KEY)
    recognizer = speaking_assessment.SpeakingAssessment(
        APPID, credential_var, ENGINE_MODEL_TYPE, listener)
    # NOTE(review): the numeric option values below are kept from the original
    # code; confirm their meaning against the SDK documentation.
    recognizer.set_text_mode(0)
    recognizer.set_ref_text(audio_text)
    recognizer.set_eval_mode(1)
    recognizer.set_keyword("")
    recognizer.set_sentence_info_enabled(0)
    recognizer.set_voice_format(2)
    recognizer.set_rec_mode(1)
    try:
        recognizer.start()
        if audio_binary:
            recognizer.write(audio_binary)
        else:
            # Honor the caller-supplied path (previously ignored); keep the
            # "<task_id>.mp3" fallback for callers that relied on it.
            if audio_path and os.path.isfile(audio_path):
                source_path = audio_path
            else:
                source_path = f"{task_id}.mp3"
            with open(source_path, 'rb') as f:
                recognizer.write(f.read())
    except Exception as e:
        print(e)
    finally:
        recognizer.stop()


def process_multithread(number):
    """Run ``process(i)`` for i in ``range(number)`` on separate threads and wait for all."""
    thread_list = []
    for i in range(number):
        thread = threading.Thread(target=process, args=(i,))
        thread_list.append(thread)
        thread.start()
    for thread in thread_list:
        thread.join()


def make_spoken(task_id, audio_url, audio_content, audio_text):
    """Assess one recording and return the final response, or None.

    Args:
        task_id: Identifier for this task; also names the temporary file
            ``"<task_id>.mp3"`` written when raw content is supplied.
        audio_url: When truthy, the audio is downloaded from this URL and
            ``audio_content`` is ignored.
        audio_content: Raw audio bytes, used when ``audio_url`` is falsy.
        audio_text: Reference text for the assessment.

    Returns:
        The response stored by the listener, or None when the download
        returns an HTTP error status, no audio was supplied, or no result
        arrives within POLL_ATTEMPTS * POLL_INTERVAL seconds.
    """
    temp_path = f"{task_id}.mp3"
    if audio_url:
        print("有url,应该去下载mp3文件")
        # A timeout prevents a stalled server from blocking the caller forever.
        r = requests.get(audio_url, timeout=DOWNLOAD_TIMEOUT)
        if not r.ok:
            # Do not feed an HTTP error page to the recognizer as audio.
            print(f"audio download failed: HTTP {r.status_code} for {audio_url}")
            return None
        audio_content = r.content
    elif audio_content is None:
        print("make_spoken: neither audio_url nor audio_content was provided")
        return None

    # Drop any stale result left by an earlier, timed-out call with this id.
    spoken_result.pop(task_id, None)
    try:
        if not audio_url:
            with open(temp_path, 'wb') as f:
                f.write(audio_content)
        process_rec(task_id, audio_path="", audio_text=audio_text,
                    audio_binary=audio_content)
        for _ in range(POLL_ATTEMPTS):
            # pop() fetches and removes the entry in a single dict operation.
            result = spoken_result.pop(task_id, _MISSING)
            if result is not _MISSING:
                return result
            time.sleep(POLL_INTERVAL)
        return None
    finally:
        # Remove the temporary file on every exit path, not only on success.
        if os.path.exists(temp_path):
            os.remove(temp_path)


if __name__ == "__main__":
    process_rec(0, DEFAULT_AUDIO_PATH,
                "You must study to be frank with the world apple")