Эту статью можно читать как минимум двумя способами:

  1. Как несколько длинное введение в удобный инструмент, который вы можете легко запустить для проверки сообщений, опубликованных в темах в кластере Kafka.
  2. В качестве подробного, но базового примера того, как объединить несколько методов для создания сквозного (push) конвейера от Kafka Topic (на сервере или в облаке) до клиентского браузера (включая: Node & node-rdkafka, Express и Server Отправленные события, DHTML и JavaScript на стороне клиента

В этой статье представлено введение в простой инструмент — приложение Node — которое предоставляет полезную функциональность и которое можно легко — очень легко — расширить дополнительными функциями. И код, предоставленный с этой статьей, содержит ряд полезных примеров использования языка Node/JavaScript и создания клиента Kafka с использованием node-rdkafka и реализации канала событий, отправленных сервером, на слайдах сервера и клиента с использованием промежуточного программного обеспечения Express.

На этом изображении показано средство наблюдения за темами Kafka: в окне браузера — с автоматическим обновлением при создании новых сообщений в теме Kafka — отображается живой список сообщений из всех тем в кластере Apache Kafka. А все исторические сообщения можно сбросить из темы в браузер. Довольно удобно во время разработки — и для мастер-классов, демонстраций и развлечений.

Приложение Node может быть запущено где угодно — на ноутбуке, в облаке, на сервере — и подключается к назначенному кластеру Kafka, который (при условии правильной настройки правил доступа к сети) может находиться где угодно, например в публичном облаке.

Исходники инструмента доступны на GitHub: https://github.com/AMIS-Services/online-meetups-introduction-of-kafka/tree/master/lab2b-topic-watcher (как часть репозитория для семинар из трех частей по Apache Kafka)

Обзор приложения Kafka Topic Watcher

Сердцем Kafka Topic Watcher является приложение Node, основным модулем которого является app.js. Этот модуль:

  • запускает HTTP-сервер (порт 3010) для обслуживания веб-приложения (несколько статических файлов — HTML, Javascript и изображения)
  • обрабатывает запросы REST от веб-приложения
  • отправляет SSE (события, отправленные сервером) в веб-приложение (используя поддержку SSE в sse.js)
  • импортирует и инициализирует модуль consume, который подключается к целевому кластеру Kafka (как указано в config.js).
  • установите обработчик сообщений с модулем consume для обработки сообщений из всех тем в кластере Kafka: function handleMessage. Эта функция подготавливает событие к отправке в браузер и использует функцию updateSseClients и модуль sse.js для отправки события SSE всем подключенным клиентам Kafka Topic Watcher.

Веб-приложение состоит из файла index.html и двух вспомогательных (на стороне клиента) файлов JavaScript: message-handler.js и topics-management.js. Первый подписывается на конечную точку SSE и обрабатывает события SSE, которые отправляются из приложения Node для каждого сообщения, получаемого из одной из тем Kafka. Сообщения собираются в массив messages, а также записываются в начало элемента topicMessagesTable HTML Table.

Текущий список тем в кластере Kafka запрашивается из приложения Node в XHR (он же запрос AJAX) из topic-management.js. Список тем, возвращаемый в асинхронном ответе, записывается на HTML-страницу на основе содержимого ответа на этот запрос.

Графический интерфейс содержит кнопку Очистить сообщения. При нажатии вызывается функция clearMessages (в message-handler.js). Он очищает массив messages и удаляет строки из таблицы topicMessagesTable.

Графический интерфейс также содержит ссылку Перечитать все темы с начала. При нажатии запускается функция config, которая, в свою очередь, отправляет запрос POST в конечную точку /config, которая впоследствии повторно инициализирует потребителя с флагом, указывающим, что все сообщения должны быть прочитаны. из всех тем, а не только из вновь прибывших сообщений; вызывается функция initializeConsumer, которая отключит текущего потребителя потока и создаст новый.

Запуск Kafka Topic Watcher

Kafka Topic Watcher почти готов к работе. Вам не нужно вносить изменения в код, чтобы получить удовольствие, за исключением того, чтобы убедиться, что файл config.js имеет правильные настройки для вашей среды: конфигурация Kafka Broker (для вашего локального кластера или, например, для облачного кластера CloudKarafka) необходимо определить.

Прежде чем вы сможете запустить приложение, вам нужно установить зависимости. Из командной строки в каталоге, содержащем файл package.json, выполните:

npm install

чтобы загрузить все необходимые модули NPM в каталог node-modules.

Теперь вы можете запустить веб-приложение:

or

HTTP-сервер запущен и прослушивает порт 3010.

В браузере откройте веб-приложение Kafka Topic Watcher: http://localhost:3010.

Вы должны увидеть список всех не внутренних тем в указанном кластере Kafka. И вы увидите все новые сообщения, созданные в этих темах. Вы можете щелкнуть ссылку Перечитать все темы с начала, чтобы просмотреть все сообщения из истории тем. В дополнение к реальным сообщениям из Kafka Topics вы также увидите пульсирующие сообщения, которые приложение Node отправляет каждые 25 секунд всем клиентам SSE.

Детали реализации

Некоторые аспекты реализации инструмента, возможно, заслуживают повторного взгляда (или извлечения кода):

  • Как создать объект JavaScript путем слияния двух других объектов
  • Как подключиться к кластеру Kafka с помощью node-rdkafka
  • Как получить список всех тем в кластере Kafka
  • Как вернуть асинхронный ответ из функции JavaScript
  • Как настроить канал Server Sent Events из Node и как получать сообщения SSE в браузере
  • Как отправлять запросы XHR (fka AJAX) и обрабатывать ответы с помощью await fetch

Как создать объект JavaScript путем слияния двух других объектов

Лучше всего посмотреть на примере:

Составьте kafkaConf, объединив externalConfig.kafkaConfig с созданным на месте объектом. Все свойства обоих объектов будут в результирующем объекте. Если оба объекта содержат одно и то же свойство, это свойство будет иметь значение, которое оно имеет во втором объекте в этой операции.

Как подключиться к кластеру Kafka с помощью node-rdkafka

Сначала объявите зависимость от node-rdkafka в package.json. Это обеспечивает мост от JavaScript через C/C++ к коммуникационному протоколу Kafka.

Соединение создается как часть источника или потребителя — на основе объекта конфигурации Kafka — и на самом деле на свойстве metadata.broker.list.

Это свойство задается конечными точками брокеров в кластере Kafka — именем хоста и портом. В зависимости от конфигурации безопасности кластера Kafka объект конфигурации может потребовать наличия свойств для обработки ограничений безопасности. Пример объекта конфигурации Kafka, который я использовал:

Как получить список всех тем в кластере Kafka

Реализация node-rdkafka предоставляет возможность как Потребителю, так и Производителю получать метаданные для кластера Kafka, проверьте эту ссылку. В этом случае я использовал объект Producer, несмотря на то, что этот инструмент на самом деле ничего не производит для Cluster.

Объект производителя создается на основе объекта kafkaConfig. После подключения обработчику события ready передается объект метаданных, который содержит среди прочего массив для всех тем в кластере. Имена тем, начинающиеся с «__», указывают на внутренние, административные темы; они отфильтровываются на этапе reduce.

Как вернуть асинхронный ответ из функции JavaScript

Функция getTopics определена как асинхронная функция. Вызывающие эту функцию, скорее всего, будут использовать await в своем вызове:

Функция getTopics получает доступ только к результату, который она вернет в (асинхронном) обработчике событий для события ready. Чтобы получить результат функции после возврата функции, мы используем промис. Функция getTopics возвращает Promise — «отложенный результат». Ожидание выполняется только тогда, когда обещание, переданное вызывающей стороне ( getStarted()), разрешается. Внутри промиса мы используем явный вызов (встроенной функции) resolve, чтобы промис выдал свой результат. Такое ощущение, что функция getTopics() передает небольшую коробку функции getStarted() с инструкцией для getStarted() продолжать ждать, пока результат не появится из коробки.

Дополнительную информацию о сочетании функций обратного вызова, обработчиков событий, промисов и async/await см. в моей предыдущей статье: https://technology.amis.nl/2020/01/11/javascript-mapping-and-wrapping-classic-callback- функции-к-обещаниям-и-асинхронному-ожиданию/.

Как эффективно и элегантно отфильтровать массив по элементам, удовлетворяющим условию: уменьшить

Массив тем, возвращаемый metadata.topics, содержит внутренние темы (те, имена которых начинаются с __). Чтобы создать массив тем без этих элементов, наиболее элегантным подходом является установка оператора reduce для исходного массива. Reduce работает с начальным значением (пустой массив в данном случае: []) и функцией, которая получает два входа: промежуточный результат (который изначально является начальным значением, здесь пустой массив) и следующий элемент массива. Ожидается, что функция выдаст следующий промежуточный результат. В этом случае промежуточным результатом является массив со всеми элементами исходного массива, удовлетворяющими условию фильтра.

Этот фрагмент можно более или менее прочитать как:

const clusterTopics = (выберите * из metadata.topics как тему, где substring(topic.name,1,2) != ‘__’)

Как настроить канал Server Sent Events из Node и как получать сообщения SSE в браузере

Клиент браузера подписывается на источник SSE — используя EventSource(url). Это действительно все, что требуется для создания канала, который позволяет серверу отправлять свои обновления для асинхронной обработки клиентом. Обработчик сообщений прикреплен к источнику события. Эта функция вызывается каждый раз, когда сервер отправляет событие подписчикам SSE. Просмотр просто получает пакет JSON и должен сделать с ним что-то полезное.

Серверная часть SSE при использовании Node и Express также не слишком сложна.

Express настроен с несколькими «перехватчиками», которые могут работать с HTTP-запросом до того, как он действительно будет передан обработчику пути и метода — очень похоже на фильтры сервлетов и сервлеты для тех из вас, у кого есть фон Java (Servlet). «Перехватчик» sseMiddleware — простой, добавляющий свойство SSE Connection к каждому ответу — независимо от того, будет он использоваться или нет. Примечание: это соединение действительно используется только в обработчике /updates.

Запрос, отправленный клиентом через EventSource(URL) по пути /updates, является запросом, результатом которого является подписка на канал SSE. Соединение настроено (и отправлено клиенту браузера) и добавлено в коллекцию sseClients, чтобы иметь возможность отправлять ему сообщения, когда на стороне сервера возникает необходимость выразить себя.

Объекты Topic и Connection определены в следующем фрагменте. Соединение — это настоящий SSE-канал — настоящее рукопожатие с браузером. Тема — это простой административный объект для отслеживания всех подключений:

Наконец, чтобы отправить сообщение с сервера всем подключенным прослушивателям SSE, можно вызвать следующую функцию. Он принимает сообщение и отправляет его всем клиентам SSE.

Как отправлять запросы XHR (fka AJAX) из браузера и обрабатывать ответы с помощью await fetch

Я отправил свой первый асинхронный запрос браузера в прошлом веке, до того, как был придуман термин AJAX и до того, как был введен объект XML HTTP Request. Я почти поверил, что изобрел эту концепцию. Один (невидимый) фрейм на веб-странице отправил запрос и был перезагружен с ответом. Верхний фрейм может получить доступ к ответу и использовать его для обновления самого себя или других фреймов. Такое ощущение, что в те дни безопасность фреймов была намного более расслабленной. В любом случае, взаимодействие из браузера с сервером в фоновом режиме значительно эволюционировало. Асинхронный синтаксис в последней версии ES (ECMA Script) сделал программирование в стиле AJAX еще менее запутанным. Реактивный через большой разрыв между браузером и сервером.

Здесь функция config() использует postData() для отправки запроса POST с телом JSON на URL-адрес. Асинхронная функция postData возвращает обещание, которое разрешается после получения ответа на запрос POST. Ответ преобразуется из JSON в структуру объекта JavaScript, которая появляется в качестве входных данных для функции then:

Еще меньше кода требуется, когда мы кодируем запрос к серверу, как следующий фрагмент. Обратите внимание, что нам нужно ожидать дважды. Операция fetch является асинхронной. Асинхронно он возвращает не ответ, а обещание ответа. Таким образом, два уровня: сначала ожидайте асинхронной отправки запроса, а затем с обещанием ответа ожидайте прибытия фактического ответа. Затем обработайте содержимое JSON ответа (разберите текст в объекты JavaScript) и в этом случае напишите список на основе элементов массива в данных.

(см., например, https://dev.to/shoupn/javascript-fetch-api-and-using-asyncawait-47mp для получения дополнительной информации)

TODO/Следующие шаги

Несмотря на то, что Kafka Topic Watcher делает то, что должен, его можно легко улучшить. Некоторые предложения, которые читатель может воспринять как вызов:

  • разрешить фильтрацию по определенным темам
  • разрешить сортировку по различным свойствам
  • улучшить внешний вид графического интерфейса с помощью стилей, шрифтов и виджетов
  • разрешить публикацию сообщений в дополнение к их использованию
  • разрешить установку целевого кластера Kafka из графического интерфейса (или даже использовать его из нескольких кластеров одновременно)

Первоначально опубликовано на https://technology.amis.nl 28 апреля 2020 г.

Примечание от JavaScript In Plain English

Мы запустили три новых издания! Проявите любовь к нашим новым публикациям, подписавшись на них: AI на простом английском, UX на простом английском, Python на простом английском — спасибо и продолжайте учиться!

Мы также всегда заинтересованы в помощи в продвижении качественного контента. Если у вас есть статья, которую вы хотели бы отправить в какое-либо из наших изданий, отправьте нам электронное письмо по адресу [email protected], указав свое имя пользователя на Medium, и мы добавим вас в качестве автора. Также сообщите нам, к каким публикациям вы хотите добавиться.