如何将平台内的语音大模型接入自己的应用
更新时间:2024-11-25 15:56:39
此最佳实践旨在介绍如何使用平台内目前提供的三个语音大模型接口(语音转文字、文生文和文字转语音接口)实现类似智能音箱的功能。具体实现逻辑如下:代码运行后,会提示输入一段语音,程序采集到语音后调用语音转文字接口,并把识别出的文字结果打印到控制台。然后,调用大语言模型生成会话回复(支持多轮会话),把回复内容打印到控制台,同时调用文字转语音接口,把合成出来的语音朗读出来。之后,程序进入下一次循环。
参考以下操作,AI 应用开发者可以很方便地通过 OpenAI 标准依赖库使用大模型服务,实现“多模态”应用。
前提条件
已创建 API 密钥。
操作步骤
-
将以下示例代码中 `api_key` 所对应的值,替换为实际的 API 密钥,并将代码内容保存为 `audio_robot.py`
# -*- coding: utf-8 -*-
"""Minimal "smart speaker" demo built on three OpenAI-compatible endpoints.

Each round of the main loop:
  1. records microphone audio between two key presses,
  2. sends the recording to the speech-to-text endpoint and prints the text,
  3. streams a chat completion (with a short multi-turn history) and prints it,
  4. feeds the reply, sentence by sentence, to the text-to-speech endpoint and
     plays the synthesized audio in background threads.
"""
import os
import io
import threading
import time
import wave
import queue
import random

import pygame
import pyaudio
from openai import OpenAI

# Replace the api_key value with the API key you actually created.
client = OpenAI(api_key='sk-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', base_url='https://openapi.coreshub.cn/v1')

# Recording parameters passed to PyAudio and written into the WAV header.
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 1024

# Characters treated as the end of a sentence when cutting the streamed reply
# into pieces for speech synthesis. Both the ASCII and the full-width forms of
# "!" and "?" are accepted, since Chinese replies normally use the latter.
SENTENCE_ENDINGS = ('。', '!', '?', '!', '?')
# A piece is only sent to speech synthesis once it is longer than this.
MIN_TTS_CHUNK_LEN = 55
# Number of past (question, answer) turns replayed to the chat model.
MAX_HISTORY_TURNS = 8


def clear_and_print(content):
    """Rewrite the current console line with ``content`` (newlines removed)."""
    content = content.replace('\n', '')
    print(f'\r{content}', end='', flush=True)


def truncate_to_last_sentence(text):
    """Return ``text`` cut after its last sentence-ending character.

    If ``text`` contains none of ``SENTENCE_ENDINGS`` it is returned unchanged.
    """
    last_punct = max(text.rfind(mark) for mark in SENTENCE_ENDINGS)
    if last_punct != -1:
        return text[:last_punct + 1]
    return text


class AudioRecorder:
    """Records microphone input into a temporary WAV file."""

    def __init__(self):
        self.audio = pyaudio.PyAudio()
        self.stream = None
        self.frames = []
        self.is_recording = False

    def start_recording(self):
        """Wait for a key press, then open the input stream and start capturing.

        Entering ``q`` at the prompt terminates the program.
        """
        self.frames = []
        print("[系统] 输入任意键开始录音,退出请按'q'")
        if input() == 'q':
            print('[系统] 再见!')
            # SystemExit directly: the exit() builtin is only injected by the
            # site module and is not guaranteed to exist.
            raise SystemExit(0)
        self.is_recording = True
        self.stream = self.audio.open(format=FORMAT,
                                      channels=CHANNELS,
                                      rate=RATE,
                                      input=True,
                                      frames_per_buffer=CHUNK,
                                      stream_callback=self.callback)
        self.stream.start_stream()
        print("[系统] 开始录音,任意键结束录音...")

    def stop_recording(self):
        """Close the input stream and return the path of the saved WAV file."""
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None
        self.is_recording = False
        print("[系统] 录音结束.")
        return self.save_audio()

    def save_audio(self):
        """Write the captured frames to ``recording_<timestamp>.wav``.

        Returns:
            The name of the file that was written.
        """
        filename = f"recording_{int(time.time())}.wav"
        # The context manager closes the file even if a write fails.
        with wave.open(filename, 'wb') as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(self.audio.get_sample_size(FORMAT))
            wf.setframerate(RATE)
            wf.writeframes(b''.join(self.frames))
        return filename

    def callback(self, in_data, frame_count, time_info, status):
        """PyAudio stream callback: collect each incoming buffer while recording."""
        if self.is_recording:
            self.frames.append(in_data)
        return (in_data, pyaudio.paContinue)

    def listen(self):
        """Record one utterance and return the path of the resulting WAV file."""
        self.start_recording()
        try:
            input()
        except (KeyboardInterrupt, EOFError):
            # Ctrl-C or a closed stdin simply ends the recording.
            pass
        # Deliberately not "finally: return ...": a return inside finally
        # would silently discard any other exception.
        return self.stop_recording()

    def __del__(self):
        # getattr guards: __del__ also runs when __init__ failed part-way.
        stream = getattr(self, 'stream', None)
        if stream:
            stream.stop_stream()
            stream.close()
        audio = getattr(self, 'audio', None)
        if audio:
            audio.terminate()


class AudioPlayer:
    """Synthesizes queued text to speech and plays it back in order.

    Two daemon threads form a pipeline: one turns queued text into audio via
    the text-to-speech endpoint, the other plays the resulting audio.
    """

    def __init__(self):
        self.text_queue = queue.Queue()
        self.audio_data_queue = queue.Queue()
        self.is_playing = False
        pygame.mixer.init()
        threading.Thread(target=self._request_audio_thread, daemon=True).start()
        threading.Thread(target=self._play_audio_thread, daemon=True).start()

    def add_to_queue(self, text):
        """Queue ``text`` for speech synthesis and playback."""
        self.text_queue.put(text)

    def _request_audio_thread(self):
        """Worker loop: synthesize each queued text and hand the audio to the player."""
        while True:
            text = self.text_queue.get()
            try:
                response = client.audio.speech.create(model='CosyVoice-300M', voice='中文女', input=text)
                self.audio_data_queue.put(io.BytesIO(response.content))
            except Exception as exc:  # thread boundary: report and keep serving
                print(f'\n[系统] 语音合成失败: {exc}')
            finally:
                # Always mark the item done; otherwise text_queue.join() in the
                # main loop would block forever after a failed request.
                self.text_queue.task_done()

    def _play_audio_thread(self):
        """Worker loop: play each synthesized clip, pausing briefly between clips."""
        while True:
            audio_data = self.audio_data_queue.get()
            try:
                self._play_audio(audio_data)
                time.sleep(0.8 + 0.1 * random.random())
            except Exception as exc:  # thread boundary: report and keep serving
                print(f'\n[系统] 语音播放失败: {exc}')
            finally:
                # Same reasoning as above, for audio_data_queue.join().
                self.audio_data_queue.task_done()

    def _play_audio(self, audio_data):
        """Play one clip (a file-like object) and block until it has finished."""
        self.is_playing = True
        try:
            pygame.mixer.music.load(audio_data)
            pygame.mixer.music.play()
            clock = pygame.time.Clock()
            while pygame.mixer.music.get_busy():
                clock.tick(10)
        finally:
            self.is_playing = False


def stream_chat_response(messages):
    """Stream a chat completion for ``messages``.

    Yields:
        ``(new_content, full_text)`` tuples: the newly received text fragment
        and the whole reply accumulated so far.
    """
    response = client.chat.completions.create(model="Qwen2-0.5B-Instruct", messages=messages, stream=True)
    full_text = ""
    for chunk in response:
        # Skip chunks that carry no choices instead of failing on choices[0].
        if not chunk.choices:
            continue
        new_content = chunk.choices[0].delta.content
        if new_content:
            full_text += new_content
            yield new_content, full_text


def clean_text(text):
    """Strip newlines and ``*`` characters before text is sent to speech synthesis."""
    text = text.replace("\n", "")
    text = text.replace("*", "")
    return text


def _transcribe(audio_file):
    """Send ``audio_file`` to the speech-to-text endpoint and return the text.

    The recording is deleted afterwards, whether or not the request succeeded.
    """
    try:
        with open(audio_file, 'rb') as file:
            response = client.audio.transcriptions.create(file=file, model='SenseVoiceSmall')
    finally:
        # Remove the file only after it has been closed.
        os.remove(audio_file)
    return response.text


def _build_messages(history, question_txt):
    """Build the chat message list from past (question, answer) turns plus the new question."""
    messages = [{"role": "system", "content": "You are a helpful assistant."}]
    for past_question, past_answer in history:
        messages.append({"role": "user", "content": past_question})
        messages.append({"role": "assistant", "content": past_answer})
    messages.append({"role": "user", "content": question_txt})
    return messages


def _queue_for_speech(audio_player, text):
    """Clean ``text`` and queue it for playback unless nothing speakable is left."""
    cleaned = clean_text(text)
    if cleaned.strip():
        audio_player.add_to_queue(cleaned)


def main():
    """Run the record -> transcribe -> chat -> speak loop until the user quits."""
    recorder = AudioRecorder()
    audio_player = AudioPlayer()
    history = []

    print('[系统] 开始对话')
    while True:
        audio_file = recorder.listen()

        # ---- speech to text --------------------------------------------
        print('ASR 推理中...')
        question_txt = _transcribe(audio_file)
        print("> 问题: ", question_txt)

        # ---- chat completion, spoken sentence by sentence --------------
        messages = _build_messages(history, question_txt)
        full_text = ""
        audio_chunk = ""
        for new_content, full_text in stream_chat_response(messages):
            clear_and_print(f'< 回答: {full_text}')
            audio_chunk += new_content
            has_sentence_end = any(mark in audio_chunk for mark in SENTENCE_ENDINGS)
            if has_sentence_end and len(audio_chunk) > MIN_TTS_CHUNK_LEN:
                truncated_chunk = truncate_to_last_sentence(audio_chunk)
                _queue_for_speech(audio_player, truncated_chunk)
                # Keep the unfinished tail for the next round.
                audio_chunk = audio_chunk[len(truncated_chunk):]
        print()

        # Flush whatever is left: complete sentences first, then the tail.
        if audio_chunk:
            truncated_chunk = truncate_to_last_sentence(audio_chunk)
            _queue_for_speech(audio_player, truncated_chunk)
            _queue_for_speech(audio_player, audio_chunk[len(truncated_chunk):])

        history.append((question_txt, full_text))
        history = history[-MAX_HISTORY_TURNS:]

        # Wait until the whole reply has been synthesized and played.
        audio_player.text_queue.join()
        audio_player.audio_data_queue.join()


if __name__ == '__main__':
    main()
-
打开本地终端,执行如下命令,安装相应依赖。
pip install pygame pyaudio openai
说明:若运行失败,可将命令行中的 `pip` 改为 `pip3`。
-
进入 `audio_robot.py` 文件所在目录,执行如下命令,运行程序。
说明:若运行失败,可将命令行中的 `python` 改为 `python3`。
python audio_robot.py
回显示例: