Пошаговое руководство по расширенному программированию Kafka Streams — KTable, Time Windows и Aggregation

Обработка потока данных в реальном времени не ограничивается фильтрацией и преобразованием значений, мощная структура позволяет выполнять гораздо более сложные вычисления. Типичным примером является технический анализ фондового рынка.

Технические индикаторы интригуют, поскольку они являются полезными инструментами, дающими трейдерам ценную информацию и ценные рекомендации по важным торговым решениям.

Задача состоит в том, чтобы предоставить технический анализ немедленно с притоком цен на акции. Без сомнения, Kafka Streams — отличный выбор, когда речь идет о потоках данных со сложной логикой обработки в режиме реального времени.

В дополнение к базовой разработке Kafka Stream в моей предыдущей статье я поделюсь с вами, как построить более сложную логику для потоков данных на примерах популярных расчетов скользящей средней для цены акции.

Эта статья не предназначена для предоставления каких-либо советов по торговле акциями, примеры в этой статье предназначены исключительно для демонстрации возможностей Kafka Streams.

Что такое Кафка Стримс

Kafka Streams — это клиентская библиотека для доступа к данным в Kafka. Помимо основных операций, таких как публикация и чтение сообщений в темах Kafka, Kafka Streams поддерживает различные операции с данными, чтобы создать конвейер данных и выполнять системную логику по мере прохождения данных через конвейер.

Дизайн Kafka Streams удобен для разработчиков. Вызовы функций выполнены в плавном стиле формата DSL. Например, приведенный ниже пример кода отфильтровывает IBM по ценам акций и вычисляет медианные цены.

Это можно сделать всего за несколько строк кода, которые аккуратны и легко читаемы.

Формат данных в Kafka — важная концепция, которую нельзя игнорировать. Данные хранятся в массивах байтов в Kafka. Любые сообщения, записанные в Kafka, должны быть сериализованы из объектов данных в массивы байтов. Наоборот, данные, извлеченные из Kafka, должны быть десериализованы из байтовых массивов в объекты данных.

Поэтому важно четко указать сериализацию и десериализацию (сокращенно SerDes) для всех операций с Kafka Streams. Я покажу вам, как на примерах.

Что такое скользящая средняя?

Поскольку цены на акции могут колебаться в течение короткого периода времени, скользящее среднее является популярным инструментом для трейдеров, чтобы найти общее движение и прогнозировать тенденции.

Скользящее среднее может быть коротким периодом (например, 20 дней) и длинным периодом (200 дней). На приведенном ниже примере диаграммы показаны цены акций с краткосрочной скользящей средней и долгосрочной скользящей средней. Пересечение линий скользящих средних может служить сигналом на покупку или продажу.

Расчет этого мощного индикатора на самом деле довольно прост. Это среднее значение (сумма/количество). Он называется «перемещающимся», потому что данные основаны на движущихся окнах.

Скажем, 15-минутная скользящая средняя основана на данных за 09:00–09:15, 09:01–09:16, 09:02–09:17 и так далее.

Чтобы рассчитать скользящее среднее, вот схема потока данных:

На приведенной выше диаграмме скользящие средние рассчитываются по следующим шагам:

  1. Котировки акций группы По тиккерам
  2. Применить временное окно к группе данных
  3. Совокупные цены для получения суммы и подсчета
  4. Вычислить среднее значение с помощью суммы и подсчета
  5. Публикация средних значений полных временных окон

Давайте рассмотрим каждый шаг за шагом в следующих разделах.

Потоки данных о ценах на акции

Примеры в этой статье в основном основаны на потоках данных о ценах на акции. Представьте себе, что большой объем транзакций по торговле акциями, цены на акции быстро публикуются в Kafka в этом формате ключ-значение:

Ключ

  • Бегущая строка

Значение

  • Отметка времени
  • Открытая цена
  • Цена закрытия
  • Низкая цена
  • Высокая цена
  • Объем

Группировка потоков данных по биржевому коду

Первым шагом является подключение потока данных к теме «цена акции» для ввода цен на акции. Seralizer/Deseralizer (Serde) определяется ключом сообщения (String) и значением (JSON StockPrice).

Затем сгруппируйте сообщения по бегущим строкам, используя операцию groupByKey() в Kafka Streams.

Теперь у нас есть цены акций в группах. KGroupStream представляет собой потоки сгруппированных данных. Он предоставляет функции для консолидации данных.

Чтобы проиллюстрировать консолидацию данных, давайте рассмотрим этот простой пример для подсчета количества цен акций по тикеру:

Типичным примером является подсчет количества сообщений в группе. Он преобразует потоки данных в таблицу данных, которая является KTable в Kafka Streams. На приведенной ниже диаграмме показано преобразование KStream в KTable. Ключ сообщения KTable — это String, а значение — Long.

В то время как потоки данных выглядят как журнал изменений, в котором хранятся все исторические данные, KTable хранит только последнюю версию данных.

Применить временное окно

Группировка данных в основном охватывает все данные в теме Kafka независимо от отметки времени. Чтобы рассчитать скользящее среднее 15-минутных интервалов, KafKa Streams необходимо знать временную метку цен на акции.

По умолчанию Kafka присваивает текущую временную метку сообщению, когда оно публикуется в теме. В нашем случае мы собираемся использовать метку времени внутри значения сообщения о цене акции. Для этого мы создаем собственный экстрактор меток времени, чтобы извлекать метки времени из сообщений.

В Kafka Streams довольно просто создать собственный экстрактор временных меток. Просто создайте класс, реализующий интерфейс TimestampExtractor, получите отметку времени из значения сообщения и верните ее в миллисекундах эпохи в методе extract().

Kafka Streams поддерживает несколько типов временных окон. Скользящее окно применимо к скользящему среднему в этом примере, потому что расчет управляется событием, а не фиксированным временным окном. Обратитесь к официальной документации для объяснения.

Теперь добавьте пользовательский экстрактор временных меток в вызов функции при использовании входной темы с помощью withTimestampExtractor() и привяжите данные к временному окну с помощью windowedBy().

Мы определяем скользящие окна с интервалом в 15 минут без льготного периода. Если вы хотите обработать любое запоздалое сообщение, вместо этого используйте вызов метода с льготным периодом.

Сообщения разделены по тикеру и временным окнам. Теперь вывод будет TimeWindowedKeyStream.

Совокупные потоки цен акций в средние значения

Расчет среднего требует 2 шагов:

  1. Повторите все цены акций в группе (т. е. один и тот же тикер и временное окно), чтобы рассчитать сумму и подсчитать
  2. Преобразуйте общее количество и сумму в среднее значение

Групповой поток предлагает операцию под названием aggregate(), которая перебирает каждый элемент и выполняет системную логику в цикле for со следующими параметрами:

  1. Начальное значение с нулевым счетом и нулевой суммой
  2. Функция, которая принимает ключ и значение сообщения, а также агрегированные данные из предыдущей итерации. Логика возвращает новое приращение счетчика и обновленную сумму.

Результаты сохраняются в KTable с тикером в качестве ключа и CountAndSum DTO в качестве значения.

Второй шаг — преобразование суммы и подсчета в среднее значение с помощью mapValues(). Результаты сохраняются в другой KTable с оконным ключом (где WindowedSerdes предоставляется Kafka Streams) и средним значением в качестве значения.

Вот пример реализации:

Шаг 1 — используйте aggregate() для вычисления суммы и подсчета

Шаг 2 — используйте mapValues() для расчета средних значений

Подавить поток данных до закрытия временного окна

Использование временного окна сложно. Сообщения непрерывно проходят через конвейер. Одна и та же запись проходит более чем через одно временное окно из-за перекрытия временных окон. Промежуточные данные отправляются на выход даже тогда, когда временные окна еще не полностью заполнены.

Для этого варианта использования нам нужна только средняя цена акций в 9:15 за период времени с 9:00 до 9:15. Результаты расчета неполных временных окон следует отбросить.

Kafka Streams поддерживает это требование с помощью операции suppress(). Добавьте эти строки после mapValues(). Он указывает, что поток данных должен подавляться до тех пор, пока временное окно не будет закрыто.

Преобразование KTable обратно в KStream для вывода

KTable сохраняет только последние значения, в то время как желаемый результат представляет собой поток данных средних значений по мере продвижения временного окна. Другими словами, на выходе получается поток журналов изменений в таблице. Использование toStream() — это простой шаг для обратного преобразования таблицы в поток данных.

Временные окна с остаточными миллисекундами, такие как [09:00:00.001–09:15:00.001], создаются по некоторым причинам. Их следует исключить из вывода. Наконец, обновите ключ сообщения с оконного ключа на тикер, используя map()

Проверьте топологию скользящего среднего

Большой! Топология готова, пора проверить, работает ли она так, как ожидалось.

Тестирование по топологии невозможно без его запуска на автоматизированных тестах с данными, подавающими подготовленный набор входных данных для тестирования и ожидаемый результат.

Дизайн тестовых данных и ожидаемые результаты являются важной частью, когда дело доходит до тестирования. У нас есть 2 набора цен акций и ожидаемая скользящая средняя для тикера UHN и AMAZ соответственно. Цены на акции рассчитаны на колебания во времени.

Настройка основана на TopologyTestDriver, Test Input Topic и Test Output Topic, предоставленных Kafka Streams. Тестовые коды довольно стандартны, и более или менее тот же шаблон кода можно применить ко всем другим модульным тестам.

Вот методы настройки и демонтажа для каждого тестового сценария. Ничего особенного, просто инициализируйте тестовый драйвер, тему и топологию. Кроме того, закрывайте тестовый драйвер в конце каждого теста.

Чтобы запустить тест, введите цены акций во входную тему и утвердите сгенерированные скользящие средние в выходной теме.

Он проверяет, что выходные данные находятся в правильной последовательности, а скользящие средние для разных тикеров рассчитываются независимо.

Последние мысли

Мы прошли через разработку широко используемого технического индикатора цены палки. Формула средних значений проста, но задача состоит в том, чтобы выполнить расчеты по временным окнам для потоков данных.

Концепция двойственности потоковой таблицы, группировка потоковых данных по ключу и временным окнам являются полезными методами для реализации вычислений скользящего среднего в реальном времени.

Несмотря на скользящую среднюю, существует ряд технических индикаторов, которые необходимы для анализа цен на акции. Расчет этих показателей включает в себя сложные расчеты, и, следовательно, будут приняты более совершенные методы для реализации расчетов в реальном времени.

Давайте попробуем еще один интересный индикатор в следующей статье. Индекс относительной силы (RSI) — один из часто используемых индикаторов, который дает трейдерам сигналы на покупку/продажу. Расчет RSI более сложен, и методы, описанные в этой статье, недостаточны для выполнения расчетов.

Репозиторий Git

Вот Git Repo, если вы ищете полный исходный код: