Я прочитал документацию Google и просмотрел их examples, однако мне не удалось заставить это работать правильно в моем конкретном случае использования. Проблема в том, что пакеты аудиопотока разбиваются на более мелкие фрагменты (размер кадра), закодированные в base64 и отправленные через MQTT - это означает, что подход генератора, скорее всего, остановится на полпути, несмотря на то, что он не был полностью завершен отправителем. Компонент My MicrophoneSender отправит последнюю часть сообщения с параметром segment_key = -1, так что это флаг того, что все сообщение было отправлено и что полный / окончательный процесс потока может быть завершен. До этого момента в буфере может не содержаться весь поток, поэтому трудно заставить а) генератор прекратить выдачу б) Google для возврата частичной транскрипции. Частичная транскрипция требуется примерно каждые 10 кадров.
Чтобы лучше проиллюстрировать это, вот мой код.
внутри ресивера:
STREAMFRAMETHRESHOLD = 10
def mqttMsgCallback(self, client, userData, msg):
if msg.topic.startswith("MicSender/stream"):
msgDict = json.loads(msg.payload)
streamBytes = b64decode(msgDict['audio_data'].encode('utf-8'))
frameNum = int(msgDict['segment_num'])
if frameNum == 0:
self.asr_time_start = time.time()
self.asr.endOfStream = False
if frameNum >= 0:
self.asr.store_stream_bytes(streamBytes)
self.asr.endOfStream = False
if frameNum % STREAMFRAMETHRESHOLD == 0:
self.asr.get_intermediate_and_print()
else:
#FINAL, recieved -1
trans = self.asr.finish_stream()
self.send_message(trans)
self.frameCount=0
внутри реализации Google Speech Class:
class GoogleASR(ASR):
def __init__(self, name):
super().__init__(name)
# STREAMING
self.stream_buf = queue.Queue()
self.stream_gen = self.getGenerator(self.stream_buf)
self.endOfStream = True
self.requests = (types.StreamingRecognizeRequest(audio_content=chunk) for chunk in self.stream_gen)
self.streaming_config = types.StreamingRecognitionConfig(config=self.config)
self.current_transcript = ''
self.numCharsPrinted = 0
def getGenerator(self, buff):
while not self.endOfStream:
# Use a blocking get() to ensure there's at least one chunk of
# data, and stop iteration if the chunk is None, indicating the
# end of the audio stream.
chunk = buff.get()
if chunk is None:
return
data = [chunk]
# Now consume whatever other data's still buffered.
while True:
try:
chunk = buff.get(block=False)
data.append(chunk)
except queue.Empty:
self.endOfStream = True
yield b''.join(data)
break
yield b''.join(data)
def store_stream_bytes(self, bytes):
self.stream_buf.put(bytes)
def get_intermediate_and_print(self):
self.get_intermediate()
def get_intermediate(self):
if self.stream_buf.qsize() > 1:
print("stream buf size: {}".format(self.stream_buf.qsize()))
responses = self.client.streaming_recognize(self.streaming_config, self.requests)
# print(responses)
try:
# Now, put the transcription responses to use.
if not self.numCharsPrinted:
self.numCharsPrinted = 0
for response in responses:
if not response.results:
continue
# The `results` list is consecutive. For streaming, we only care about
# the first result being considered, since once it's `is_final`, it
# moves on to considering the next utterance.
result = response.results[0]
if not result.alternatives:
continue
# Display the transcription of the top alternative.
self.current_transcript = result.alternatives[0].transcript
# Display interim results, but with a carriage return at the end of the
# line, so subsequent lines will overwrite them.
#
# If the previous result was longer than this one, we need to print
# some extra spaces to overwrite the previous result
overwrite_chars = ' ' * (self.numCharsPrinted - len(self.current_transcript))
sys.stdout.write(self.current_transcript + overwrite_chars + '\r')
sys.stdout.flush()
self.numCharsPrinted = len(self.current_transcript)
def finish_stream(self):
self.endOfStream = False
self.get_intermediate()
self.endOfStream = True
final_result = self.current_transcript
self.stream_buf= queue.Queue()
self.allBytes = bytearray()
self.current_transcript = ''
self.requests = (types.StreamingRecognizeRequest(audio_content=chunk) for chunk in self.stream_gen)
self.streaming_config = types.StreamingRecognitionConfig(config=self.config)
return final_result
В настоящее время это ничего не выводит со стороны транскрипции.
stream buf size: 21
stream buf size: 41
stream buf size: 61
stream buf size: 81
stream buf size: 101
stream buf size: 121
stream buf size: 141
stream buf size: 159
Но ответ / расшифровка пустые. Если я поставлю точку останова на ответ for в ответах внутри функции get_intermediate, тогда он никогда не будет запущен, что означает, что по какой-то причине он пуст (не перенастроен из Google). Однако, если я устанавливаю точку останова на генераторе и слишком долго (> 5 секунд) продолжаю выдавать данные, он (Google) сообщает мне, что данные, вероятно, отправляются на сервер слишком медленно. google.api_core.exceptions.OutOfRange: 400 Audio data is being streamed too slow. Please stream audio data approximately at real time.
Может быть, кто-нибудь заметит здесь очевидное ...