Как оптимизировать производительность с помощью FlatFileItemReader и асинхронных процессоров

У меня есть простой CSV-файл с ~ 400 000 строк (только один столбец). Мне требуется много времени, чтобы прочитать записи и обработать их.

процессор, проверяющий записи по диванной базе

писатель - запись в удаленную тему занимает у меня около 30 минут. это безумно.

Я читал, что flatfileItemreader не является потокобезопасным. поэтому значение моего фрагмента равно 1.

Я читал, что асинхронная обработка может помочь. но улучшений не вижу.

Это мой код:

@Configuration
@EnableBatchProcessing
public class NotificationFileProcessUploadedFileJob {


    @Value("${expected.snid.header}")
    public String snidHeader;

    @Value("${num.of.processing.chunks.per.file}")
    public int numOfProcessingChunksPerFile;

    @Autowired
    private InfrastructureConfigurationConfig infrastructureConfigurationConfig;

    private static final String OVERRIDDEN_BY_EXPRESSION = null;


    @Inject
    private JobBuilderFactory jobs;

    @Inject
    private StepBuilderFactory stepBuilderFactory;

    @Inject
    ExecutionContextPromotionListener executionContextPromotionListener;


    @Bean
    public Job processUploadedFileJob() throws Exception {
        return this.jobs.get("processUploadedFileJob").start((processSnidUploadedFileStep())).build();

    }

    @Bean
    public Step processSnidUploadedFileStep() {
        return stepBuilderFactory.get("processSnidFileStep")
                .<PushItemDTO, PushItemDTO>chunk(numOfProcessingChunksPerFile)
                .reader(snidFileReader(OVERRIDDEN_BY_EXPRESSION))
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
            //    .throttleLimit(20)
             //   .taskJobExecutor(infrastructureConfigurationConfig.taskJobExecutor())


                        //     .faultTolerant()
                        //   .skipLimit(10) //default is set to 0
                        //     .skip(MySQLIntegrityConstraintViolationException.class)
                .build();
    }

    @Inject
    ItemWriter writer;

    @Bean
    public AsyncItemWriter asyncItemWriter() {
        AsyncItemWriter asyncItemWriter=new AsyncItemWriter();
        asyncItemWriter.setDelegate(writer);
        return asyncItemWriter;
    }


    @Bean
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public ItemStreamReader<PushItemDTO> snidFileReader(@Value("#{jobParameters[filePath]}") String filePath) {
        FlatFileItemReader<PushItemDTO> itemReader = new FlatFileItemReader<PushItemDTO>();
        itemReader.setLineMapper(snidLineMapper());
        itemReader.setLinesToSkip(1);
        itemReader.setResource(new FileSystemResource(filePath));
        return itemReader;
    }


    @Bean
    public AsyncItemProcessor asyncItemProcessor() {

        AsyncItemProcessor<PushItemDTO, PushItemDTO> asyncItemProcessor = new AsyncItemProcessor();

        asyncItemProcessor.setDelegate(processor(OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION,
                OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION));
        asyncItemProcessor.setTaskExecutor(infrastructureConfigurationConfig.taskProcessingExecutor());

        return asyncItemProcessor;

    }

    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    @Bean
    public ItemProcessor<PushItemDTO, PushItemDTO> processor(@Value("#{jobParameters[pushMessage]}") String pushMessage,
                                                             @Value("#{jobParameters[jobId]}") String jobId,
                                                             @Value("#{jobParameters[taskId]}") String taskId,
                                                             @Value("#{jobParameters[refId]}") String refId,
                                                             @Value("#{jobParameters[url]}") String url,
                                                             @Value("#{jobParameters[targetType]}") String targetType,
                                                             @Value("#{jobParameters[gameType]}") String gameType) {
        return new PushItemProcessor(pushMessage, jobId, taskId, refId, url, targetType, gameType);
    }

    @Bean
    public LineMapper<PushItemDTO> snidLineMapper() {
        DefaultLineMapper<PushItemDTO> lineMapper = new DefaultLineMapper<PushItemDTO>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setDelimiter(",");
        lineTokenizer.setStrict(true);
        lineTokenizer.setStrict(true);
        String[] splittedHeader = snidHeader.split(",");
        lineTokenizer.setNames(splittedHeader);
        BeanWrapperFieldSetMapper<PushItemDTO> fieldSetMapper = new BeanWrapperFieldSetMapper<PushItemDTO>();
        fieldSetMapper.setTargetType(PushItemDTO.class);

        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(new PushItemFieldSetMapper());
        return lineMapper;
    }
}


 @Bean
    @Override
    public SimpleAsyncTaskExecutor taskProcessingExecutor() {
        SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
        simpleAsyncTaskExecutor.setConcurrencyLimit(300);
        return simpleAsyncTaskExecutor;
    }

Как вы думаете, как я мог бы улучшить производительность обработки и сделать ее быстрее? благодарю вас

Код ItemWriter:

 @Bean
    public ItemWriter writer() {
        return new KafkaWriter();
    }


public class KafkaWriter implements ItemWriter<PushItemDTO> {


    private static final Logger logger = LoggerFactory.getLogger(KafkaWriter.class);

    @Autowired
    KafkaProducer kafkaProducer;

    @Override
    public void write(List<? extends PushItemDTO> items) throws Exception {

        for (PushItemDTO item : items) {
            try {
                logger.debug("Writing to kafka=" + item);
                sendMessageToKafka(item);
            } catch (Exception e) {
                logger.error("Error writing item=" + item.toString(), e);
            }
        }
    }

person rayman    schedule 19.02.2015    source источник
comment
Предложения по улучшению кода находятся на странице Code Review.   -  person SomethingDark    schedule 19.02.2015
comment
размер блока не имеет ничего общего с многопоточностью. Размер фрагмента 1 означает, что 1 читается, обрабатывается и записывается. Создание транзакции для каждого элемента. Это означает ~ 400000 транзакций. Увеличьте размер блока, и вы получите больше производительности. Также вы должны ограничить передачу, в идеале процессор и писатель должны совместно использовать поток, в настоящее время процессор использует поток и писатель, что означает, что вы передаете данные между потоками, которые (относительно) медленны.   -  person M. Deinum    schedule 19.02.2015
comment
Пожалуйста, проигнорируйте мой последний комментарий. Не могли бы вы ответить на этот вопрос и улучшить мой код, когда вы имели в виду: также вы должны ограничить передачу обслуживания, в идеале процессор и писатель должны совместно использовать поток. Я не понял, как можно разрешить им совместно использовать один и тот же поток. благодарю вас   -  person rayman    schedule 19.02.2015


Ответы (1)


Я бы начал с увеличения количества коммитов. Имейте в виду, что означает количество коммитов. Поскольку у вас установлено значение 1, вы делаете следующее для каждого элемента:

  1. Начать транзакцию
  2. Прочитать элемент
  3. Обработать элемент
  4. Напишите пункт
  5. Обновите репозиторий вакансий
  6. Зафиксировать транзакцию

Ваша конфигурация не показывает, что представляет собой делегат ItemWriter, поэтому я не могу сказать, но как минимум вы выполняете несколько операторов SQL для каждого элемента для обновления репозитория заданий.

Вы правы в том, что FlatFileItemReader не является потокобезопасным. Однако вы не используете несколько потоков для чтения, а только для обработки, поэтому нет причин устанавливать счетчик фиксации равным 1, насколько я вижу.

person Michael Minella    schedule 19.02.2015
comment
Спасибо за ваш ответ. Я отредактировал вопрос с писателем. Реализовал ли я асинхронное право (используя API асинхронных процессоров)? - person rayman; 20.02.2015
comment
То, что у вас есть, правильно. Однако AsyncItemProcessor/AsyncItemWriter улучшит производительность только в определенных ситуациях. В этом случае, поскольку вы читаете один файл, единственный способ повысить производительность при их использовании — это если узким местом является процессор или модуль записи. - person Michael Minella; 20.02.2015
comment
не могли бы вы привести пример того, как я должен использовать ридер, чтобы разбить большой файл на маленькие и обработать каждый из них в diff Thread? - person rayman; 20.02.2015