У Kafka есть одна прекрасная функция (помимо многих других, конечно) отслеживания позиции каждого потребителя, и он фантастически достигает этого с помощью Offsets. Цель этой статьи — рассмотреть ниже различные способы совершения смещений в kafka.

  1. Автоматическая фиксация
  2. Синхронная фиксация вручную
  3. Асинхронная фиксация вручную
  4. Ручная фиксация определенных смещений

Что такое смещение кафки?:

Смещение kafka — это уникальный идентификатор для каждого сообщения в разделе kafka. Это помогает потребителям отслеживать свой прогресс, например, сколько сообщений каждый потребитель уже получил из раздела и с чего начать дальше. Обратите внимание, смещение уникально только внутри раздела, а не между разделами.

Выше приведен пример графического изображения темы kafka с двумя разделами, где числа от 0 до 5 являются смещениями для каждого сообщения в каждом разделе. Последнее зафиксированное смещение сообщения для потребителя для раздела 0 равно 4, а для раздела 1 — 3. Таким образом, если потребитель выйдет из строя или будет заменен новым, последнее зафиксированное смещение сообщения поможет ему возобновить работу именно с той точки, где оно было отправлено. был оставлен.

Потребитель может выбрать периодическую автоматическую фиксацию смещений или ручную фиксацию для особых случаев использования.

1. Автоматическая фиксация:

Это самый простой способ зафиксировать смещения, просто установив для свойства enable.auto.commit значение true. В этом случае потребительский клиент kafka будет автоматически фиксировать наибольшее смещение, возвращаемое методом poll(), каждые 5 секунд. Мы можем установить свойство auto.commit.interval.ms, чтобы изменить этот интервал по умолчанию в 5 секунд. Пример фрагмента кода ниже с включенной автоматической фиксацией с интервалом в 1 секунду.

public static void main(String[] args) {

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "1000");

    KafkaConsumer<Integer, AvroMessage> consumer = new KafkaConsumer<>(props, new IntegerDeserializer(), new AvroMessageDeserializer());
    consumer.subscribe(Arrays.asList("myavrotopic"));

    while (true) {
        ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<Integer, AvroMessage> record : records) {
            System.out.println("Received message: (" + record.key() + ", " + record.value().toString() + ") at offset " + record.offset());
        }
    }
}

Предупреждение об автоматической фиксации. Если автоматическая фиксация включена, клиент-потребитель kafka всегда будет фиксировать последнее смещение, возвращенное методом опроса, даже если они не были обработаны. Например, если опрос возвращает сообщения со смещением от 0 до 1000, а потребитель может обработать только до 500 из них и происходит сбой после интервала автофиксации. В следующий раз, когда он возобновится, он увидит смещение последней фиксации как 1000 и начнет с 1001. Таким образом, он потерял смещения сообщений от 501 до 1000. Следовательно, при автоматической фиксации важно убедиться, что мы обрабатываем все возвращенные смещения. последним методом опроса перед его повторным вызовом. Иногда автоматическая фиксация может также привести к дублированию обработки сообщений в случае сбоя потребителя до следующего интервала автоматической фиксации. Следовательно, kafka Consumer предоставляет API-интерфейсы для разработчиков, чтобы они могли взять под свой контроль, когда фиксировать смещения, а не полагаться на автоматическую фиксацию, установив для enable.auto.commit значение false, которое мы обсудим. следующий.

2. Ручная синхронная фиксация:

В приведенном ниже фрагменте кода для автоматической фиксации установлено значение false, и мы явно вызываем consumer.commitSync(), который фиксирует последнее смещение, возвращенное методом опроса. Следовательно, нам нужно убедиться, что мы вызываем его после того, как закончим обработку всех смещений сообщений, возвращенных последним методом опроса.

public static void main(String[] args) {

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");

    KafkaConsumer<Integer, AvroMessage> consumer = new KafkaConsumer<>(props, new IntegerDeserializer(), new AvroMessageDeserializer());
    consumer.subscribe(Arrays.asList("myavrotopic"));

    while (true) {
        ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<Integer, AvroMessage> record : records) {
            System.out.println("Received message: (" + record.key() + ", " + record.value().toString() + ") at offset " + record.offset());
        }

        try{
            consumer.commitSync();
        }catch (CommitFailedException e){
            System.out.println("Commit failed due to : "+ e);
            e.printStackTrace();
        }

Одним из недостатков этой синхронной фиксации является то, что она может повлиять на пропускную способность приложения, поскольку приложение блокируется до тех пор, пока брокер не ответит на запрос фиксации. Следовательно, у нас есть еще один вариант использования асинхронного API, упомянутый ниже.

3. Ручная асинхронная фиксация:

С API асинхронной фиксации мы просто отправляем запрос на фиксацию и продолжаем. Здесь приложение не блокируется из-за асинхронного характера вызова.

public static void main(String[] args) {

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");

    KafkaConsumer<Integer, AvroMessage> consumer = new KafkaConsumer<>(props, new IntegerDeserializer(), new AvroMessageDeserializer());
    consumer.subscribe(Arrays.asList("myavrotopic"));

    while (true) {
        ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<Integer, AvroMessage> record : records) {
            System.out.println("Received message: (" + record.key() + ", " + record.value().toString() + ") at offset " + record.offset());
        }

        consumer.commitAsync();
    }
}

Еще одним отличием асинхронной фиксации от синхронной является то, что синхронная фиксация продолжает повторяться до тех пор, пока не возникает фатальной ошибки, в то время как асинхронная не повторяется даже в случае сбоя, в противном случае это может привести к дублированию обработки. В случае сбоев асинхронность полагается на следующую фиксацию, чтобы покрыть ее.

Сочетание синхронной и асинхронной фиксации:

Обычно хорошей практикой программирования является использование как синхронных, так и асинхронных коммитов, пример кода ниже. Здесь мы используем commitAsync() во время обработки внутри цикла while. И мы используем commitSync() перед закрытием потребителя, чтобы убедиться, что последнее смещение всегда зафиксировано.

public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");

        KafkaConsumer<Integer, AvroMessage> consumer = new KafkaConsumer<>(props, new IntegerDeserializer(), new AvroMessageDeserializer());
        consumer.subscribe(Arrays.asList("myavrotopic"));

        try{
            while (true) {
                ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<Integer, AvroMessage> record : records) {
                    System.out.println("Received message: (" + record.key() + ", " + record.value().toString() + ") at offset " + record.offset());
                }

                consumer.commitAsync();
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                consumer.commitSync();
            }finally {
                consumer.close();
            }
            
        }
    }

4. Ручная фиксация для определенных смещений:

Все методы, которые мы обсуждали до сих пор, фиксируют последнее смещение, возвращенное последним методом poll(). Что делать, если разработчик хочет получить дополнительный контроль над ним и чаще совершать коммиты меньшими партиями, чем то, что было возвращено опросом. В таком случае мы можем передать карту раздела для смещения метаданных как в API commitSync(), так и в API commitAsync(). Пример фрагмента кода приведен ниже, где мы делаем коммит после обработки каждого сообщения.

public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");

    KafkaConsumer<Integer, AvroMessage> consumer = new KafkaConsumer<>(props, new IntegerDeserializer(), new AvroMessageDeserializer());
    consumer.subscribe(Arrays.asList("myavrotopic"));
    Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();

    while (true) {
        ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<Integer, AvroMessage> record : records) {
            System.out.println("Received message: (" + record.key() + ", " + record.value().toString() + ") at offset " + record.offset());
            offsetAndMetadataMap.put(new TopicPartition(record.topic(), record.partition()), 
                    new OffsetAndMetadata(record.offset()));
            consumer.commitSync(offsetAndMetadataMap);
        }
    }
}

Также оформить заказ: https://medium.com/@rramiz.rraza

Я ценю вас и время, которое вы потратили на чтение этого дня! Пожалуйста, следите (подпишитесь и подпишитесь) на другие блоги о больших данных и других новейших технологиях. Здоровья!