Spring SFTP Outbound Adapter — определение, когда файлы были отправлены

У меня есть выходной адаптер Spring SFTP, который я запускаю через adapter.start() в моей основной программе. После запуска адаптер передает и загружает все файлы в указанный каталог, как и ожидалось. Но я хочу остановить адаптер после того, как все файлы будут переданы. Как определить, все ли файлы были перенесены, чтобы я мог выполнить команду adapter.stop()?

@Bean
public IntegrationFlow sftpOutboundFlow() {
    return IntegrationFlows.from(Files.inboundAdapter(new File(sftpOutboundDirectory))
                    .filterExpression("name.endsWith('.pdf') OR name.endsWith('.PDF')")
                    .preventDuplicates(true),
            e -> e.id("sftpOutboundAdapter")
                    .autoStartup(false)
                    .poller(Pollers.trigger(new FireOnceTrigger())
                            .maxMessagesPerPoll(-1)))
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getPayload())
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getHeaders())
            .handle(Sftp.outboundAdapter(outboundSftpSessionFactory())
                    .useTemporaryFileName(false)
                    .remoteDirectory(sftpRemoteDirectory))
            .get();
}

person rcaschultz    schedule 01.04.2021    source источник


Ответы (2)


@Артем Билан уже дал ответ. Но вот конкретная реализация того, что он сказал - для тех, кто является нубом Spring Integration, таким как я:

  1. Определите службу для получения PDF-файлов по запросу:
@Service
public class MyFileService {
    public List<File> getPdfFiles(final String srcDir) {
        File[] files = new File(srcDir).listFiles((dir, name) -> name.toLowerCase().endsWith(".pdf"));
        return Arrays.asList(files == null ? new File[]{} : files);
    }
}
  1. Определите шлюз для запуска потока загрузки SFTP по требованию:
@MessagingGateway
public interface SFtpOutboundGateway {
    @Gateway(requestChannel = "sftpOutboundFlow.input")
    void uploadFiles(List<File> files);
}
  1. Определите поток интеграции для загрузки файлов на SFTP-сервер через Sftp.outboundGateway:
@Configuration
@EnableIntegration
public class FtpFlowIntegrationConfig {
    // could be also bound via @Value 
    private String sftpRemoteDirectory = "/path/to/remote/dir";

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22222);
        factory.setUser("client1");
        factory.setPassword("password123");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) {
        return e -> e
                .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getPayload)
                .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getHeaders)
                .handle(
                    Sftp.outboundGateway(remoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.MPUT, "payload")
                );
    }

    @Bean
    public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {
        RemoteFileTemplate<ChannelSftp.LsEntry> template = new SftpRemoteFileTemplate(outboundSftpSessionFactory);
        template.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
        template.setAutoCreateDirectory(true);
        template.afterPropertiesSet();
        template.setUseTemporaryFileName(false);
        return template;
    }
}

Подключение:

public class SpringApp {
    public static void main(String[] args) {
        final MyFileService fileService = ctx.getBean(MyFileService.class);
        final SFtpOutboundGateway sFtpOutboundGateway = ctx.getBean(SFtpOutboundGateway.class);
        // trigger the sftp upload flow manually - only once
        sFtpOutboundGateway.uploadFiles(fileService.getPdfFiles()); 
    }
}

Примечания к импорту:

1.

@Gateway(requestChannel = sftpOutboundFlow.input) void uploadFiles(список файлов);

Здесь канал DirectChannel sftpOutboundFlow.input будет использоваться для передачи сообщения с полезной нагрузкой (= List<File> files) получателю. Если этот канал еще не создан, шлюз создаст его неявно.

2.

@Bean public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate‹ChannelSftp.LsEntry› remoteFileTemplate) { ... }

Поскольку IntegrationFlow — это Consumer функциональный интерфейс, мы можем немного упростить процесс, используя IntegrationFlowDefinition. На этапе регистрации компонента IntegrationFlowBeanPostProcessor преобразует этот встроенный (лямбда) поток интеграции в стандартный поток интеграции и обрабатывает его компоненты. Определение IntegrationFlow с использованием Lambda заполняет DirectChannel как inputChannel потока и регистрируется в контексте приложения как bean-компонент с именем sftpOutboundFlow.input в приведенном выше примере (имя bean-компонента потока + .input). Вот почему мы используем это имя для шлюза SFtpOutboundGateway.

Ссылка: https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial

3.

@Bean public RemoteFileTemplate‹ChannelSftp.LsEntry› remoteFileTemplate(SessionFactory‹ChannelSftp.LsEntry› outboundSftpSessionFactory) {}

см.: Удаленный каталог для исходящего шлюза sftp с DSL

Блок-схема:

введите здесь описание изображения

person Kenan Güler    schedule 02.04.2021
comment
Поскольку ответ уже принят, я бы не стал его трогать. Сноска: ctx в функции main() относится к ConfigurableApplicationContext ctx = SpringApplication.run(SpringApp.class, args); - person Kenan Güler; 02.04.2021

Но я хочу остановить адаптер после того, как все файлы будут переданы.

Логически это не то, для чего этот вид компонента был разработан. Поскольку у вас не будет постоянно меняющегося локального каталога, возможно, лучше подумать о решении даже с драйвером для вывода списка файлов в каталоге с помощью какого-либо действия. Да, это может быть вызов из главной, но только один раз на все содержимое дир и все.

И по этой причине Sftp.outboundGateway() с Command.MPUT для вас:

https://docs.spring.io/spring-integration/reference/html/sftp.html#using-the-mput-command.

Вы по-прежнему можете вызвать IntegrationFlow, но он может начинаться с интерфейса @MessagingGateway, который будет вызываться из main с локальным каталогом, чтобы перечислить файлы для загрузки:

https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-gateway

person Artem Bilan    schedule 01.04.2021