Советы по решению вашей проблемной области, связанной с большими данными, с помощью инструментов, созданных для этой задачи.

Мой трехлетний путь от инженера-программиста, специализирующегося на высокопроизводительных сервисах Java, до инженера данных, владеющего инструментами для работы с большими данными, привел меня к перепроектированию моей системы, отказу от DynamoDB и переходу на Spark Streaming.

Проблемная область

Мы принимаем сотни миллионов событий в день в наш конвейер данных. Эти события представляют действия пользователей в облаке и имеют формат JSON. Основой нашей потоковой технологии является Apache Kafka, поэтому события извлекаются из облака в тему Kafka в начале конвейера. Предполагается, что они должны пройти через цепочку обогащений состояний для конкретных пользователей и компаний. Например, событие входа в систему Джона Доу из Wonderland Inc. должно быть обогащено состоянием, которое мы предварительно вычислили для Джона и рабочего места Джона. Состояния представляют собой огромные CSV-файлы, которые ежедневно пересчитываются в ходе работы, созданной нашей группой по обработке и анализу данных и сохраненной в S3. Поэтому главная архитектурная задача — обогатить каждое событие соответствующим ему состоянием.

Оригинальный конвейер

Поскольку команда была сформирована из разработчиков, специализирующихся на Java, основная служба, содержащая бизнес-логику для обработки событий, их обогащения и принятия решения о том, как с ними действовать, была написана на Java. Использование CSV-файлов в S3 для обогащения было нецелесообразным, поскольку мы обрабатывали более 10 000 событий в секунду и нуждались в быстром поиске значений ключа для каждого обработанного события.

Файлы состояния содержали строки для каждого пользователя и строки для компании, и когда событие было получено, нам нужно было искать определенные строки в этих огромных CSV-файлах. К сожалению, файлы CSV были слишком большими, чтобы поместиться в память модулей Java. Мы говорим о десятках ГБ данных, поскольку эти состояния хранят много информации на пользователя.

Мы решили выбрать DynamoDB в качестве нашего хранилища ключей и значений. DynamoDB предлагает время поиска всего в несколько миллисекунд, и это можно даже улучшить, используя DAX поверх DynamoDB в качестве более быстрого уровня кэширования в памяти. Мы создали службу ETL (в том числе и на Java), которая будет считывать только что созданные файлы состояний из S3, разбивать их на записи для каждого пользователя и заполнять несколько таблиц DynamoDB записями, ключами которых является уникальное имя пользователя. После этого в нашей основной службе приема мы могли бы быстро обогатить каждое событие, используя несколько поисковых запросов в DynamoDB. Основным сервисом приема был микросервис с несколькими модулями, поэтому мы автоматически масштабировали его в соответствии с задержкой в ​​теме событий Kafka.

Проблемы с исходным конвейером

Прежде чем описывать предостережения, важно сказать, что это решение работало и отвечало требованиям к продукту. Он также был очень масштабируемым, поскольку таблицы DynamoDB были спроектированы правильно, что теоретически позволяло нам масштабироваться до чтения миллионов записей в секунду. Записи были разделены по имени пользователя, поэтому у нас не было проблем с горячими разделами, а значит, и реалистичного барьера масштабирования. Теперь давайте перейдем к разделу проблем.

Запись, чтение и хранение больших объемов данных в DynamoDB обходятся дорого. Дорогой термин относительный; когда мы называем что-то «дорогим», мы должны сравнивать это с другим решением. Возвращаясь к тому времени, когда мы внедрили это решение, мы подумали, что оно дешевое, так как мы не имели в виду наше более новое решение.

Небольшое подробное описание цены и ее технических последствий — при работе с Dynamo вы должны выбрать, будете ли вы работать в режиме по запросу или в режиме подготовки. Режим по запросу позволит вам записывать и читать с любой пропускной способностью, но будет дороже, чем режим Provisioned.

В режиме Provisioned вам необходимо предварительно настроить скорость чтения и записи, и вы будете платить в соответствии с тем, что вы Provisioned, но у вас также может быть режим Provisioned с автоматическим масштабированием, в котором вы должны объявить минимальные и максимальные пороговые значения, а Dynamo будет медленно увеличивать или уменьшать масштаб в зависимости от использования. Режим Provisioned дешевле, и именно поэтому мы выбрали его, но он сделал наш код намного более громоздким для настройки скоростей и обработки исключений превышения порога.

Когда служба ETL ежедневно записывала в Dynamo, она начинала с программного увеличения подготовленной емкости записи, затем записывала все данные, а когда заканчивала, снижала предоставленную емкость записи обратно до нуля. Случайные проблемы, такие как исключения, которые не были правильно перехвачены, или даже сетевой поток, который зависал в системе, приводил к зависанию выделенной емкости записи и приводил к большому счету от AWS.

Что касается чтения, нам в основном приходилось правильно реализовывать обработку исключений и механизм экспоненциальной отсрочки повторных попыток, когда обработка события встречала исключение Dynamo, превышающее пороговое значение. Более ранние версии нашего кода, которые не были оптимальными, приводили к отправке событий в очередь недоставленных сообщений или ненужным задержкам при ожидании от 30 секунд до 1 минуты, пока Dynamo масштабируется до новой скорости чтения.

Что касается хранилища, нам пришлось довольствоваться сохранением до семи итераций данных для каждого пользователя. Каждый день создавалась одна итерация данных, поэтому мы могли легко «путешествовать во времени» до семи дней назад в истории, если это необходимо. В тех случаях, когда нам нужно было пойти еще раньше, нам приходилось вручную запускать службу ETL для перезагрузки исторических данных в Dynamo. О, и давайте не будем забывать о максимальном размере элемента в 400 КБ в DynamoDB, который заставил нас несколько раз перепроектировать нашу схему данных, чтобы обрабатывать пользователей с гораздо большим состоянием, чем в среднем.

Мы провели много спринтов и реализовали вышеперечисленное, но это еще не все. Служба Java ETL была службой с одним модулем, поэтому загрузка ежедневного состояния в Dynamo становилась все медленнее по мере того, как мы подключали больше клиентов. После 20 минут обработки раз в день мы уже достигли 2-3 часов ETLинга и начали волноваться. Стоимость решения также начала расти. Теперь вместо десяти модулей Java для обработки всего трафика у нас было 50 модулей, выполняющих в пять раз больше операций чтения, чем раньше. Решение, которое раньше стоило 2000 долларов в месяц, вдруг стало стоить нам более 10 000 долларов в месяц, и финансовый отдел начал дуть нам в шею из-за постоянно растущих счетов за AWS.

В нашу пользу работало то, что до тех пор, пока система не достигла этого уровня, мы уже изучили многие технологии больших данных, такие как Athena, Glue, Spark, EMR и другие. Мы поняли, что можем заменить большинство наших компонентов более простым и дешевым решением — Spark Streaming.

Новый трубопровод

Вернемся к проблемам, которые заставили нас выбрать оригинальное решение:

  • Файлы CSV состояния были слишком большими, чтобы поместиться в память службы загрузки Java.
  • Нам нужно было быстро обогатить каждое событие конкретным пользователем и состоянием компании.

Эти вызовы — хлеб с маслом Spark. Spark Streaming будет считывать микропакеты событий из темы Kafka. Затем, поручив ему присоединиться к пакету событий с состоянием S3 на основе имени пользователя, Spark ловко перетасует данные между несколькими своими рабочими процессами. Он будет загружать, кэшировать и выгружать данные состояния в свою рабочую память по мере необходимости. Задача, которую мы изо всех сил пытались решить и за которую заплатили большие деньги, была простой задачей для Spark.

Наши системы размещены на AWS. С помощью EMR мы получаем управляемую версию Spark, поэтому нам не нужно самим управлять кластером Spark. Новое решение устраняет необходимость в службе ETL Java, службе Ingest Java и DynamoDB. Это сократило наши ежемесячные расходы примерно на 85 %, а внедрение заняло примерно 10 % времени по сравнению с исходным вариантом. Мы были поражены.

Краткое содержание

Мы усвоили важный урок о больших данных. Всякий раз, когда вы сталкиваетесь с проблемной областью, связанной с большими данными, вы должны сначала проверить существующие инструменты, созданные для этой задачи. Наш опыт разработки Java в режиме реального времени интуитивно привел нас к решению, которое работало, но было громоздким, дорогим и медленным в реализации по сравнению с использованием существующего инструмента, который уже решал ту же проблему.