Введение

В этой статье мы поговорим об apache kafka — распределенной потоковой платформе с открытым исходным кодом, разработанной Apache Software Foundation. Он предназначен для обработки больших объемов данных в режиме реального времени и обеспечивает эффективную обработку, хранение и анализ потоков данных.

кафка Кластер

Кластер:

Наш кластер можно проиллюстрировать следующим образом:

Продюсер:

В Kafka производитель — это клиентское приложение, которое отвечает за публикацию данных в одной или нескольких темах Kafka. Продюсеры могут быть написаны на различных языках программирования, таких как Java, Python или Scala, с использованием клиентских библиотек Kafka.

Потребитель:

В Kafka потребитель — это клиентское приложение, которое отвечает за чтение данных из одной или нескольких тем Kafka. Потребители могут быть написаны на различных языках программирования, таких как Java, Python или Scala, с использованием клиентских библиотек Kafka.
Потребители также могут быть организованы в группы потребителей, где каждая группа состоит из одного или нескольких потребителей, которые разделяют рабочую нагрузку по потреблению. сообщения из набора разделов. Kafka гарантирует, что каждый раздел используется только одним потребителем в группе, что обеспечивает балансировку нагрузки и отказоустойчивость.

Брокер:

В Kafka брокер — это сервер, который отвечает за хранение тем и разделов Kafka и управление ими. Брокеры получают сообщения от производителей и передают их потребителям, а также реплицируют сообщения в кластере брокеров для обеспечения отказоустойчивости и масштабируемости.
Тема – это категория или имя канала, в котором производители могут писать сообщения, а потребители могут подписываться на них. читать сообщения. Тема делится на один или несколько разделов, где каждый раздел представляет собой упорядоченную неизменную последовательность сообщений.

Коэффициент репликации
Коэффициент репликации в apache Kafka — это количество реплик (копий) раздела, которые поддерживаются несколькими брокерами в кластере. Более высокий коэффициент репликации обеспечивает лучшую отказоустойчивость и надежность, поскольку гарантирует доступность данных даже в случае сбоя одного или нескольких посредников или дисков.

Демо

Ради этой демонстрации мы собираемся использовать готовый файл компоновки докеров, чтобы вызвать кластер apache kafka с 1 брокером и с apache zookeeper.

version: "3"
services:
 zookeeper:
 image: confluentinc/cp-zookeeper:7.3.2
 container_name: zookeeper
 environment:
 ZOOKEEPER_CLIENT_PORT: 2181
 ZOOKEEPER_TICK_TIME: 2000
 broker:
 image: confluentinc/cp-kafka:7.3.2
 container_name: broker
 ports:
 - "9092:9092"
 depends_on:
 - zookeeper
 environment: 
 KAFKA_BROKER_ID: 1
 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

Этот файл можно найти в confluent

апачский зоопарк:

Apache ZooKeeper — это служба распределенной координации, которая часто используется с Apache Kafka для управления и координации кластера брокеров. ZooKeeper предоставляет высокодоступный и отказоустойчивый способ хранения и управления конфигурационной информацией, именами, синхронизацией и другими общими данными между узлами в распределенной системе.
Итак, начнем с выполнения следующей команды: вызов брокера и контейнеры для зоопарка:

docker compose up -d

Наши контейнеры готовы:

Теперь пишем коды производителя и потребителя:

import data
import kafka
import json
import time
def data_to_json():
 return json.dumps(data.data_faker()).encode("utf-8")
producer=kafka.KafkaProducer(bootstrap_servers=['localhost:9092'])
def produce():
 while 1==1:
 print("here")
 producer.send("registred_user",json.dumps(data.data_faker()).encode())
 time.sleep(3)
produce()
import kafka
consumer=kafka.KafkaConsumer('registred_user', group_id='my_favorite_group',bootstrap_servers=["localhost:9092"])
 
def consume():
 for msg in consumer:
 print(msg.value)
consume()

Теперь мы запускаем наши скрипты, производитель слева и потребитель справа:

и вуаля, каждое сообщение, сгенерированное производителем, читается потребителем в той же теме.

Было ли это полезно? Сбивает с толку? Если у вас есть какие-либо вопросы, не стесняйтесь комментировать ниже! Не забудьте подписаться на Linkedin: https://www.linkedin.com/in/malekzaag/ и на github: https://github.com/Malek-Zaag, если вы заинтересованы в подобном контенте и хотите продолжай учиться вместе со мной!