Как обрабатывать оставшиеся файлы в опросе входящего адаптера SFTP после возникновения исключения?

Я создал поток интеграции для чтения файлов с SFTP-сервера и их обработки. Я понял, что как только возникает ошибка с одним из файлов (выбрасывается исключение), опрос останавливается и любой другой файл не обрабатывается до следующего опроса. Как этого избежать, не помечая файл как обработанный, а обрабатывая оставшиеся файлы в том опросе?

Моя конфигурация довольно проста. Я использую нетранзакционный опросчик, который запускается каждую минуту с max-message-per-poll из 1000. SftpStreamingInboundChannelAdapterSpec имеет max-fetch-size из 10 и использует составной фильтр списка файлов с SftpRegexPatternFileListFilter и SftpPersistentAcceptOnceFileListFilter.

@Bean
public IntegrationFlow sftpInboundFlow(JdbcMetadataStore jdbcMetadataStore, DataSourceTransactionManager dataSourceTransactionManager) {
    return IntegrationFlows.from(sftpStreamingInboundChannelAdapterSpec(jdbcMetadataStore), sourcePollingChannelAdapterSpec -> configureEndpoint(sourcePollingChannelAdapterSpec, dataSourceTransactionManager))
                .transform(new StreamTransformer())
                .channel("processingChannel")
                .get();
}

private SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec(JdbcMetadataStore jdbcMetadataStore) {
    SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec = Sftp.inboundStreamingAdapter(documentEnrollementSftpRemoteFileTemplate())
                .filter(fileListFilter(jdbcMetadataStore))
                .maxFetchSize(10)
                .remoteDirectory("/the-directory");
    SftpStreamingMessageSource sftpStreamingMessageSource = sftpStreamingInboundChannelAdapterSpec.get();
    sftpStreamingMessageSource.setFileInfoJson(false);

    return sftpStreamingInboundChannelAdapterSpec;
}

private void configureEndpoint(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec, DataSourceTransactionManager dataSourceTransactionManager) {
    PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
                .maxMessagesPerPoll(1000);
    sourcePollingChannelAdapterSpec.autoStartup(true)
                .poller(pollerSpec);
}

@Bean
public CompositeFileListFilter<ChannelSftp.LsEntry> fileListFilter(JdbcMetadataStore jdbcMetadataStore) {
    String fileNameRegex = // get regex

    SftpRegexPatternFileListFilter sftpRegexPatternFileListFilter = new SftpRegexPatternFileListFilter(fileNameRegex);
    SftpPersistentAcceptOnceFileListFilter sftpPersistentAcceptOnceFileListFilter = new SftpPersistentAcceptOnceFileListFilter(jdbcMetadataStore, "");

    CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<>();
    compositeFileListFilter.addFilter(sftpRegexPatternFileListFilter);
    compositeFileListFilter.addFilter(sftpPersistentAcceptOnceFileListFilter);

    return compositeFileListFilter;
}

Прочитав этот ответ, я попытался использовать транзакционный опросник следующим образом:

PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
    .maxMessagesPerPoll(1000)
    .transactional(dataSourceTransactionManager);

но в результате после сбоя обработки файла опрос останавливается, все обработанные сообщения откатываются, а оставшиеся сообщения не обрабатываются до следующего опроса. Из этого ответа я понял, что каждое сообщение будет обрабатываться в отдельной транзакции.

Единственный способ, который я нашел для достижения этого до сих пор, заключался в том, чтобы окружить код обработки в блоке try/catch, перехватывающем все исключения, чтобы не прерывать опрос. В блоке catch я вручную удаляю ChannelSftp.LsEntry из фильтра составного списка файлов. Для этого мне нужно было установить свойство fileInfoJson на false в SftpStreamingMessageSource, предоставленном SftpStreamingInboundChannelAdapterSpec.

Я нахожу этот подход довольно запутанным, и его недостатком является то, что файлы, которые выходят из строя и удаляются из фильтра, сразу после этого обрабатываются повторно, а не в следующем опросе. Я надеялся, что есть более простое решение моей проблемы.


person Javier Estévez    schedule 11.06.2019    source источник
comment
Было бы лучше, если бы вы могли показать код, но ваша идея перехватить исключение для плохой записи, обработать его и двигаться дальше кажется разумной. Я не понимаю, как это запутанно.   -  person CryptoFool    schedule 11.06.2019
comment
Я добавил некоторый код с конфигурацией. Я хотел бы избежать, если это возможно, необходимости вручную удалять файлы из фильтра. Я ожидал, что транзакционный опросник сделает это за меня. Но похоже, что все сообщения откатываются, а не только то, которое вызвало проблему.   -  person Javier Estévez    schedule 11.06.2019


Ответы (1)


Решение с помощью try...catch — правильный путь. На самом деле это тот факт, что исключение, выброшенное из процесса, передается в поллер и останавливает текущий цикл вокруг maxMessagesPerPoll:

private Runnable createPoller() {
    return () ->
            this.taskExecutor.execute(() -> {
                int count = 0;
                while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                    if (pollForMessage() == null) {
                        break;
                    }
                    count++;
                }
            });
}

Где этот pollForMessage() выглядит так:

private Message<?> pollForMessage() {
    try {
        return this.pollingTask.call();
    }
    catch (Exception e) {
        if (e instanceof MessagingException) {
            throw (MessagingException) e;
        }
        else {
            Message<?> failedMessage = null;
            if (this.transactionSynchronizationFactory != null) {
                Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
                if (resource instanceof IntegrationResourceHolder) {
                    failedMessage = ((IntegrationResourceHolder) resource).getMessage();
                }
            }
            throw new MessagingException(failedMessage, e); // NOSONAR (null failedMessage)
        }
    }
    finally {
        if (this.transactionSynchronizationFactory != null) {
            Object resource = getResourceToBind();
            if (TransactionSynchronizationManager.hasResource(resource)) {
                TransactionSynchronizationManager.unbindResource(resource);
            }
        }
    }
}

В любом случае есть способ изолировать одно сообщение от других в одном цикле опроса. Для этого вам нужно заглянуть в цепочку советов по обработчику запросов и исследовать решение с помощью ExpressionEvaluatingRequestHandlerAdvice: https://docs.spring.io/spring-integration/docs/current/reference/html/#message-handler-advice-chain

Таким образом, вы добавляете это в конечную точку вашего обработчика ниже по течению и перехватываете там исключения и выполняете некоторую конкретную обработку ошибок, не перебрасывая исключения в опросчик.

person Artem Bilan    schedule 11.06.2019
comment
Спасибо за быстрый ответ! Я уже пытался использовать совет для метода, аннотированного @ServiceActivator, который, я думаю, более или менее эквивалентен тому, что вы упомянули. Тем не менее, если я хочу снова обработать тот же файл в следующем опросе, мне нужно вручную удалить его из фильтра списка файлов, верно? Но если я это сделаю, то файл будет немедленно повторно обработан, пока не будет обработано max-message-per-poll сообщений (потенциально 999 раз). Есть ли способ обрабатывать файл только один раз за опрос? - person Javier Estévez; 11.06.2019
comment
Почему вы должны удалить его оттуда? Почему бы не попробовать повторить попытку для этого @ServiceActivator? Это звучит так, как будто вы все равно хотите повторить попытку, так зачем откладывать это на опрос, если у вас уже есть файл и вы можете повторить попытку в единственном конкретном месте, подобном этому @ServiceActivator. - person Artem Bilan; 11.06.2019
comment
Моя идея заключалась в том, чтобы никогда не отмечать файл как обработанный, если возникает непредвиденная ошибка даже после нескольких повторных попыток, надеясь, что любая возникшая временная ошибка будет исправлена ​​​​следующим опросом (-ами). В противном случае я не вижу, как я могу переработать файл позже. - person Javier Estévez; 11.06.2019
comment
Вы можете использовать Delayer, чтобы действительно отложить этот файл для удаления из фильтра. Таким образом, он больше не появится в текущем цикле опроса. - person Artem Bilan; 11.06.2019
comment
Спасибо за предложение, Delayer сделал свое дело. Однако жаль, что я не могу сохранить отложенное сообщение в хранилище сообщений, поскольку оно имеет несериализуемый LsEntry в качестве заголовка. Думаю, для этого мне понадобятся мои собственные фильтры списка файлов... - person Javier Estévez; 14.06.2019