В Tapjoy аналитика является основой нашей платформы. В среднем в день мы обрабатываем более 2 миллионов сообщений в минуту с помощью нашего аналитического конвейера. Эти сообщения генерируются различными пользовательскими событиями на нашей платформе и в конечном итоге объединяются для представления системы в реальном времени.

В нашем технологическом стеке мы используем Kafka для хранения сообщений, Spark для потребления и агрегирования данных и Postgres для хранения агрегатов. Чтобы своевременно и точно доставлять данные, мы стремились создать систему, которая была бы масштабируемой, высокодоступной, производительной и рентабельной.

В этом посте мы рассмотрим, как мы обрабатывали семантику «хотя бы один раз» нашего конвейера Kafka посредством дедупликации в реальном времени, чтобы гарантировать целостность / точность данных.

Эта проблема

Системы обмена сообщениями, как правило, предоставляют потребителям один из двух типов семантики доставки: как минимум один раз и максимум один раз. Семантика "не более одного раза" означает, что сообщения могут быть потеряны; Семантика хотя бы один раз означает, что сообщения могут дублироваться. Для критически важных систем потеря данных недопустима. Например, если мы сообщаем о конверсиях рекламы, которые наши партнеры будут использовать для принятия решений о расходах, наши данные должны быть точными.

В Tapjoy каждая система обмена сообщениями, которую мы используем, обеспечивает гарантию доставки хотя бы один раз: SQS, RabbitMQ, Kafka и т. Д. Однако мы хотим обрабатывать сообщения * ровно один раз *, чтобы избежать завышенной аналитики. Возникает вопрос: если одно и то же сообщение будет записано в Kafka дважды, как вы можете гарантировать, что сообщение обрабатывается только один раз кодом нашего клиентского приложения?

Здесь в игру вступает дедупликация.

Требования

Учитывая масштабность конвейера и зависимость нисходящего потока от точных данных, любое решение для дедупликации должно быть:

  • Масштабируемость - это более или менее само собой разумеющееся - нам нужно иметь возможность масштабировать это решение, поскольку мы продолжаем отслеживать больше аналитических данных или в периоды высокой пропускной способности, когда наш продукт запускается у крупного партнера.
  • Высокая доступность. Большая часть нашей инфраструктуры предназначена для обеспечения высокой доступности, поэтому мы можем выдерживать сбои и поддерживать работу платформы. Учитывая ограничения в реальном времени этой системы, мы хотим иметь возможность быстро переключиться на другой кластер / набор серверов в случае, например, если зона доступности в AWS выйдет из строя.
  • Постоянный. Нам нужно, чтобы хранилище данных с дедупликацией сохранялось даже при перезапуске других служб в стеке (например, Spark). Если мы теряем данные при каждом перезапуске, мы увеличиваем вероятность дублирования при каждом развертывании.
  • Быстро - 2 миллиона сообщений в минуту - это много для дедупликации, и мы не хотим попадать в ситуацию, когда наш Spark ETL не успевает.
  • Рентабельность. Нам необходимо внимательно следить за расходами, особенно при расширении масштабов решения. Конечно, мы могли бы решить добавить 15 серверов для обеспечения высокой производительности, но с экономической точки зрения это неприемлемо.
  • Эффективное хранилище. Мы хотим защитить себя от возможности того, что мы будем получать одно и то же сообщение снова и снова, если производитель сообщений сломался. Мы поставили перед собой цель - возможность дедупликации за период от 12 до 24 часов. Это означает, что если сообщение приходит в 08:00, мы должны иметь возможность обнаруживать любые повторяющиеся сообщения, приходящие к 20:00. Чтобы хранить данные в широком диапазоне времени, но при этом снизить затраты на хранение, нам нужен способ удалить старые данные из хранилища дедупликации. Мы будем искать технологию, поддерживающую выселение из числа недавно использованных или LRU.

Подход

На высоком уровне подход, который мы рассматриваем, не сильно отличается от стандартного алгоритма дедупликации. Для каждого сообщения:

  1. Получите идентификатор транзакции сообщения (идентификатор, который идентифицирует произошедшее событие)
  2. Выполните атомарную команду «установить, если еще не установлено» для идентификатора транзакции в нашем хранилище данных.
  3. Если ключ был успешно установлен, обработайте сообщение; в противном случае это дубликат, поэтому игнорируйте

Это подход, который мы используем во многих местах, и это общая стратегия, которую мы используем здесь.

В Spark задания ETL выполняются в несколько этапов. В любой момент на этапе может произойти сбой и его необходимо повторить, или задание может быть прервано, а приложение необходимо перезапустить. В результате, если задание умирает * после * того, как мы отметили идентификаторы транзакций как просмотренные, нам нужно убедиться, что задание способно обработать это же сообщение при повторном запуске.

Чтобы решить эту проблему, наш процесс дедупликации изменен на следующее:

  1. Получите идентификатор транзакции сообщения (идентификатор, который идентифицирует произошедшее событие)
  2. Попытайтесь выполнить команду «установить, если еще не установлено», где ключ - это идентификатор транзакции, а значение * - это раздел Kafka + смещение сообщения * (это важная часть).
  3. Если ключ был успешно установлен, обработайте сообщение. Если ключ уже существует и значение соответствует текущему разделу Kafka + смещение, обработайте его как повторную попытку из-за предыдущего сбоя; в противном случае это дубликат, так что игнорируйте.

Раздел Kafka + смещение - это то, что мы называем «идентификатором владельца» в нашей системе - это, по сути, то, какое сообщение в Kafka владеет идентификатором обрабатываемой транзакции. Это позволяет выполнять задания ETL несколько раз при возникновении сбоев.

Выбор технологии

Хотя ранее мы использовали Memcached для обнаружения дублирующихся сообщений в других системах, таких как SQS, вместо этого мы остановились на Redis / Elasticache для этой системы. Аргументация была такой:

  1. Масштабируемость. Подобно Memcached, Redis можно масштабировать путем сегментирования ваших ключей на нескольких серверах. Поскольку оба ограничены доступной памятью в одном блоке, единственный способ масштабирования для поддержки большего количества ключей - это добавление дополнительных серверов.
  2. Доступность: Elasticache (и Redis) поддерживает высокую доступность за счет репликации и автоматического переключения при отказе. Amazon Elasticache предоставляет конечную точку DNS и автоматически переключается на резервную копию, если основной сервер Redis выходит из строя. Это немного чище, чем Memcached, где мы должны писать на несколько серверов от клиента, чтобы достичь высокой доступности. Это добавляет немного больше сложности, чем то, что мы предпочли бы для клиента.
  3. Постоянство: поскольку Redis может хранить данные как в памяти, так и на диске, перезапуск процесса позволяет избежать потери данных. С другой стороны, Memcached теряет все данные при перезапуске. Мы столкнемся с тем же ограничением, если будем хранить данные в процессе обработки в Spark.
  4. Быстрое / дешевое / эффективное хранилище. Это во многом зависит от того, как клиент использует технологию, независимо от того, Redis это, Memcached или что-то еще. Однако Redis API обеспечивает большую гибкость в том, как выполняются команды, что приводит к гораздо большей производительности. Подробности оптимизации Redis рассмотрены позже в этом посте.

Были рассмотрены другие решения, такие как хэш-карта с отображением памяти в процессе Spark ETL или использование нашего кластера Postgresql. Однако каждому из них не хватало в разных областях, начиная от сложности масштабирования / обеспечения высокой доступности до обеспечения эффективной производительности и хранения данных.

Учитывая все обстоятельства, Redis оказался наиболее подходящим для наших нужд.

Оптимизация эффективности хранения

Как упоминалось ранее, нам нужно было решение, которое позволяло бы обнаруживать повторяющиеся сообщения с разницей в 12–24 часа. При 2 миллионах сообщений в минуту память быстро становится ценным ресурсом. В результате для нас было важно хранить данные наиболее эффективным образом, чтобы максимально использовать доступную память в Redis.

Наивная реализация

В случае аналитического конвейера Tapjoy идентификаторы транзакций сообщений являются UUID и сопоставляются с разделом Kafka + смещением, когда он был впервые обнаружен (например, раздел 3, смещение 81418110694).

В наивной реализации ключи UUID составляют 36 байтов (32 символа с 4 тире), а значения, которые представляют идентификатор владельца Kafka, представляют собой строку с разделителями-запятыми, например «1,64942845052» (13 байтов). При приблизительно 64-байтовых накладных расходах на ключ в Redis это оставит объем памяти, необходимый для 24 часов данных, следующим образом:

(overhead + key_bytes + value_bytes) * keys_per_minute * 60 * 24
(64 + 36 + 13) * 2,000,000 * 60 * 24 = 300GB

Это будет дорого стоить, чтобы иметь такую ​​память, так что давайте посмотрим, сможем ли мы добиться большего…

Ключевые / Ценностная эффективность

Чтобы получить первое улучшение эффективности памяти, рассмотрим лучший способ представления хранимых ключей / значений.

Под капотом UUID фактически состоят из 16 байтов информации. Если мы сможем взять эти 16 байтов и преобразовать их в их двоичное представление, это приведет к сокращению использования памяти только ключевыми данными более чем на 50%. Это означало бы, что вместо представления UUID как «ce059644–18a0–4f27-bc2b-c2a2d4d4e7bf» мы могли бы представить его как «\ xbf @ \ xd4 \ x91V & IG \ x9f5 \ x9a \ xf9 \ x16K \ x9b \ xc8».

Этот перевод можно сделать так:

val uuid = java.util.UUID.fromString(“ce059644–18a0–4f27-bc2b-c2a2d4d4e7bf”)
val hi = uuid.getMostSignificantBits
val lo = uuid.getLeastSignificantBits
ByteBuffer.allocate(16).putLong(hi).putLong(lo).array
// => Array(-50, 5, -106, 68, 24, -96, 79, 39, -68, 43, -62, -94, -44, -44, -25, -65)

Используя это как ссылку, мы можем использовать аналогичный алгоритм для представления идентификатора владельца. Мы знаем, что идентификатор владельца представляет собой комбинацию идентификатора раздела и идентификатора смещения. В результате возникает вопрос: какое наименьшее количество байтов нам нужно для представления обоих этих значений?

Разделы представлены в Kafka целыми числами (4 байта), но максимальное количество разделов составляет всего 6. Если мы предположим, что мы никогда не перейдем к 32 767 разделам для одной темы Kafka, то мы можем представить раздел в 2 байта (2¹⁵ - 1).

Смещения представлены в Kafka как длинные (8 байтов), но единственное требование, которое у нас есть, это то, что мы не видим одно и то же значение смещения внутри раздела в течение 24 часов. Причина, по которой у нас есть это требование, заключается в том, что мы намереваемся хранить в Redis только идентификаторы сообщений на срок до 24 часов. Следовательно, если мы в конечном итоге эффективно повторно используем значения смещения, это нормально, если это происходит за пределами этого 24-часового окна.

Учитывая вышеизложенное, если мы нацелены на 6 байтов для представления смещения Kafka, это означает, что мы будем переносить каждое (2⁴⁷ - 1) сообщение (281 474 976 710 656 сообщений). Это намного больше того, что мы обычно видим в течение 24 часов (мы видим около 2 млрд сообщений).

Это означает, что наш идентификатор владельца фактически рассчитывается следующим образом:

ByteBuffer.allocate(2).putShort(partition.toShort).array ++ ByteBuffer.allocate(8).putLong(offset).array.slice(2, 8)

Конечным результатом является то, что вместо 13 байтов для представления раздела 1 / смещения 64942845052, требуется только 8 байтов.

Учитывая все это, новый расчет:

(per_key_overhead + key_bytes + value_bytes) * keys_per_minute * 1440
(64 + 16 + 8) * 2,000,000 * 60 * 24 = 230GB

Отлично, мы сэкономили 70 ГБ, но объем памяти все еще довольно велик! Посмотрим, сможем ли мы сделать что-нибудь лучше ...

Уменьшить количество ключей

На этом этапе единственное, что мы потенциально можем сделать, чтобы уменьшить объем требуемой памяти, - это уменьшить количество ключей, которые мы фактически храним. На первый взгляд это кажется невозможным - если нам нужно хранить 2B уникальных идентификаторов сообщений в Redis, то как можно уменьшить количество ключей? Здесь в игру вступают хеш-операции Redis (например, HSET, HGET, HSETNX и т. Д.).

Наше первоначальное вдохновение для использования типа данных Redis Hash для уменьшения общего количества хранимых ключей Redis 1-го уровня пришло из сообщения в блоге группы инженеров Instagram ([ранее размещено здесь] (http: //instagram-engineering.tumblr. com / post / 12202313862 / хранение сотен миллионов простых ключей и значений )). Под капотом Redis попытается использовать структуру данных [ziplist] (http://download.redis.io/redis-stable/src/ziplist.c) для хранения ключей / значений для хэшей. Эта структура значительно более эффективна с точки зрения памяти, чем использование обычных ключей Redis, если у вас нет разных требований к TTL для каждого ключа.

Чтобы использовать Redis Hash-ключи, нам действительно нужно создавать блоки идентификаторов сообщений. В нашем случае мы можем группировать сообщения, используя временную метку внутри сообщения. Благодаря нашему собственному тестированию и собственным результатам Instagram мы видим убывающую отдачу от корзин, содержащих более 1000 ключей. Мы также не хотим превышать значение [hash-max-zipmap-entries] (https://redis.io/topics/memory-optimization), поскольку данные больше не будут храниться в виде ziplist. При оценке использования памяти мы обнаружили, что идеальным вариантом является хранение от 100 до 1000 ключей на ведро. Это дает нам возможность расти, не оказывая слишком большого влияния на потребление памяти.

Если мы нацелены на 100 ключей на ведро, следующая задача - выяснить, сколько сегментов нам понадобится. Этот расчет таков:

buckets = keys_per_minute / keys_per_bucket
buckets_per_min = 2,000,000 / 100 = 20,000
buckets_per_sec = 20,000 / 60 = 333

Теперь мы можем запустить наш новый расчет того, сколько памяти будет потреблено:

((per_key_overhead + key_bytes + value_bytes) * keys_per_minute) * 1440 +
((per_bucket_overhead + bucket_key_bytes) * buckets_per_minute * 1440
((1 + 16 + 8) * 2,000,000) * 60 * 24 +
((64 + 14) * 20,000 * 60 * 24
= (72,000,000,000 + 2,246,400,000) = 69GB

В приведенной выше формуле:

  • per_bucket_overhead такое же, как per_key_overhead, использованное в предыдущих расчетах.
  • per_key_overhead - это количество дополнительных байтов, которые требуется типу данных Redis Hash для хранения ключа.
  • Предполагается, что ключ сегмента составляет 14 байтов (комбинация отметки времени сообщения + 3-значного фрагмента сегмента для целей сегментирования).

С этим изменением мы получаем самую большую экономию - теперь мы используем 30% от предыдущего использования!

Оптимизация производительности

Хорошо, теперь, когда у нас есть наша технология и мы знаем, как мы храним данные, следующая часть - выяснить, как мы это делаем быстро.

Напомним, мы обрабатываем 2 миллиона сообщений в минуту в нашем кластере Spark. По сравнению со всем остальным, что происходит в нашем ETL каждую минуту, мы должны убедиться, что процесс дедупликации не добавляет слишком много времени нашим заданиям. Если дедупликация была слишком дорогой, наш ETL мог начать отставать.

Для нашей системы мы хотели настроить таргетинг на общее время в 1 секунду, которое будет потрачено на создание запросов для Redis, хранение данных и удаление дубликатов каждую минуту.

Наивная реализация

Несмотря на то, что мы использовали вышеописанный подход с разделением на сегменты, нам все равно нужно написать 2M ключей для Redis. Наивная реализация здесь - отправить Redis 2 миллиона команд HSETNX и HGET (см. HSETNX). Например:

message_ids.each do |message_id, owner_id|
 client.HSETNX(message_id, owner_id) # claim owner
 client.HGET(message_id) # determine winner
end

Как вы понимаете, это довольно медленно. Для этого необходимо, чтобы Redis обрабатывал 4 миллиона отдельных команд, а клиент - 4 миллиона ответов. Даже если вам пришлось разделить сегменты на несколько серверов Redis и распараллелить команды, производительность будет неприемлемой.

Для 2M сообщений, разделенных на 3 экземпляра Redis, требуется около 26 секунд. Намного больше, чем мы пытаемся достичь.

Конвейерная реализация

Redis предлагает конвейерную обработку, которая позволяет Redis обрабатывать команды, даже когда клиент читает результаты. Как указано в документации Redis, это может очень хорошо повлиять на вашу производительность. Например:

client.pipeline do |pipeline|
 message_ids.each do |message_id, owner_id|
 pipeline.HSETNX(message_id, owner_id) # claim owner
 pipeline.HGET(message_id) # determine winner
 end
end

Если мы воспользуемся тем же подходом, что и выше, но с командами, отправленными через конвейер, потребуется около 3 секунд для обработки тех же 2M сообщений. Вполне нормально! - о 8-кратном улучшении производительности, хотя все еще выше нашей цели.

Сценарии Lua

Redis поддерживает сценарии Lua, что позволяет нам запускать логику как можно ближе к данным (в самом процессе Redis!) И контролировать, что на самом деле возвращается клиенту. Причины, по которым сценарии Lua могут принести нам пользу, заключаются в следующем:

  1. Приближаясь к данным, мы можем уменьшить как количество циклов обмена с клиентом, так и количество чтений в Redis. Вызывая HSETNX несколько раз в одном сегменте, Redis вынужден несколько раз прочитать и проанализировать содержимое ключа Hash. Если мы сможем ограничить это одной командой, мы сможем сократить время обработки.
  2. Контролируя результаты, мы можем уменьшить объем данных, отправляемых обратно клиенту, возвращая только идентификаторы сообщений, которые были обнаружены как дублированные. В настоящее время мы передаем клиенту все идентификаторы сообщений и владельцев. Это создает большую нагрузку на клиента. В общем случае мы действительно не получаем никаких дубликатов, поэтому нет необходимости возвращать все эти данные клиенту.

Скрипт dedupe lua, на который мы остановились, в итоге выглядел так:

HMGET(bucket, message_ids)
message_ids.each do
 if existing owner matches or not set
 not dupe
 else
 dupe
 end
end
HMSET(bucket, new_message_ids_and_values)
return dupes

Переключившись на сценарий Lua, мы сокращаем необходимое время до 1,4 с - примерно настолько близко, насколько мы собираемся добраться до нашей цели!

Масштабирование

Обновление: это расширено, чтобы объяснить нашу потребность в сегментировании.

Пока что мы создали решение, которое удовлетворяет наши сегодняшние потребности - отлично! На этом этапе возникает вопрос: как спроектировать это решение, чтобы мы могли в 10 раз увеличивать объем трафика, который мы обрабатываем сегодня? Вместо 2 миллионов сообщений в минуту, как мы можем удовлетворить наши требования к памяти и производительности при обработке 20 миллионов сообщений в минуту? Здесь в игру вступает шардинг.

Масштабирование памяти

Сначала поговорим о требованиях к памяти. Если мы предположим, что 10-кратный объем трафика примерно эквивалентен 10-кратному использованию памяти в течение 24 часов, мы внезапно заговорим о хранении данных объемом 700 ГБ в Redis. Поскольку мы используем AWS, у самого большого из возможных экземпляров, доступных в Elasticache, всего 237 ГБ. Это означает, что нам нужен способ хранить идентификаторы сообщений в нескольких экземплярах Redis - и всегда иметь одни и те же идентификаторы сообщений, хранящиеся в одном экземпляре Redis.

В то же время нам не обязательно увеличивать объем до 237 ГБ за раз. Вместо этого мы хотим увеличивать емкость по мере необходимости - что означает использование серверов меньшего размера. Это помогает нам максимально снизить расходы. В решении для сегментирования, просто добавляя новые кластеры Redis для увеличения объема памяти, мы фактически создали безграничную емкость для хранения сообщений.

Масштабирование производительности

По мере масштабирования будут некоторые проблемы с производительностью. Если мы увеличим объем трафика до 10x, мы ожидаем, что время обработки всех этих сообщений также будет 10x (14 секунд вместо 1,4 секунды!). Для удовлетворения наших потребностей в реальном времени это недопустимо.

Наивное решение - просто распараллелить запросы к нашему кластеру Redis. Однако здесь есть большая проблема: Redis является однопоточным. Это означает, что в любой момент Redis может обрабатывать только один запрос за раз. Чтобы по-настоящему распараллелить запросы к Redis и получить от этого преимущества в производительности, нам необходимо выполнять запросы в нескольких кластерах Redis.

Решение для шардинга

Так как же на самом деле выглядит такое решение для сегментирования? Скажем, например, мы собираемся использовать 3 сервера Redis и 300 ключей корзины в секунду. В зависимости от содержимого сообщения нам нужно будет выбрать сегмент экземпляра / сегмента Redis для хранения идентификатора сообщения. Чтобы выбрать осколок, вы обычно используете функцию хеширования. Например:

Если количество шардов изменится, то функция хеширования начнет хранить сообщения в разных шардах, чем мы были раньше.

Чтобы учесть этот сценарий, у нас фактически есть концепция «предыдущей» конфигурации Redis и «текущей» конфигурации Redis. «Предыдущие» конфигурации применяются ко всем сообщениям с отметкой времени до некоторого максимального времени. Любые сообщения с временем транзакции * после * этого максимального времени начнут использовать «текущую» (новую) конфигурацию. Это позволяет нам эффективно переходить от одного кластера к другому или от одного алгоритма хеширования к другому без потери каких-либо данных, которые мы сохранили за предыдущие часы.

Последние мысли

Масштабное дублирование - сложная проблема, и, безусловно, есть и другие эффективные решения, применяемые в других отраслях отрасли. Фактически, даже сотрудники Kafka работают над стандартным решением. Обычно вы надеетесь, что либо ваши действия идемпотентны, либо точность не так важна, чтобы такая сложная система была необходима. В нашем случае это было важно для обеспечения целостности нашего аналитического конвейера.

Надеюсь, этот пост послужит полезным примером решения сложной инженерной проблемы («О, нет, у меня есть огромный поток данных, и я не могу отследить, видел ли я это раньше!») Сознательно. способ.