此最佳实践旨在介绍如何使用平台内目前提供的三个语音大模型接口(语音转文字、文生文、文字转语音接口)实现类似智能音箱的功能。具体实现逻辑如下:代码运行后,会提示输入一段语音;程序采集到语音后调用语音转文字接口,并把识别出的文字结果打印到控制台。然后,调用大语言模型生成会话回复(支持多轮会话),把回复内容打印到控制台,同时调用文字转语音接口,把合成出来的语音朗读出来。随后,程序进入下一次循环。

参考以下操作,AI 应用开发者可以很方便地通过 OpenAI 标准依赖库使用大模型服务,实现“多模态”应用。

前提条件

已创建 API 密钥

操作步骤

  1. 将以下示例代码中 api_key 所对应的值,替换为实际的 API 密钥,并将代码内容保存为 audio_robot.py

    # -*- coding: utf-8 -*-
    import os
    import io
    import threading
    import time
    import wave
    import queue
    import random
    
    import pygame
    import pyaudio
    from openai import OpenAI
    
    
    # OpenAI-compatible client shared by the speech-to-text, chat and text-to-speech calls in this script.
    client = OpenAI(api_key='sk-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', base_url='https://openapi.coreshub.cn/v1')  # Replace the api_key value with the API key you actually created.
    
    # Recording parameters: used when the PyAudio input stream is opened and
    # again when the WAV header of the saved recording is written.
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 16000
    # Passed as frames_per_buffer when the input stream is opened.
    CHUNK = 1024
    
    def clear_and_print(content):
        """Redraw the current console line with ``content``.

        Line breaks are removed so the text stays on one line, and the leading
        carriage return moves the cursor back to the start of the line so that
        successive calls overwrite each other. No trailing newline is written.
        """
        single_line = ''.join(content.split('\n'))
        print('\r' + single_line, end='', flush=True)
    
    def truncate_to_last_sentence(text):
        """Cut ``text`` right after its last sentence-ending mark.

        The marks considered are '!', '。' and '?'. When none of them occurs,
        the text is returned unchanged.
        """
        cut = -1
        for mark in ('!', '。', '?'):
            cut = max(cut, text.rfind(mark))
        if cut == -1:
            return text
        return text[:cut + 1]
    
    class AudioRecorder:
        """Console-driven microphone recorder.

        The user presses Enter to start, audio is captured through a PyAudio
        callback stream until Enter is pressed again, and the captured frames
        are written to a WAV file whose name is returned to the caller.
        """

        def __init__(self):
            self.audio = pyaudio.PyAudio()
            self.stream = None         # open PyAudio input stream, or None when idle
            self.frames = []           # raw audio buffers appended by callback()
            self.is_recording = False  # callback() only stores data while this is True

        def start_recording(self):
            """Prompt the user, then open the input stream and start capturing.

            Raises:
                SystemExit: if the user enters 'q' at the prompt.
            """
            self.is_recording = True
            self.frames = []
            print("[系统] 输入任意键开始录音,退出请按'q'")
            if input() == 'q':
                print('[系统] 再见!')
                # Raise SystemExit directly instead of calling exit(): that helper
                # is injected by the site module and is not guaranteed to exist.
                raise SystemExit(0)
            self.stream = self.audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True,
                                          frames_per_buffer=CHUNK, stream_callback=self.callback)
            self.stream.start_stream()
            print("[系统] 开始录音,任意键结束录音...")

        def stop_recording(self):
            """Stop and close the stream, then save what was captured.

            Returns:
                The filename of the WAV file written by save_audio().
            """
            self._close_stream()
            self.is_recording = False
            print("[系统] 录音结束.")
            return self.save_audio()

        def save_audio(self):
            """Write the collected frames to a timestamped WAV file.

            Returns:
                The name of the file that was written (relative path).
            """
            filename = f"recording_{int(time.time())}.wav"
            # The context manager closes the file even if a write fails.
            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(CHANNELS)
                wf.setsampwidth(self.audio.get_sample_size(FORMAT))
                wf.setframerate(RATE)
                wf.writeframes(b''.join(self.frames))
            return filename

        def callback(self, in_data, frame_count, time_info, status):
            """PyAudio stream callback: keep each incoming buffer while recording."""
            if self.is_recording:
                self.frames.append(in_data)
            return (in_data, pyaudio.paContinue)

        def listen(self):
            """Record one utterance and return the filename of the saved WAV file.

            Recording ends when the user presses Enter; Ctrl+C or a closed
            stdin also ends it instead of aborting the program.
            """
            self.start_recording()
            try:
                input()
            except (KeyboardInterrupt, EOFError):
                # Both are treated as "stop recording". They are caught explicitly
                # rather than via `return` inside `finally`, which would silently
                # discard every other exception as well.
                pass
            return self.stop_recording()

        def _close_stream(self):
            """Stop and close the input stream if one is open."""
            stream = getattr(self, 'stream', None)
            if stream is not None:
                stream.stop_stream()
                stream.close()
            self.stream = None

        def __del__(self):
            # __init__ may have failed before the attributes were assigned, so
            # look them up defensively to avoid an AttributeError during cleanup.
            self._close_stream()
            audio = getattr(self, 'audio', None)
            if audio is not None:
                audio.terminate()
    
    
    class AudioPlayer:
        """Background text-to-speech player.

        Text pushed through add_to_queue() is synthesized by one daemon thread,
        and the resulting audio is played in order by a second daemon thread.
        Callers wait for completion with text_queue.join() followed by
        audio_data_queue.join().
        """

        def __init__(self):
            self.text_queue = queue.Queue()        # text waiting to be synthesized
            self.audio_data_queue = queue.Queue()  # synthesized audio waiting to be played
            self.is_playing = False
            pygame.mixer.init()
            threading.Thread(target=self._request_audio_thread, daemon=True).start()
            threading.Thread(target=self._play_audio_thread, daemon=True).start()

        def add_to_queue(self, text):
            """Queue ``text`` for speech synthesis and playback."""
            self.text_queue.put(text)

        def _request_audio_thread(self):
            """Worker loop: turn queued text into audio via the speech API."""
            while True:
                text = self.text_queue.get()
                try:
                    # Blank text produces no speech, so skip the request.
                    if text.strip():
                        response = client.audio.speech.create(model='CosyVoice-300M', voice='中文女', input=text)
                        # Enqueue the audio before task_done() so that, once
                        # text_queue.join() returns, every clip is already
                        # visible to audio_data_queue.join().
                        self.audio_data_queue.put(io.BytesIO(response.content))
                except Exception as exc:
                    # Keep the worker alive; a dead thread would leave later
                    # items unprocessed and block join() forever.
                    print(f'[系统] 语音合成失败: {exc}')
                finally:
                    self.text_queue.task_done()

        def _play_audio_thread(self):
            """Worker loop: play queued audio clips one after another."""
            while True:
                audio_data = self.audio_data_queue.get()
                try:
                    self._play_audio(audio_data)
                    # Short, slightly randomized pause between clips.
                    time.sleep(0.8 + 0.1 * random.random())
                except Exception as exc:
                    print(f'[系统] 语音播放失败: {exc}')
                finally:
                    # Always acknowledge the item so audio_data_queue.join() can return.
                    self.audio_data_queue.task_done()

        def _play_audio(self, audio_data):
            """Play one clip (a file-like object) and block until it finishes."""
            self.is_playing = True
            try:
                pygame.mixer.music.load(audio_data)
                pygame.mixer.music.play()
                clock = pygame.time.Clock()
                while pygame.mixer.music.get_busy():
                    clock.tick(10)  # poll about ten times per second
            finally:
                self.is_playing = False
    
    def stream_chat_response(messages):
        """Stream a chat completion and yield the reply as it grows.

        Args:
            messages: chat messages passed to client.chat.completions.create.

        Yields:
            (new_content, full_text) tuples: the text fragment carried by the
            current chunk and the whole reply accumulated so far. Chunks
            without text content yield nothing.
        """
        response = client.chat.completions.create(model="Qwen2-0.5B-Instruct", messages=messages, stream=True)
        full_text = ""
        for chunk in response:
            # A stream chunk may carry an empty `choices` list (for example a
            # usage-only chunk); indexing it would raise IndexError.
            if not chunk.choices:
                continue
            new_content = chunk.choices[0].delta.content
            if new_content:
                full_text += new_content
                yield new_content, full_text
    
    def clean_text(text):
        """Return ``text`` with every line break and asterisk removed."""
        unwanted = str.maketrans('', '', '\n*')
        return text.translate(unwanted)
    
    # Main conversation loop: record -> transcribe -> chat -> speak, repeated
    # until the user quits from the recorder prompt.
    recorder = AudioRecorder()
    audio_player = AudioPlayer()
    history = []  # (question, answer) pairs of the most recent turns
    print('[系统] 开始对话')
    while True:
        audio_file = recorder.listen()

        # ---- Speech to text -------------------------------------------------------------------------
        print('ASR 推理中...')
        try:
            with open(audio_file, 'rb') as file:
                response = client.audio.transcriptions.create(file=file, model='SenseVoiceSmall')
        finally:
            # Remove the temporary recording even when the request fails.
            os.remove(audio_file)
        question_txt = response.text
        print("> 问题: ", question_txt)
        if not question_txt or not question_txt.strip():
            # Nothing was recognized, so there is no question to send to the model.
            print('[系统] 未识别到语音,请重试')
            continue

        # ---- Chat completion ------------------------------------------------------------------------
        # Rebuild the conversation: system prompt, earlier turns, then the new question.
        messages = [{"role": "system", "content": "You are a helpful assistant."}]
        for past_question, past_answer in history:
            messages.append({"role": "user", "content": past_question})
            messages.append({"role": "assistant", "content": past_answer})
        messages.append({"role": "user", "content": question_txt})

        full_text = ""
        audio_chunk = ""  # reply text not yet handed to the audio player
        for new_content, full_text in stream_chat_response(messages):
            clear_and_print(f'< 回答: {full_text}')
            audio_chunk += new_content

            # Once enough text with a sentence end has accumulated, send the
            # complete sentences to TTS so playback starts while the reply is
            # still streaming.
            has_sentence_end = '!' in audio_chunk or '?' in audio_chunk or '。' in audio_chunk
            if has_sentence_end and len(audio_chunk) > 55:
                truncated_chunk = truncate_to_last_sentence(audio_chunk)
                if truncated_chunk:
                    cleaned_chunk = clean_text(truncated_chunk)
                    if cleaned_chunk.strip():
                        audio_player.add_to_queue(cleaned_chunk)
                    audio_chunk = audio_chunk[len(truncated_chunk):]
        print()

        # Flush what is left: the remaining complete sentences first, then any
        # unterminated tail. Both get the same cleaning as the streamed chunks,
        # and blank pieces are dropped instead of being sent to TTS.
        if audio_chunk:
            truncated_chunk = truncate_to_last_sentence(audio_chunk)
            for piece in (truncated_chunk, audio_chunk[len(truncated_chunk):]):
                cleaned_piece = clean_text(piece)
                if cleaned_piece.strip():
                    audio_player.add_to_queue(cleaned_piece)

        # Keep only the last 8 turns as context for later questions.
        history.append((question_txt, full_text))
        history = history[-8:]

        # Wait until everything queued for this turn has been synthesized and played.
        audio_player.text_queue.join()
        audio_player.audio_data_queue.join()
  2. 打开本地终端,执行如下命令,安装相应依赖。

    pip install pygame pyaudio openai

    说明

    若运行失败,可将命令行中的 pip 改为 pip3

  3. 进入 audio_robot.py 文件所在目录,执行如下命令,运行程序。

    说明

    若运行失败,可将命令行中的 python 改为 python3

    python audio_robot.py

    回显示例:

    bp plug model app 1