Я создал поток интеграции для чтения файлов с SFTP-сервера и их обработки. Я понял, что как только возникает ошибка с одним из файлов (выбрасывается исключение), опрос останавливается и любой другой файл не обрабатывается до следующего опроса. Как этого избежать, не помечая файл как обработанный, а обрабатывая оставшиеся файлы в том опросе?
Моя конфигурация довольно проста. Я использую нетранзакционный опросчик, который запускается каждую минуту с max-message-per-poll
из 1000. SftpStreamingInboundChannelAdapterSpec
имеет max-fetch-size
из 10 и использует составной фильтр списка файлов с SftpRegexPatternFileListFilter
и SftpPersistentAcceptOnceFileListFilter
.
@Bean
public IntegrationFlow sftpInboundFlow(JdbcMetadataStore jdbcMetadataStore, DataSourceTransactionManager dataSourceTransactionManager) {
return IntegrationFlows.from(sftpStreamingInboundChannelAdapterSpec(jdbcMetadataStore), sourcePollingChannelAdapterSpec -> configureEndpoint(sourcePollingChannelAdapterSpec, dataSourceTransactionManager))
.transform(new StreamTransformer())
.channel("processingChannel")
.get();
}
private SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec(JdbcMetadataStore jdbcMetadataStore) {
SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec = Sftp.inboundStreamingAdapter(documentEnrollementSftpRemoteFileTemplate())
.filter(fileListFilter(jdbcMetadataStore))
.maxFetchSize(10)
.remoteDirectory("/the-directory");
SftpStreamingMessageSource sftpStreamingMessageSource = sftpStreamingInboundChannelAdapterSpec.get();
sftpStreamingMessageSource.setFileInfoJson(false);
return sftpStreamingInboundChannelAdapterSpec;
}
private void configureEndpoint(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec, DataSourceTransactionManager dataSourceTransactionManager) {
PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
.maxMessagesPerPoll(1000);
sourcePollingChannelAdapterSpec.autoStartup(true)
.poller(pollerSpec);
}
@Bean
public CompositeFileListFilter<ChannelSftp.LsEntry> fileListFilter(JdbcMetadataStore jdbcMetadataStore) {
String fileNameRegex = // get regex
SftpRegexPatternFileListFilter sftpRegexPatternFileListFilter = new SftpRegexPatternFileListFilter(fileNameRegex);
SftpPersistentAcceptOnceFileListFilter sftpPersistentAcceptOnceFileListFilter = new SftpPersistentAcceptOnceFileListFilter(jdbcMetadataStore, "");
CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<>();
compositeFileListFilter.addFilter(sftpRegexPatternFileListFilter);
compositeFileListFilter.addFilter(sftpPersistentAcceptOnceFileListFilter);
return compositeFileListFilter;
}
Прочитав этот ответ, я попытался использовать транзакционный опросник следующим образом:
PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
.maxMessagesPerPoll(1000)
.transactional(dataSourceTransactionManager);
но в результате после сбоя обработки файла опрос останавливается, все обработанные сообщения откатываются, а оставшиеся сообщения не обрабатываются до следующего опроса. Из этого ответа я понял, что каждое сообщение будет обрабатываться в отдельной транзакции.
Единственный способ, который я нашел для достижения этого до сих пор, заключался в том, чтобы окружить код обработки в блоке try/catch, перехватывающем все исключения, чтобы не прерывать опрос. В блоке catch я вручную удаляю ChannelSftp.LsEntry
из фильтра составного списка файлов. Для этого мне нужно было установить свойство fileInfoJson
на false
в SftpStreamingMessageSource
, предоставленном SftpStreamingInboundChannelAdapterSpec
.
Я нахожу этот подход довольно запутанным, и его недостатком является то, что файлы, которые выходят из строя и удаляются из фильтра, сразу после этого обрабатываются повторно, а не в следующем опросе. Я надеялся, что есть более простое решение моей проблемы.