Как изменить порядок асинхронно отправляемых событий Kinesis в KCL

Я работаю над приложением, которое считывает и обрабатывает события из AWS Kinesis Stream с помощью клиентской библиотеки Kinesis (KCL). Я не хочу, чтобы сторона производителя событий страдала от задержки, поэтому для отправки событий использовался KinesisAsyncClient. Однако, чтобы моя обработка событий работала правильно, мне нужно обрабатывать события в «порядке, который я назвал putRecordAsync» на стороне производителя. Эта информация доступна в виде поля метки времени внутри каждой записи Kinesis.

Помимо переключения на использование блокирующего синхронного клиента Kinesis, возможно ли какое-либо другое решение для эффективной сортировки потоковых событий?


person SuSanD    schedule 05.04.2016    source источник
comment
Читатель получит события в том порядке, в котором они находятся в потоке. Вы можете контролировать порядок событий только со стороны производителя. Вы можете использовать putRecords (обратите внимание на s), который упорядочивает несколько событий. Вы также можете использовать seq-id предыдущего события, чтобы поместить за ним следующее событие. Имеет ли это смысл для вас?   -  person Guy    schedule 06.04.2016
comment
@Guy Гарантирует ли putRecordsAync использование асинхронного клиента Kinesis порядок событий?   -  person SuSanD    schedule 06.04.2016
comment
Вызов API Kinesis с помощью PutRecords (синхронный или асинхронный) сохраняет порядок событий в полезной нагрузке вызова. Если в клиентском коде выполняется дополнительная пакетная обработка для создания другого вызова API, эти события могут быть не в том порядке, в котором вы называется асинхронным в нескольких вызовах.   -  person Guy    schedule 10.04.2016


Ответы (1)


Если важен порядок, не используйте асинхронный клиент.

Асинхронный клиент просто использует пул потоков под прикрытием для выполнения одних и тех же вызовов — поскольку он многопоточный, вы не можете гарантировать порядок выполнения этих потоков, и в результате вы не можете контролировать порядок, в котором эти записи получены. Кинезис.

Теперь, если задержка действительно является проблемой для вашего производителя:

  1. Убедитесь, что вы вызываете PutRecords (вместо PutRecord), где это возможно — это, безусловно, сэкономит вам несколько сетевых циклов.

  2. Вместо прямого вызова клиента просто поместите упорядоченные записи в локальную очередь и используйте фоновый поток для регулярного опроса этой очереди для вызова PutRecords.

Следует помнить и о некоторых других вещах: если это недостаточно быстро, чтобы ваша очередь в процессе была близка к пустой, это указывает на то, что у вас достаточно большая пропускная способность данных, что вам потребуется несколько потоков, помещающих данные, и у вас больше нет точный заказ. В этом случае я настоятельно рекомендую указать порядковые номера вместе с вашими записями, чтобы при необходимости вы могли изменить их порядок на стороне потребителя (в этом случае также рассмотрите SQS в качестве отправной точки, а не Kinesis).

person Krease    schedule 08.07.2016