Повышение производительности прослушивателей Kafka в весеннем загрузочном приложении, развернутом в PCF.

Я работаю над вариантом использования, который должен обрабатывать почти 600 сообщений в секунду (подписаться на тему, преобразовать, сохранить в таблицу SQL Server и вернуться к теме), но мы обрабатываем только 100 сообщений в секунду на 5 экземпляров. мы не можем увеличить количество экземпляров для достижения этой цели. Любые предложения будут полезны?

Технологии и инфраструктура: приложение весенней загрузки с прослушивателями Kafka (без пакетного прослушивания), развернутыми в PCF. источник и вне темы каждый с 10 разделами каждый. Используются свойства и настройки по умолчанию. Преобразование занимает доли миллисекунд.


person reddy    schedule 24.07.2020    source источник
comment
Вам нужно профилировать приложение, чтобы увидеть узкое место; наверное база данных? Поскольку у вас есть 2 раздела на экземпляр, вы можете попробовать увеличить параллелизм до 2, чтобы каждый раздел обрабатывался в другом потоке.   -  person Gary Russell    schedule 24.07.2020


Ответы (1)


У меня был аналогичный вариант использования, я улучшаю производительность, добавляя параллелизм (10) для каждого слушателя и увеличивая разделы в очереди со следующей конфигурацией.

@Bean
public ThreadPoolTaskExecutor messageProcessorExecutor() {
    ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
    exec.setCorePoolSize(poolSize);
    exec.setMaxPoolSize(poolMaxSize);
    exec.setKeepAliveSeconds(keepAlive);
    return exec;
}

@Bean
public ConsumerFactory<String, Request> consumerFactory() {
    DefaultKafkaConsumerFactory<String, Request> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs());
    consumerFactory.setKeyDeserializer(new StringDeserializer());
    consumerFactory.setValueDeserializer(new JsonDeserializer<>(Request.class));
    return consumerFactory;
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Request>> kafkaListenerContainerFactory(
        ThreadPoolTaskExecutor messageProcessorExecutor,
        ConsumerFactory<String, Request> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(pollTimeout);
    factory.getContainerProperties().setConsumerTaskExecutor(messageProcessorExecutor);
    return factory;
}

private Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}
person solujan    schedule 27.07.2020