Несколько параллельных вызовов AWS Lambda

Я пытаюсь выполнить несколько вызовов AWS Lambda с пакетом python 3.7.2 и aiobotocore. Вот мой код.

import asyncio
import aiobotocore


async def invoke(payload, session):
    async with session.create_client('lambda', region_name='us-east-1') as client:
        return await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)


def generate_invocations(payloads, session):
    for payload in payloads:
        yield invoke(payload, session)


def invoke_all(payloads):
    loop = asyncio.get_event_loop()

    async def wrapped():
        session = aiobotocore.get_session(loop=loop)
        invocations = generate_invocations(payloads, session)
        return await asyncio.gather(*invocations)

    return loop.run_until_complete(wrapped())


def main():
    payloads_list = []  # MY PAYLOADS LIST 
    lambda_responses = invoke_all(payloads_list)
    print(lambda_responses)


if __name__ == '__main__':
    main()

Код работает очень быстро (для 10 полезных нагрузок около 1 секунды вместо 15 при использовании вызовов лямбда-клиента boto3), но у меня есть две проблемы:

1) Элементы в lambda_responses включают ключ «Полезная нагрузка», значение которого имеет тип aiobotocore.response.StreamingBody. Когда я пытаюсь использовать value.read(), я получаю «объект сопрограммы StreamingBody.read», и я думаю, что в моем коде есть какая-то проблема. Я могу получить желаемый ответ с помощью «json.loads(json.loads(r['Payload']._buffer.pop())['body'])», но как правильно его получить.

2) В редких случаях "Полезная нагрузка" в одном из ответов имеет пустой буфер. Как я могу гарантировать, что функция invoke_all возвращает непустые ответы? Правильно ли использовать aiobotocore?

Я новичок в Python 3 и асинхронных функциях. Вдохновленный примерами документации aiobotocore и Мэтью Маркусом блог.

Спасибо!


person maksim    schedule 26.02.2019    source источник


Ответы (1)


  1. Элементы в lambda_responses включают ключ «Полезная нагрузка», значение которого имеет тип aiobotocore.response.StreamingBody. Когда я пытаюсь использовать value.read(), я получаю объект сопрограммы StreamingBody.read

Это означает, что сопрограмма read() предназначена для ожидания, что вы должны сделать, пока еще находитесь в цикле событий. Например, вы можете изменить сопрограмму invoke так, чтобы она также читала ответ:

async def invoke(payload, session):
    async with session.create_client('lambda', region_name='us-east-1') as client:
        resp = await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)
        payload = await resp['Payload'].read()
        return payload  # or assemble a dict with relevant parts
  1. В редких случаях полезная нагрузка в одном из ответов имеет пустой буфер.

Вероятно, это связано с тем, что вы обращаетесь к буферу до фактического чтения содержимого. В некоторых случаях информация поступает достаточно быстро, чтобы вы все равно нашли ее во внутреннем буфере, но иногда ее приходится ждать. Использование общедоступного метода, такого как read(), гарантирует правильное использование API. Свойство _buffer, с другой стороны, начинается со знака подчеркивания, который означает, что это деталь реализации, не предназначенная для прямого доступа.

person user4815162342    schedule 26.02.2019
comment
Спасибо! Работает отлично. - person maksim; 27.02.2019