123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- # -*- coding: utf-8 -*-
- import os
- import time
- import requests
- import threading
- from datetime import datetime
- import json
- from spoken_language.common import credential
- from spoken_language.soe import speaking_assessment
- from spoken_language.read_config import read_config
# Service credentials, read once at import time from the project configuration.
config_data = read_config()
app_id = config_data['appId']
secret_id = config_data['SecretId']
secret_key = config_data['SecretKey']

# Upper-case aliases; these are the names the helpers in this module pass to
# credential.Credential and speaking_assessment.SpeakingAssessment.
APPID = app_id
SECRET_ID = secret_id
SECRET_KEY = secret_key
TOKEN = ""

# Engine model identifier handed to SpeakingAssessment.
ENGINE_MODEL_TYPE = "16k_en"

# Number of bytes read from the audio file per write when streaming.
SLICE_SIZE = 32000

# Final assessment responses keyed by listener id. Filled in by
# MySpeechRecognitionListener.on_recognition_complete and drained by
# make_spoken.
spoken_result = {}
class MySpeechRecognitionListener(speaking_assessment.SpeakingAssessmentListener):
    """Listener that publishes the final assessment response.

    The completed response is stored in the module-level ``spoken_result``
    dict under the id given at construction time, so that a caller polling
    that dict can pick it up. Failures are printed instead of being
    silently discarded.
    """

    def __init__(self, id):
        """Remember the key under which the final response will be stored.

        Args:
            id: Key used in ``spoken_result``. The name shadows the builtin
                but is kept because it is part of the constructor interface.
        """
        self.id = id

    def on_recognition_start(self, response):
        """Ignore the start notification."""

    def on_intermediate_result(self, response):
        """Ignore intermediate results; only the final response is kept."""

    def on_recognition_complete(self, response):
        """Store the final response in ``spoken_result`` under ``self.id``."""
        # Item assignment on the module-level dict needs no ``global``.
        spoken_result[self.id] = response

    def on_fail(self, response):
        """Report a failed assessment.

        Nothing is written to ``spoken_result`` on failure, so without this
        message a failure looks the same as a slow response to anyone
        polling that dict.
        """
        rsp_str = json.dumps(response, ensure_ascii=False)
        print(f"speaking assessment {self.id} failed: {rsp_str}")
def process(id, audio_path=r"C:\Users\86131\Desktop\音频\output_16k_mono.mp3"):
    """Stream an audio file to the assessment service in fixed-size slices.

    The audio is evaluated against the fixed reference text ``"anyway"``.
    The final response is delivered through ``MySpeechRecognitionListener``,
    which stores it in ``spoken_result[id]``.

    Args:
        id: Key under which the listener stores the final response. The
            name shadows the builtin but is kept for caller compatibility.
        audio_path: File to stream. Defaults to the sample path this
            function previously hard-coded.

    Any exception raised while starting or streaming is printed, not
    propagated; the recognizer is always stopped afterwards.
    """
    listener = MySpeechRecognitionListener(id)
    credential_var = credential.Credential(SECRET_ID, SECRET_KEY)
    recognizer = speaking_assessment.SpeakingAssessment(
        APPID, credential_var, ENGINE_MODEL_TYPE, listener)
    recognizer.set_text_mode(0)
    recognizer.set_ref_text("anyway")
    recognizer.set_eval_mode(0)
    recognizer.set_keyword("")
    recognizer.set_sentence_info_enabled(0)
    # NOTE(review): process_rec sends mp3 data with voice format 2, while this
    # function sends an .mp3 file with format 1 -- confirm against the SDK
    # documentation that 1 is the intended value here.
    recognizer.set_voice_format(1)
    try:
        recognizer.start()
        with open(audio_path, 'rb') as f:
            # iter() with a sentinel stops at the first empty read (EOF).
            for content in iter(lambda: f.read(SLICE_SIZE), b""):
                recognizer.write(content)
                # Sleep to simulate the pacing of a real-time audio stream.
                time.sleep(0.2)
    except Exception as e:
        print(e)
    finally:
        recognizer.stop()
def process_rec(task_id, audio_path, audio_text, audio_binary=None):
    """Send one complete audio clip to the assessment service.

    The final response is delivered through ``MySpeechRecognitionListener``,
    which stores it in ``spoken_result[task_id]``.

    Args:
        task_id: Key under which the listener stores the final response;
            also names the fallback file ``f"{task_id}.mp3"``.
        audio_path: Path of the audio file to send when ``audio_binary`` is
            not given. An empty value falls back to ``f"{task_id}.mp3"``.
        audio_text: Reference text the audio is evaluated against.
        audio_binary: Audio bytes to send directly. When truthy, no file
            is read.

    Any exception raised while starting or sending is printed, not
    propagated; the recognizer is always stopped afterwards.
    """
    listener = MySpeechRecognitionListener(task_id)
    credential_var = credential.Credential(SECRET_ID, SECRET_KEY)
    recognizer = speaking_assessment.SpeakingAssessment(
        APPID, credential_var, ENGINE_MODEL_TYPE, listener)
    recognizer.set_text_mode(0)
    recognizer.set_ref_text(audio_text)
    recognizer.set_eval_mode(1)
    recognizer.set_keyword("")
    recognizer.set_sentence_info_enabled(0)
    recognizer.set_voice_format(2)
    recognizer.set_rec_mode(1)
    try:
        recognizer.start()
        if audio_binary:
            recognizer.write(audio_binary)
        else:
            # Honour the caller's path. Previously it was ignored and the
            # per-task file was always opened, so an explicit path was
            # never read.
            source_path = audio_path or f"{task_id}.mp3"
            with open(source_path, 'rb') as f:
                recognizer.write(f.read())
    except Exception as e:
        print(e)
    finally:
        recognizer.stop()
def process_multithread(number):
    """Run ``process`` concurrently in ``number`` threads and wait for all.

    Each thread receives its index (0 .. number - 1) as the id argument.
    """
    workers = []
    for index in range(number):
        worker = threading.Thread(target=process, args=(index,))
        workers.append(worker)
        worker.start()

    # Block until every worker has finished.
    for worker in workers:
        worker.join()
def make_spoken(task_id, audio_url, audio_content, audio_text):
    """Assess one audio clip and return the service's final response.

    Args:
        task_id: Key identifying this request in ``spoken_result``; also
            names the temporary file ``f"{task_id}.mp3"``.
        audio_url: When truthy, the audio is downloaded from this URL and
            ``audio_content`` is ignored.
        audio_content: Audio bytes, used when ``audio_url`` is falsy.
        audio_text: Reference text the audio is evaluated against.

    Returns:
        The response stored by the listener, or ``None`` if none arrived
        within the polling window (60 checks, 0.5 s apart).

    Raises:
        requests.RequestException: If the download fails, times out, or
            returns an HTTP error status.
    """
    temp_path = f"{task_id}.mp3"
    if audio_url:
        print("有url,应该去下载mp3文件")
        # Bound the download so a stalled server cannot hang the caller, and
        # refuse error responses rather than sending an error page as audio.
        r = requests.get(audio_url, timeout=30)
        r.raise_for_status()
        audio_content = r.content
    else:
        with open(temp_path, 'wb') as f:
            f.write(audio_content)

    # Drop any result left behind by an earlier run with the same id that
    # timed out, so it cannot be mistaken for this run's response.
    spoken_result.pop(task_id, None)

    try:
        process_rec(task_id, audio_path=f"", audio_text=audio_text, audio_binary=audio_content)
        for _ in range(60):
            if task_id in spoken_result:
                return spoken_result.pop(task_id)
            time.sleep(0.5)
        return None
    finally:
        # Remove the temporary file on every exit path, including timeout
        # and errors, not only on success.
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            pass
if __name__ == "__main__":
    # Manual smoke run: one assessment for task 0 with a sample audio path
    # and reference sentence.
    sample_audio = r"C:\Users\86131\Desktop\音频\output_16k_mono.mp3"
    sample_text = "You must study to be frank with the world apple"
    process_rec(0, sample_audio, sample_text)
|