Потребители Kafka выбрасывают java.lang.OutOfMemoryError: прямая буферная память

Я использую одноузловой брокер Kafka (0.10.2) и одноузловой брокер zookeeper (3.4.9). У меня есть потребительский сервер (одноядерный и 1,5 ГБ ОЗУ). Всякий раз, когда я запускаю процесс с 5 или более потоками, потоки моих потребителей прекращаются после выдачи этих исключений.

  1. Исключение 1

java.lang.OutOfMemoryError: пространство кучи Java в java.nio.HeapByteBuffer. (HeapByteBuffer.java:57) в java.nio.ByteBuffer.allocate (ByteBuffer.java:335) в org.apache.kafka.comReceive.network. .readFromReadableChannel (NetworkReceive.java:93) на org.apache.kafka.common.network.NetworkReceive.readFrom (NetworkReceive.java:71) на org.apache.kafka.common.network.KafkaChannel.receive (KafkaChannel.java:169 ) на org.apache.kafka.common.network.KafkaChannel.read (KafkaChannel.java:150) на org.apache.kafka.common.network.Selector.pollSelectionKeys (Selector.java:355) на org.apache.kafka. common.network.Selector.poll (Selector.java:303) в org.apache.kafka.clients.NetworkClient.poll (NetworkClient.java:349) в org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll ( ConsumerNetworkClient.java:226) на org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWake вверх (ConsumerNetworkClient.java:263) в org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ HeartbeatThread.run (AbstractCoordinator.java:887)

  1. Исключение 2

Неперехваченное исключение в потоке "кафка-координатор-сердцебиение" | topic1: java.lang.OutOfMemoryError: Прямая буферная память в java.nio.Bits.reserveMemory (Bits.java:693) в java.nio.DirectByteBuffer. (DirectByteBuffer.java:123) в java.nio.ByteBuffer.allocateDirect (ByteBuffer.allocateDirect .java: 311) на sun.nio.ch.Util.getTemporaryDirectBuffer (Util.java:241) на sun.nio.ch.IOUtil.read (IOUtil.java:195) на sun.nio.ch.SocketChannelImpl.read ( SocketChannelImpl.java:380) на org.apache.kafka.common.network.PlaintextTransportLayer.read (PlaintextTransportLayer.java:110) на org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel (или NetworkReceive. .apache.kafka.common.network.NetworkReceive.readFrom (NetworkReceive.java:71) по адресу org.apache.kafka.common.network.KafkaChannel.receive (KafkaChannel.java:169) по адресу org.apache.kafka.common.network .KafkaChannel.read (KafkaChannel.java:150) на org.apache.kafka.common.network.Selector.pollSelectionK eys (Selector.java:355) в org.apache.kafka.common.network.Selector.poll (Selector.java:303) в org.apache.kafka.clients.NetworkClient.poll (NetworkClient.java:349) в org .apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:226) на org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup (ConsumerNetworkClient) atfka.java:26:26 .clients.consumer.internals.AbstractCoordinator $ HeartbeatThread.run (AbstractCoordinator.java:887)

Я погуглил и использовал нижеупомянутые параметры JVM, но все же произошли те же исключения

-XX: MaxDirectMemorySize = 768 м

-Xms512m

Как решить эту проблему? Требуется ли настройка других параметров javm?

Мой потребительский код Kafka:

import com.mongodb.DBObject
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.regex.Pattern

class KafkaPollingConsumer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
private static final String TAG = "[KafkaPollingConsumer]"
private final KafkaConsumer<String, byte []> kafkaConsumer
private Map<TopicPartition,OffsetAndMetadata> currentOffsetsMap = new HashMap<>()
List topicNameList
Map kafkaTopicConfigMap = new HashMap<String,Object>()
Map kafkaTopicMessageListMap = new HashMap<String,List>()
Boolean isRebalancingTriggered = false
private final Long REBALANCING_SLEEP_TIME = 1000

public KafkaPollingConsumer(String serverType, String groupName, String topicNameRegex, Integer batchSize, Integer maxPollTime, Integer requestTime){
    logger.debug("{} [Constructor] [Enter] Thread Name {} serverType group Name TopicNameRegex",TAG,Thread.currentThread().getName(),serverType,groupName,topicNameRegex)
    logger.debug("Populating Property for kafak consumer")
    logger.debug("BatchSize {}",batchSize)
    Properties kafkaConsumerProperties = new Properties()
    kafkaConsumerProperties.put("group.id", groupName)
    kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaConsumerProperties.put("value.deserializer", "com.custom.kafkaconsumerv2.deserializer.CustomObjectDeserializer")
    switch(serverType){
        case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() :
            kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.priority.kafkaNode)
            kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit)
            kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset)
            break
        case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Bulk.toString() :
            kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.bulk.kafkaNode)
            kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.bulk.consumer.enable.auto.commit)
            kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.bulk.consumer.auto.offset.reset)
            kafkaConsumerProperties.put("max.poll.records",1)
            kafkaConsumerProperties.put("max.poll.interval.ms",600000)
            kafkaConsumerProperties.put("request.timeout.ms",600005)
            break
        default :
            throw "Invalid server type"
            break
    }
    logger.debug("{} [Constructor] KafkaConsumer Property Populated {}",properties.toString())
    kafkaConsumer = new KafkaConsumer<String, byte []>(kafkaConsumerProperties)
    topicNameList = topicNameRegex.split(Pattern.quote('|'))
    logger.debug("{} [Constructor] Kafkatopic List {}",topicNameList.toString())
    logger.debug("{} [Constructor] Exit",TAG)
}

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        logger.error('{} In onPartitionAssigned setting isRebalancingTriggered to false',TAG)
        isRebalancingTriggered = false
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        logger.error("{} In onPartitionsRevoked setting osRebalancingTriggered to true",TAG)
        isRebalancingTriggered = true
        publishAllKafkaTopicBatchMessages()
        commitOffset()

    }
}

private class AsyncCommitCallBack implements OffsetCommitCallback{

    @Override
    void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {

    }
}

@Override
void run() {
    logger.debug("{} Starting Thread ThreadName {}",TAG,Thread.currentThread().getName())
    populateKafkaConfigMap()
    initializeKafkaTopicMessageListMap()
    String topicName
    String consumerClassName
    String consumerMethodName
    Boolean isBatchJob
    Integer batchSize = 0
    final Thread mainThread = Thread.currentThread()
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            logger.error("{},gracefully shutdowning thread {}",TAG,mainThread.getName())
            kafkaConsumer.wakeup()
            try {
                mainThread.join()
            } catch (InterruptedException exception) {
                logger.error("{} Error : {}",TAG,exception.getStackTrace().join("\n"))
            }
        }
    })
    kafkaConsumer.subscribe(topicNameList , new HandleRebalance())
    try{
        while(true){
            logger.debug("{} Starting Consumer with polling time in ms 100",TAG)
            ConsumerRecords kafkaRecords
            if(isRebalancingTriggered == false) {
                kafkaRecords = kafkaConsumer.poll(100)
            }
            else{
                logger.error("{} in rebalancing going to sleep",TAG)
                Thread.sleep(REBALANCING_SLEEP_TIME)
                continue
            }
            for(ConsumerRecord record: kafkaRecords){
                if(isRebalancingTriggered == true){
                    break
                }
                currentOffsetsMap.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() +1))
                topicName = record.topic()
                DBObject kafkaTopicConfigDBObject = kafkaTopicConfigMap.get(topicName)
                consumerClassName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                consumerMethodName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                isBatchJob = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                logger.debug("Details about Message")
                logger.debug("Thread {}",mainThread.getName())
                logger.debug("Topic {}",topicName)
                logger.debug("Partition {}",record.partition().toString())
                logger.debug("Offset {}",record.offset().toString())
                logger.debug("clasName {}",consumerClassName)
                logger.debug("methodName {}",consumerMethodName)
                logger.debug("isBatchJob {}",isBatchJob.toString())
                Object message = record.value()
                logger.debug("message {}",message.toString())
                if(isBatchJob == true){
                    prepareMessagesBatch(topicName,message)
                    //batchSize = Integer.parseInt(kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY).toString())
                    //logger.debug("batchSize {}",batchSize.toString())
                }
                else{
                    publishMessageToNonBatchConsumer(consumerClassName,consumerMethodName,message)
                }
                //publishMessageToConsumers(consumerClassName,consumerMethodName,isBatchJob,batchSize,message,topicName)
                //try {
                //  kafkaConsumer.commitAsync(currentOffsetsMap,new AsyncCommitCallBack())
                logger.debug("{} Commiting Messages to Kafka",TAG)
                //}
                /*catch(Exception exception){
                    kafkaConsumer.commitSync(currentOffsetsMap)
                    currentOffsetsMap.clear()
                    logger.error("{} Error while commiting async so commiting in sync {}",TAG,exception.getStackTrace().join("\n"))
                }*/
            }
            commitOffset()
            publishAllKafkaTopicBatchMessages()
        }
    }
    catch(InterruptException exception){
        logger.error("{} In InterruptException",TAG)
        logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
        logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
    }
    catch (WakeupException exception) {
        logger.error("{} In WakeUp Exception",TAG)
        logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
        logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
    }
    catch(Exception exception){
        exception.getMessage()
        logger.error("{} In Exception",TAG)
        logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
        logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
    }
    finally {
        logger.error("{} In finally commiting remaining offset ",TAG)
        publishAllKafkaTopicBatchMessages()
        //kafkaConsumer.commitSync(currentOffsetsMap)
        kafkaConsumer.close()
        logger.error("{} Exiting Consumer",TAG)
    }
}

private void commitOffset(){
    logger.debug("{} [commitOffset] Enter")
    logger.debug("{} currentOffsetMap {}",currentOffsetsMap.toString())
    if(currentOffsetsMap.size() > 0) {
        kafkaConsumer.commitSync(currentOffsetsMap)
        currentOffsetsMap.clear()
    }
    logger.debug("{} [commitOffset] Exit")

}

private void publishMessageToConsumers(String consumerClassName,String consumerMethodName,Boolean isBatchJob,Integer batchSize,Object message, String topicName){
    logger.debug("{} [publishMessageToConsumer] Enter",TAG)
    if(isBatchJob == true){
        publishMessageToBatchConsumer(consumerClassName, consumerMethodName,batchSize, message, topicName)
    }
    else{
        publishMessageToNonBatchConsumer(consumerClassName, consumerMethodName, message)
    }
    logger.debug("{} [publishMessageToConsumer] Exit",TAG)
}

private void publishMessageToNonBatchConsumer(String consumerClassName, String consumerMethodName, message){
    logger.debug("{} [publishMessageToNonBatchConsumer] Enter",TAG)
    executeConsumerMethod(consumerClassName,consumerMethodName,message)
    logger.debug("{} [publishMessageToNonBatchConsumer] Exit",TAG)
}

private void publishMessageToBatchConsumer(String consumerClassName, String consumerMethodName, Integer batchSize, Object message, String topicName){
    logger.debug("{} [publishMessageToBatchConsumer] Enter",TAG)
    List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
    consumerMessageList.add(message)
    if(consumerMessageList.size() == batchSize){
        logger.debug("{} [publishMessageToBatchConsumer] Pushing Messages In Batches",TAG)
        executeConsumerMethod(consumerClassName, consumerMethodName, consumerMessageList)
        consumerMessageList.clear()
    }
    kafkaTopicMessageListMap.put(topicName,consumerMessageList)
    logger.debug("{} [publishMessageToBatchConsumer] Exit",TAG)
}

private void populateKafkaConfigMap(){
    logger.debug("{} [populateKafkaConfigMap] Enter",TAG)
    KafkaTopicConfigDBService kafkaTopicConfigDBService = KafkaTopicConfigDBService.getInstance()
    topicNameList.each { topicName ->
        DBObject kafkaTopicDBObject = kafkaTopicConfigDBService.findByTopicName(topicName)
        kafkaTopicConfigMap.put(topicName,kafkaTopicDBObject)
    }
    logger.debug("{} [populateKafkaConfigMap] kafkaConfigMap {}",TAG,kafkaTopicConfigMap.toString())
    logger.debug("{} [populateKafkaConfigMap] Exit",TAG)
}

private void initializeKafkaTopicMessageListMap(){
    logger.debug("{} [initializeKafkaTopicMessageListMap] Enter",TAG)
    topicNameList.each { topicName ->
        kafkaTopicMessageListMap.put(topicName,[])
    }
    logger.debug("{} [populateKafkaConfigMap] kafkaTopicMessageListMap {}",TAG,kafkaTopicMessageListMap.toString())
    logger.debug("{} [initializeKafkaTopicMessageListMap] Exit",TAG)
}

private void executeConsumerMethod(String className, String methodName, def messages){
    try{
        logger.debug("{} [executeConsumerMethod] Enter",TAG)
        logger.debug("{} [executeConsumerMethod] className  {} methodName {} messages {}",TAG,className,methodName,messages.toString())
        Class.forName(className)."$methodName"(messages)
    } catch (Exception exception){
        logger.error("{} [{}] Error while executing method : {} of class: {} with params : {} - {}", TAG, Thread.currentThread().getName(), methodName,
                className, messages.toString(), exception.getStackTrace().join("\n"))
    }
    logger.debug("{} [executeConsumerMethod] Exit",TAG)
}

private void publishAllKafkaTopicBatchMessages(){
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Enter",TAG)
    String consumerClassName = null
    String consumerMethodName = null
    kafkaTopicMessageListMap.each { topicName, messageList ->
        if (messageList != null && messageList.size() > 0) {
            DBObject kafkaTopicDBObject = kafkaTopicConfigMap.get(topicName)
            consumerClassName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
            consumerMethodName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
            logger.debug("{} Pushing message in topic {} className {} methodName {} ", TAG, topicName, consumerClassName, consumerMethodName)
            if (messageList != null && messageList.size() > 0) {
                executeConsumerMethod(consumerClassName, consumerMethodName, messageList)
                messageList.clear()
                kafkaTopicMessageListMap.put(topicName, messageList)

            }
        }
    }
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Exit",TAG)
}

private void prepareMessagesBatch(String topicName,Object message){
    logger.debug("{} [prepareMessagesBatch] Enter",TAG)
    logger.debug("{} [prepareMessagesBatch] preparing batch for topic {}",TAG,topicName)
    logger.debug("{} [prepareMessagesBatch] preparting batch for message {}",TAG,message.toString())
    List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
    consumerMessageList.add(message)
    kafkaTopicMessageListMap.put(topicName,consumerMessageList)

}

}


person Abhimanyu    schedule 20.03.2017    source источник
comment
Можете ли вы увидеть, сколько данных у вас ожидает / не используется? С kafka вам нужно иметь возможность буферизовать все данные в памяти. Какую частоту сообщений вы производите? Что произойдет, если вы произведете гораздо более низкую ставку.   -  person Peter Lawrey    schedule 20.03.2017
comment
@Abhimanyu, где ты можешь это решить?   -  person Ramandeep S    schedule 31.03.2021


Ответы (1)


Kafka Consumers обрабатывает накопившиеся данные с помощью следующих двух параметров:

max.poll.interval.ms
Максимальная задержка между вызовами poll () при использовании управления группами потребителей. Это устанавливает верхнюю границу количества времени, в течение которого потребитель может бездействовать, прежде чем получить дополнительные записи. Если poll () не вызывается до истечения этого тайм-аута, потребитель считается отказавшим, и группа будет перебалансирована, чтобы переназначить разделы другому члену.
Значение по умолчанию - 300000.

max.poll.records
Максимальное количество записей, возвращаемых за один вызов poll ().
Значение по умолчанию - 500.

Игнорирование установки двух вышеуказанных параметров в соответствии с требованием может привести к опросу максимального количества данных, которые потребитель может быть не в состоянии обработать с доступными ресурсами, что приведет к OutOfMemory или временам неспособности зафиксировать смещение потребителя. Следовательно, всегда рекомендуется использовать параметры max.poll.records и max.poll.interval.ms.

В вашем коде в случае KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString () отсутствуют эти два параметра, которые могут быть причиной проблемы OutOfMemory во время опроса.

person Daniccan    schedule 12.06.2017
comment
Я работаю с потоками kafka, и хотя я установил max.poll.records на 1, я все еще вижу исключение OOM. - person Kamil; 30.01.2019
comment
@Kamil Тебе удалось это понять? - person theprogrammer; 26.01.2021
comment
@Kamil, ты смог это понять? - person Ramandeep S; 31.03.2021
comment
К сожалению нет :-/ - person Kamil; 01.04.2021