Тестирование в Apache Beam, часть 2: Stream

Исходный код, упомянутый в статье, можно найти здесь: https://github.com/papaizaa/apache-beam-examples/tree/master/src/main/java/com/papaizaa/streaming_example

В Части 1 мы исследовали написание модульных тестов для конвейеров Apache Beam Batch. На этот раз мы рассмотрим, как писать тесты для потоковых конвейеров. Поскольку Apache Beam был разработан как единый SDK для пакетной обработки и потокового конвейера, мы должны ожидать, что код не изменится сильно. Код конвейера остается в основном тем же, но написание тестов для конвейеров потоковой передачи становится сложным, поскольку теперь вам нужно учитывать , когда поступил тестовый ввод.

Пример магистрального трубопровода

В этом примере мы собираемся запустить конвейер потоковой передачи для данных protobuf, поступающих из Cloud PubSub. Protobuf - это метод сериализации структурированных данных между различными сервисами. Cloud PubSub - это управляемая Google служба сообщений в режиме реального времени. PubSub использует Темы как ресурс, на который издатели отправляют сообщения, тогда как Подписки - это способ для подписчика получать все события, публикуемые в теме.

Мы будем получать сообщения от двух подписок: покупка продуктов и покупка книг. В этом гипотетическом примере мы говорим, что пользователи совершают много разовых покупок в режиме реального времени. Например:

  • Время: 00:00:01 Книга куплена за $ 10.00
  • Время: 00:00:03 Книга куплена за $ 5.00
  • Время: 00:00:06 Продукты, купленные за 50 долларов США

Мы хотим отслеживать покупки пользователей в режиме реального времени и получать общую сумму за каждый сеанс покупок. Например, если пользователь прекращает совершать покупки в течение 10 минут, мы завершаем сессию покупок и хотим узнать общую сумму покупок. В момент 00:00:16 мы можем сказать, что пользователь купил вещей на общую сумму 65 долларов США.

В этом руководстве мы узнаем, как работать с оконными данными в реальном времени, как присоединяться к потокам pubsub и как группировать данные в системе реального времени.

Псевдокод основного конвейера будет прост:

books := ReadPubSub(books-subscription)
groceries := ReadPubSub(groceries-subscription)
allPurchases := books JOIN groceries
allPurchases -> KeyValueSplit -> ApplyWindowFunction -> GroupByKey -> SumAllPurchases -> WriteToPubSub(output-topic)

Вы можете следовать этому примеру здесь: https://github.com/papaizaa/apache-beam-examples/tree/master/src/main/java/com/papaizaa/streaming_example

Теперь давайте переведем этот псевдокод в реальный код Beam.

Здесь есть некоторые новые концепции, которые не очевидны, если исходить из мира пакетной обработки.

Что такое окно?

Окно - это способ для Beam группировать неограниченные / потоковые данные с точки зрения времени прибытия. Данные GroupByKey легко объединить в пакетный конвейер, так как у вас уже есть все эти данные в вашей файловой системе, но когда дело доходит до данных в реальном времени, вы не можете просто собрать все данные, поскольку новые элементы постоянно поступают. Как только у нас есть данные в Окно мы можем применять преобразования к элементам в каждом окне. Следовательно, когда мы применяем GroupByKey после оконных данных, мы группируем элементы в PCollection на основе ключа и окна.

Beam предоставляет ряд функций управления окнами, которые вы можете использовать: фиксированное время, скользящее время, время сеанса и глобальное окно. Фиксированное время относится к окнам, в которых каждые X минут окно открывается, а затем закрывается. Скользящие окна похожи на фиксированные, но имеют некоторое перекрытие с предыдущим окном. Окна сеанса определяют окна, которые содержат элементы в пределах определенного промежутка времени от другого элемента. Следовательно, если есть активность в течение какого-то времени, а затем нет активности в течение X времени, мы отрезаем окно и отправляем вперед то, что у нас есть. X будет установлен глобально перед развертыванием. Глобальное окно - это вообще не окно, так как все данные будут в одном окне.

В этом руководстве мы используем Session Windows.

Window<KV<String, Events.Event>> sessionWindow =
        Window.<KV<String, Events.Event>>into(Sessions.withGapDuration(
                Duration.standardMinutes(SESSION_WINDOW_GAP_DURATION)))
                .triggering(trigger)
                .withAllowedLateness(Duration.standardSeconds(ALLOWED_LATENESS_SEC))
                .accumulatingFiredPanes();

Что такое триггер?

Beam использует триггеры, чтобы определить, когда выдавать агрегированные результаты каждого окна. По умолчанию Beam выводит агрегированный результат, когда оценивает, что все данные были получены, и отбрасывает все последующие данные для этого окна. Но есть много способов поиграть с триггерами, чтобы вы могли выполнять такие действия, как запускать каждый новый элемент в окне и возвращать что-то, когда окно закрывается.

В нашем примере триггер следующий:

Trigger trigger = Repeatedly.forever(pastFirstElementInPane()
        .orFinally(AfterWatermark.pastEndOfWindow()));

Это означает, что запускается по первому элементу в окне, для каждого нового элемента и при закрытии окна.

Одна полезная вещь, которую я обнаружил при работе с потоковыми конвейерами, заключается в том, что если вы хотите работать с разными событиями, полезно сначала объединить потоки PubSub вместе, а затем применить свои преобразования. Я сделал это с помощью функции Flatten.pCollections:

В моем случае обе подписки имеют события одного и того же формата Protobuf, но это может быть не всегда. У вас может быть одна тема для получения BookEvents, а другая - GroceryEvents. В этом случае я предлагаю использовать ParDo для преобразования сообщений protobuf в некоторый абстрактный класс «Event», создания классов для каждого отдельного события и расширения этих классов абстрактного класса Event. Затем вы можете использовать функцию Flatten.pCollections (), чтобы поместить события в один поток PCollection.

Входные данные PubSub, которые мы получим, будут в формате protobuf. Мы согласовали схему с издателем сообщений и создали следующий файл .proto. Затем мы можем сгенерировать исходные файлы Java, чтобы иметь возможность обрабатывать эти сообщения и предоставлять стандартные методы и операции Java. Чтобы узнать, как сгенерировать класс в com.papaizaa.streaming_example.generated_pb.Events.java, выполните следующие действия:

Https://developers.google.com/protocol-buffers/docs/javatutorial#compiling-your-protocol-buffers

Когда дело доходит до модульного тестирования кода Beam, ключевыми вещами для тестирования являются функции ParDo, а также их взаимодействие от одного ParDo к другому. Поэтому мы рассмотрим функции ParDo, а затем приступим к написанию тестов.

На заметку:

  1. KeyValueSplit должен показаться вам знакомым, если вы использовали Beam для пакетных конвейеров. Следует отметить, как мы обрабатываем прото-сообщение, поскольку мы проверяем, что ни одно из обязательных полей не является пустыми строками. Файлы Protobuf не устанавливают пустые значения в null, а вместо этого устанавливают соответствующие нулевые значения их примитивного типа, например: int - ›0, bool -› false и string - ›« ».
  2. CombineEvents ParDo применяется после группировки данных по ключу и окну. Следовательно, мы можем получить доступ к окну как к объекту в аргументах processElement. На основе нашего окна и стратегии запуска мы отправим сообщение, когда окно закроется. Следовательно, когда сеанс покупок завершен, мы хотим, чтобы потребитель нашего вывода знал, что этот вывод означает конец сеанса. Следующий код сообщает нам, открыто окно или нет:
c.pane().getTiming() != PaneInfo.Timing.ON_TIME

Здесь под панелью понимается окно. Если время панели - ON_TIME, это означает, что окно сеанса закрыто. Следовательно, мы устанавливаем windowOpen как false в нашем выходном объекте protobuf.

3. Когда окно заканчивается, мы устанавливаем windowEnd на время закрытия окна, а не на время последнего события. Это достигается с помощью: window.maxTimestamp (). Это время будет совпадать с временем прибытия последнего события + SessionGapDurationTime.

4. Вы можете заметить, что здесь события отсортированы по времени события. Это важно сделать, так как в системе реального времени вам не гарантируется получение событий вовремя или в правильном порядке в зависимости от времени события. Могут быть поздние события и события, которые выходят из строя. Следовательно, если мы хотим, чтобы наш выходной протокол был чистым, мы сортируем события по времени события.

Модульное тестирование в Apache Beam

Когда дело доходит до тестирования конвейеров Beam, оно очень похоже на типичное модульное тестирование, но вместо использования JUnit вам придется привыкнуть к использованию инфраструктуры тестирования Beam под названием PAssert. Как и в любом модульном тесте, вы генерируете набор входных данных и проверяете, соответствует ли ожидаемый результат фактическому результату тестируемой функции.

Этот тест не представляет ничего нового по сравнению с модульным тестированием пакетных конвейеров. Я не буду вдаваться в подробности этого теста, поскольку большая часть материала, рассмотренного здесь, уже была рассмотрена в Части 1.

Следует помнить, что нам нужно определить глобальный объект TestPipeline, который будет использоваться всеми тестами в файле. В конце каждого теста мы должны вызывать функцию run () для объекта TestPipeline.

Единственная разница здесь в том, что мы создаем java-объекты, сгенерированные protobuf. Из теста вы можете увидеть наш объект Events.Event может быть сгенерирован с использованием шаблона создания объекта builder. В этом тестовом классе мы явно тестируем все условия раннего возврата, определенные в KeyValueSplit ParDo, где, если UserID, EventTime или Price пустые / 0.0, мы не обрабатываем элемент.

Теперь давайте посмотрим на класс CombineEventsTest.

При тестировании конвейеров потоковой передачи вам необходимо ознакомиться с новым классом, который называется TestStream. Из документации Javadocs:

A testing input that generates an unbounded PCollection of elements, advancing the watermark and processing time as elements are emitted. After all of the specified elements are emitted, ceases to produce output.

Теперь эта строка документации поднимает еще один вопрос: В чем разница между водяным знаком и временем обработки?

Из руководства по программированию Beams:

However, data isn’t always guaranteed to arrive in a pipeline in time order, or to always arrive at predictable intervals. Beam tracks a watermark, which is the system’s notion of when all data in a certain window can be expected to have arrived in the pipeline. Once the watermark progresses past the end of a window, any further element that arrives with a timestamp in that window is considered late data.

Насколько я понимаю, мы можем считать, что время водяного знака - это время, когда система считает, что все данные в определенном окне будут получены. Принимая во внимание, что время обработки - это фактическое время обработки события в задании потока данных. И, наконец, время события - это время, когда событие было фактически отправлено издателем. Никогда не гарантируется, что эти три времени будут одинаковыми, и мы можем смоделировать эти разные времена в наших модульных тестах.

Вот основные выводы из теста:

  1. Мы обязательно используем ту же стратегию работы с окнами и триггер, что и в нашем основном классе.
  2. Мы глобально определяем:
private Instant baseTime = new Instant(0);

Это будет базовое время всех наших тестов. Это упрощает создание событий, которые просто смещены на некоторое время по сравнению с этим базовым временем. Это время совпадает с 1970–01–01T00: 01: 00.000 UTC.

3. Здесь используется вспомогательная функция (createDummyEvents), чтобы упростить создание экземпляра объекта protobuf со смещением времени события относительно базового времени на x минут.

4. Чтобы экземпляр TestStream мог имитировать поступающие реальные события, нам нужно использовать специальный класс под названием TimeStampedValue ‹V›. Класс принимает в качестве значения любой объект и требует отметки времени, обозначающей время обработки (прибытия) события в потоке. Функцию, которую я написал для создания этих классов, можно найти здесь:

5. Чтобы добавить события TimestampedValue в TestStream, вам необходимо следовать следующей общей схеме:

testPipeline.apply(TestStream.create(Coder.of(SomeClass))
            .advanceWatermarkTo(baseTime)
            .addElements(TimestampedValue1)
            .advanceProcessingTime( > TimeStampedValue.eventTime)
            OR
            .advanceWatermarkTime(someTime)
            ....
            .advanceProcessingTime(last element + session duration)
            .advanceWatermarkToInfinity());

Давайте посмотрим, как выглядит тестовый пример.

Вы можете думать о создании TestStream как о последовательности событий; установить базовое время, отправить событие, продвинуть время обработки конвейера за время события и, наконец, продвинуть водяной знак до бесконечности. Здесь вы можете заметить, что каждый раз, когда я добавляю событие в поток, я увеличиваю время обработки до 1 минуты после времени события. Это сделано намеренно, чтобы имитировать, что мы получили события с задержкой в ​​1 минуту от издателя сообщения. Также можно создать тестовые ситуации, когда события пришли после водяного знака / времени обработки, и посмотреть, как конвейер обрабатывает поздние события. После последнего события время обработки также увеличивается на 10 минут по сравнению с предыдущими 5 минутами. Это было сделано для того, чтобы зафиксировать событие, инициированное после закрытия окна сеанса.

6. Последней частью модульного теста всегда является проверка, соответствует ли ожидаемый результат фактическому результату. Но поскольку это конвейер потоковой передачи, мы теперь должны рассмотреть ожидаемый результат и , когда он ожидается. Основываясь на наших стратегиях работы с окнами и запуска, мы должны ожидать получения выходного сообщения после каждого ввода и по окончании окна сеанса. Следовательно, нам нужно создать n + 1 ожидаемых выходных сообщений для наших n событий в TestStream.

Теперь вопрос в том, как получить результат в определенное время? Мы можем использовать класс IntervalWindow. Этот класс представляет окно с начальным и конечным временем. Затем мы используем PAssert, чтобы проверить, соответствует ли вывод в окне интервала ожидаемому результату. Одна критическая вещь, которую следует не учитывать, заключается в том, что, хотя событие было обработано на 2-й минуте, окно интервала начинается во время события. Я не верю, что это универсально верно для всех случаев при работе с водяными знаками, но с этим можно поиграться.

Кроме того, вы можете заметить, что время окончания находится на 11-й минуте. Это немного сбивает с толку, поскольку вы ожидаете, что окно с 1-й по 11-ю минуту захватит все выходные данные, но захватит только один. По непонятным мне причинам PAssert выдаст пустой результат, если ваш IntervalWindow не заканчивается на eventTime + gapDurationTime. И когда вы хотите протестировать следующий вывод с событиями в моменты времени 1 и 2, ваше окно должно начинаться с 1 и заканчиваться на 12.

IntervalWindow windowFirstEvent = new IntervalWindow(baseTime.plus(Duration.standardMinutes(1)),
        baseTime.plus(Duration.standardMinutes(11)));
PAssert.that(output).inWindow(windowFirstEvent).containsInAnyOrder(resultFirst);

Последний нюанс - как проверить вывод последнего события и событие закрытия окна. Мы используем тот же объект окна, начиная с 1-й и заканчивая 14-й минутой, поскольку время последнего события - 4-я минута. Но теперь нам нужно изменить inWindow на inEarlyPane и inFinalPane.

PAssert.that(output).inEarlyPane(windowFourthEvent).containsInAnyOrder(resultFourth);

PAssert.that(output).inFinalPane(windowFourthEvent).containsInAnyOrder(resultAfterWatermark);

inEarlyPane предоставит вам выходные данные после последнего события и до закрытия окна, а inFinalPane предоставит вам выходные данные после последнего события и после закрытия окна.

В качестве побочного примечания, если вы просто запустите конвейер и сравните вывод, как если бы это был пакетный конвейер, например:

PAssert.that(output).containsInAnyOrder(resultAfterWatermark);

Это фактически эквивалентно

PAssert.that(output).inFinalPane(windowFourthEvent).containsInAnyOrder(resultAfterWatermark);

Этот тест является хорошим примером того, как вы можете очень легко протестировать весь свой сквозной конвейер в Beam. Здесь мы протестировали всю функциональность нашего потокового конвейера, за исключением чтения и записи PubSub, которые мы абстрагируем, предполагая, что эти службы всегда работают.

Я оставлю вам тестировать поведение позднего или раннего события в конвейере, поигравшись с обработкой и манипулированием временем водяных знаков в TestStream.

Подробнее о тестировании Apache Beam Streaming можно прочитать в документации Beam: https://beam.apache.org/blog/2016/10/20/test-stream.html