Руководство для начинающих с примером проекта на Java

Apache Beam - это усовершенствованная унифицированная модель программирования, которая реализует задания пакетной и потоковой обработки данных, которые выполняются на любом механизме выполнения. На данный момент вы можете реализовать его на языках Java, Python и Go. Если вам нужно обрабатывать большие наборы данных или обрабатывать потоки данных, луч Apache - это инструмент, который может обрабатывать с помощью унифицированной, переносимой и расширяемой модели программирования. Вы можете получить большую гибкость и расширенные функции, необходимые для обработки данных. Есть так много бегунов, из которых вы можете выбрать, например, если вы хотите запустить все это на GCP, у вас есть Google Dataflow, который вы используете в качестве бегуна.

В этом посте мы увидим, как начать работу с Apache Beam и Spring Boot. Мы начнем с простого приложения Spring Boot и посмотрим, как интегрироваться с Apache Beam и запустить его на локальном компьютере с помощью Direct Runner.

  • Предварительные требования
  • Что такое Apache Beam
  • Концепции
  • Пример проекта
  • Реализация с помощью Spring Boot
  • Демо
  • Резюме
  • Заключение

Предварительные требования

Есть некоторые предварительные условия для этого проекта, такие как Apache Maven, Java SDK и некоторая IDE. Вам необходимо установить все это на свой компьютер, если вы хотите запустить этот пример проекта на своем компьютере.

Убедитесь, что вы установили Java и Maven на свой компьютер, протестировав эти команды. Вам нужно добавить их в свой путь, чтобы вы могли запускать эти команды.

java --version
mvn --version

Что такое Apache Beam

Как указано на их веб-сайте, Apache Beam - это продвинутая унифицированная модель программирования, которая реализует задания пакетной и потоковой обработки данных, которые выполняются на любом механизме выполнения. Beam особенно полезен для задач параллельной обработки данных, в которых задачи разделены на более мелкие пакеты данных, которые можно обрабатывать независимо или параллельно. Вы также можете использовать Beam для задач извлечения, преобразования и загрузки (ETL) и чистой интеграции данных.

Apache Beam в настоящее время поддерживает три пакета SDK: Java, Python и Go. Все эти SDK предоставляют единую модель программирования, которая принимает данные из нескольких источников. Эти источники могут быть конечным набором данных из источника пакетных данных или бесконечным набором данных из источника потоковых данных.

Вы определяете конвейер для обработки данных, бегуны конвейера Apache Beam переводят этот конвейер с вашей программой Beam в API, совместимый с сервером распределенной обработки по вашему выбору. Он поддерживает несколько бегунов, как показано на рисунке выше. Вы должны определить соответствующий Runner при запуске вашей программы Beam, чтобы Runner выполнял ваш конвейер.

Концепции

Есть так много концепций, которые вы должны знать, прежде чем пачкать руки. Здесь очень сложно пройти через каждую концепцию, вместо этого мы рассмотрим важные концепции, которые необходимы для реализации базового примера.

Трубопровод

Конвейер - это не что иное, как серия простых задач, выполняемых для получения желаемого результата. Эти задачи могут выполняться последовательно или параллельно в зависимости от необходимости. Вы можете спроектировать трубопровод несколькими способами. Давайте посмотрим на базовый конвейер: у нас есть один файл в исходной корзине, мы читаем ее и преобразуем в коллекцию под названием Pcollection (подробнее обсудим позже), а затем преобразуем и загружаем в целевую корзину.

То, что вы видите выше, - это очень простой конвейер. Вы можете спроектировать конвейер несколькими способами, такими как параллельная обработка одного и того же Pcollection, одно преобразование, производящее несколько выходных данных, и т. Д. Здесь вы можете увидеть некоторые конструкции конвейера.

Вот пример кода о том, как создать конвейер в java. Вы можете найти больше здесь, на их сайте.

PCollections

Вы создаете PCollection, считывая данные из внешнего источника с помощью Source API Beam, или вы можете создать PCollection данных, хранящихся в классе коллекции в памяти в программе вашего драйвера. Ниже приведен пример кода из документации Apache Beam, где выполняется чтение набора данных из корзины GCP. Вы можете найти больше здесь.

Трансформирует

Преобразования - это операции в вашем конвейере, которые обеспечивают общую структуру обработки. Вы предоставляете логику обработки в форме объекта функции (в просторечии называемого кодом пользователя), и ваш код пользователя применяется к каждому элементу ввода PCollection (или более чем одному PCollection). В зависимости от выбранного вами конвейера и серверной части множество разных рабочих процессов в кластере могут выполнять экземпляры вашего пользовательского кода параллельно. Вы можете найти больше здесь.

Ввод / вывод конвейера

Когда вы создаете конвейер, вам часто нужно читать данные из некоторого внешнего источника, такого как файл или база данных. Точно так же вы можете захотеть, чтобы ваш конвейер выводил свои данные результатов во внешнюю систему хранения. Вот пример чтения входных данных из корзины GCP.

PCollection<String> lines = p.apply(TextIO.read().from("gs://some/inputData.txt"));

Вот пример записи вывода в GCP Bucket

output.apply(TextIO.write().to("gs://some/outputData"));

У вас есть так много других концепций, которые вам следует знать, когда вы углубляетесь в модель программирования Apache Beam или работаете над производственными рабочими нагрузками. Вы можете найти больше здесь.

Пример проекта

Давайте посмотрим, что мы создаем с помощью Apache Beam и Java SDK. Вот простой файл input.txt, мы принимаем его как ввод, преобразовываем и выводим количество слов.

Как показано выше, мы разбиваем текстовый файл на основе символа «:», а затем извлекаем и подсчитываем слова, форматируем результат и выводим в файл output.txt. Этот конвейер производит следующий вывод. Вы можете увидеть три выходных файла в зависимости от процессов, запущенных на вашем компьютере.

testing: 3
progress: 2
done: 1
completed: 1
in: 2
test: 3

Вот ссылка на Github для этого примера проекта. Вы можете клонировать его и запустить на своем компьютере. Вы можете увидеть выходные файлы в этом месте / src / main / resources.

// clone the project
git clone https://github.com/bbachi/apache-beam-java-demo.git
// change the directory
cd apache-beam-java-demo
// clean and install
mvn clean install
// Run the application
java -jar target/beamdemo-0.0.1-SNAPSHOT.jar

Реализация с помощью Spring Boot

Давайте реализуем этот пример проекта шаг за шагом с помощью Java и Spring Boot. Мы начнем с создания приложения Spring Boot с помощью Spring Initializr. Как вы видите на рисунке ниже, вы можете выбрать начальные зависимости и сгенерировать проект Maven в zip-файле.

Давайте распакуем zip и загрузим приложение в IntelliJ IDE, как показано ниже.

Первый - это добавление необходимых зависимостей к приложениям весенней загрузки. Ниже приведены зависимости, которые мы должны добавить в файл pom.xml. Первый - это beam SDK, другой - Direct Runner, и мы используем версию 2.23.0.

Главный файл

Вот главный файл, который является отправной точкой вашего приложения.

Преобразование SplitWords

После того, как вы прочитали входной файл из местоположения / src / main / resources, все, что вам нужно сделать, разделить текст на «:». Это файл преобразования, который принимает входные данные и производит выходные данные. В этом случае коллекции ввода и вывода - это PCollection ‹String›.

Логика обработки, которая здесь является логикой разделения, определена в следующем файле. Это функция обработки, которая принимает каждый ввод из коллекции, обрабатывает его и производит вывод.

Преобразование CountWords

После первого разделения преобразования вам нужно преобразовать эти строки в слова. Это файл преобразования, который принимает входные данные и производит выходные данные. В данном случае входные и выходные коллекции - это PCollection ‹String› и PCollection ‹PV‹ String, Long ››

Логика обработки, которая здесь является логикой извлечения, определена в следующем файле. Это функция обработки, которая принимает каждый ввод из коллекции, обрабатывает его и производит вывод.

Ниже представлен основной файл, в котором вы создаете, определяете конвейер и запускаете его синхронно. Есть файл опций, в котором вы можете определить пути к входным и выходным файлам. Эти параметры необходимо передать в качестве аргумента при создании конвейера.

После создания конвейера вы можете применить серию преобразований с помощью функции apply. Вы можете использовать TextIO для чтения и записи в соответствующие файлы. Наконец, вы запускаете конвейер с этой строкой p.run().waitUntilFinish();.

Демо

Вы ознакомились с основами и реализацией Apache Beam SDK. Давайте запустим его и посмотрим, какие файлы вывода созданы в папке / src / main / resources /. Вот демонстрация.

Резюме

  • Apache Beam - это усовершенствованная унифицированная модель программирования, которая реализует задания пакетной и потоковой обработки данных, которые выполняются на любом механизме выполнения.
  • На данный момент вы можете реализовать его на языках Java, Python и Go.
  • Есть так много бегунов, из которых вы можете выбрать, например, если вы хотите запустить все это на GCP, у вас есть Google Dataflow, который вы используете в качестве бегуна.
  • Есть некоторые предварительные условия для этого проекта, такие как Apache Maven, Java SDK и некоторая IDE.
  • Вы определяете конвейер для обработки данных, бегуны конвейера Apache Beam переводят этот конвейер с вашей программой Beam в API, совместимый с сервером распределенной обработки по вашему выбору.
  • Он поддерживает несколько бегунов, как показано на рисунке выше. Вы должны определить соответствующий Runner при запуске вашей программы Beam, чтобы Runner выполнял ваш конвейер.
  • Подробнее о модели программирования Apache можно найти здесь.
  • Подробнее о трубопроводах можно узнать здесь.

Заключение

Это очень хороший пример для начала работы с Apache Beam. Как только вы начнете работу, вам будет легко исследовать больше самостоятельно. В будущих публикациях мы подробнее рассмотрим, как запустить конвейер с потоком данных GCP и другими концепциями, такими как схемы, окна, триггеры, метрики, состояние и таймеры и т. Д.