Проблема с потоковой передачей звука в Python с микрофона через MQTT в Google Streaming с использованием генераторов

Я прочитал документацию Google и просмотрел их examples, однако мне не удалось заставить это работать правильно в моем конкретном случае использования. Проблема в том, что пакеты аудиопотока разбиваются на более мелкие фрагменты (размер кадра), закодированные в base64 и отправленные через MQTT - это означает, что подход генератора, скорее всего, остановится на полпути, несмотря на то, что он не был полностью завершен отправителем. Компонент My MicrophoneSender отправит последнюю часть сообщения с параметром segment_key = -1, так что это флаг того, что все сообщение было отправлено и что полный / окончательный процесс потока может быть завершен. До этого момента в буфере может не содержаться весь поток, поэтому трудно заставить а) генератор прекратить выдачу б) Google для возврата частичной транскрипции. Частичная транскрипция требуется примерно каждые 10 кадров.

Чтобы лучше проиллюстрировать это, вот мой код.

внутри ресивера:

    STREAMFRAMETHRESHOLD = 10
    def mqttMsgCallback(self, client, userData, msg):
         if msg.topic.startswith("MicSender/stream"):
                msgDict = json.loads(msg.payload)
                streamBytes = b64decode(msgDict['audio_data'].encode('utf-8'))
                frameNum = int(msgDict['segment_num'])

                if frameNum == 0:
                    self.asr_time_start = time.time()
                    self.asr.endOfStream = False

                if frameNum >= 0:
                    self.asr.store_stream_bytes(streamBytes)
                    self.asr.endOfStream = False

                    if frameNum % STREAMFRAMETHRESHOLD == 0:
                        self.asr.get_intermediate_and_print()

                else:
                    #FINAL, recieved -1
                    trans = self.asr.finish_stream()
                    self.send_message(trans)
                    self.frameCount=0

внутри реализации Google Speech Class:

class GoogleASR(ASR):

    def __init__(self, name):
        super().__init__(name)    

        # STREAMING
        self.stream_buf = queue.Queue()
        self.stream_gen = self.getGenerator(self.stream_buf)
        self.endOfStream = True
        self.requests = (types.StreamingRecognizeRequest(audio_content=chunk) for chunk in self.stream_gen)
        self.streaming_config = types.StreamingRecognitionConfig(config=self.config)
        self.current_transcript = ''
        self.numCharsPrinted = 0

    def getGenerator(self, buff):
        while not self.endOfStream:
            # Use a blocking get() to ensure there's at least one chunk of
            # data, and stop iteration if the chunk is None, indicating the
            # end of the audio stream.
            chunk = buff.get()
            if chunk is None:
                return
            data = [chunk]

            # Now consume whatever other data's still buffered.
            while True:
                try:
                    chunk = buff.get(block=False)
                    data.append(chunk)

                except queue.Empty:
                    self.endOfStream = True
                    yield b''.join(data)
                    break

            yield b''.join(data)


    def store_stream_bytes(self, bytes):
        self.stream_buf.put(bytes)

    def get_intermediate_and_print(self):
        self.get_intermediate()

    def get_intermediate(self):

        if self.stream_buf.qsize() > 1:
            print("stream buf size: {}".format(self.stream_buf.qsize()))
            responses = self.client.streaming_recognize(self.streaming_config, self.requests)
            # print(responses)

            try:
                # Now, put the transcription responses to use.
                if not self.numCharsPrinted:
                    self.numCharsPrinted = 0

                for response in responses:

                    if not response.results:
                        continue

                    # The `results` list is consecutive. For streaming, we only care about
                    # the first result being considered, since once it's `is_final`, it
                    # moves on to considering the next utterance.
                    result = response.results[0]
                    if not result.alternatives:
                        continue

                    # Display the transcription of the top alternative.
                    self.current_transcript = result.alternatives[0].transcript

                    # Display interim results, but with a carriage return at the end of the
                    # line, so subsequent lines will overwrite them.
                    #
                    # If the previous result was longer than this one, we need to print
                    # some extra spaces to overwrite the previous result
                    overwrite_chars = ' ' * (self.numCharsPrinted - len(self.current_transcript))
                    sys.stdout.write(self.current_transcript + overwrite_chars + '\r')
                    sys.stdout.flush()
                    self.numCharsPrinted = len(self.current_transcript)

    def finish_stream(self):
        self.endOfStream = False
        self.get_intermediate()
        self.endOfStream = True

        final_result = self.current_transcript

        self.stream_buf= queue.Queue()
        self.allBytes = bytearray()
        self.current_transcript = ''
        self.requests = (types.StreamingRecognizeRequest(audio_content=chunk) for chunk in self.stream_gen)
        self.streaming_config = types.StreamingRecognitionConfig(config=self.config)

        return final_result

В настоящее время это ничего не выводит со стороны транскрипции.

stream buf size: 21
stream buf size: 41
stream buf size: 61
stream buf size: 81
stream buf size: 101
stream buf size: 121
stream buf size: 141
stream buf size: 159

Но ответ / расшифровка пустые. Если я поставлю точку останова на ответ for в ответах внутри функции get_intermediate, тогда он никогда не будет запущен, что означает, что по какой-то причине он пуст (не перенастроен из Google). Однако, если я устанавливаю точку останова на генераторе и слишком долго (> 5 секунд) продолжаю выдавать данные, он (Google) сообщает мне, что данные, вероятно, отправляются на сервер слишком медленно. google.api_core.exceptions.OutOfRange: 400 Audio data is being streamed too slow. Please stream audio data approximately at real time.

Может быть, кто-нибудь заметит здесь очевидное ...


person robmsmt    schedule 13.09.2018    source источник


Ответы (1)


Как вы организовали свой код, генератор, который вы передаете Google API, инициализируется ровно один раз - в строке 10 с использованием выражения генератора: self.requests = (...). В собранном виде этот генератор также будет работать ровно один раз и будет «истощен». То же самое относится к функции генератора, которую сам генератор (для ...) вызывает (self.getGeneerator()). Он будет запущен только один раз и остановится, когда получит 10 фрагментов данных (которые, как я вижу, очень маленькие). Затем внешний генератор (который вы назначили self.requests) также остановится навсегда - предоставив ASR только короткий бит данных (10 раз по 20 байтов, глядя на распечатанный вывод отладки). Скорее всего, в этом нет ничего узнаваемого.

Кстати, обратите внимание, что у вас есть избыточный yield b''.join(data) в вашей функции, данные будут отправлены дважды.

Вам нужно будет переделать (внешний) генератор, чтобы он не возвращался, пока не будут получены все данные. Если вы хотите использовать другой генератор, как вы это делаете, чтобы собрать каждый больший кусок для «внешнего» генератора, из которого Google API читает, вам нужно будет переделывать его каждый раз, когда вы начинаете с ним новый цикл.

person Leo K    schedule 13.09.2018