как запустить весенний пакет из весенней интеграции

Как запустить пакетное задание Spring из Spring Integration, используя java dsl Integrationflows.

У меня есть приведенный ниже код, который опрашивает файл в каталоге, в тот момент, когда новый файл добавляется в каталог, генерируется сообщение, я хочу запустить пакетное задание spring в этом экземпляре. Пожалуйста посоветуй.

@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                              @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                              TaskExecutor taskExecutor,
                                              MessageSource<File> fileReadingMessageSource) {

    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)))
              .transform(Transformers.fileToString())
                     .channel(ApplicationConfiguration.INBOUND_CHANNEL)                 

             .get();
}

person Prerna    schedule 22.01.2018    source источник


Ответы (2)


В Spring Batch есть чистый образец Справочное руководство по этому вопросу:

@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            handle(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}
person Artem Bilan    schedule 22.01.2018
comment
Спасибо за ответ, я получаю сообщение об ошибке. Обработчик метода (MessageHandlerSpec‹?,? extends MessageHandler›) в типе IntegrationFlowDefinition‹IntegrationFlowBuilder› неприменим для аргументов (FileMessageToJobRequest) для handle(fileMessageToJobRequest()). Пожалуйста посоветуй. - person Prerna; 23.01.2018
comment
ХОРОШО. Попробуйте это тогда handle(fileMessageToJobRequest(), "toRequest") - person Artem Bilan; 23.01.2018
comment
Я пробовал обработать (fileMessageToJobRequest(), toRequest), он выдает. Обработчик метода (String, String) в типе IntegrationFlowDefinition‹IntegrationFlowBuilder› неприменим для аргументов (FileMessageToJobRequest, String) - person Prerna; 23.01.2018
comment
А что кидает, подскажите? - person Artem Bilan; 23.01.2018
comment
это бросает Дескриптор метода (String, String) в типе IntegrationFlowDefinition‹IntegrationFlowBuilder› не применим для аргументов (FileMessageToJobRequest, String) - person Prerna; 23.01.2018
comment
Какую версию Spring Integration Java DSL вы используете? - person Artem Bilan; 23.01.2018
comment
‹dependency› ‹groupId›org.springframework.integration‹/groupId› ‹artifactId›spring-integration-java-dsl‹/artifactId› ‹version›1.0.2.RELEASE‹/version› ‹/dependency› - person Prerna; 23.01.2018
comment
Это слишком старо. Попробуйте последнюю версию 1.2.3: github.com/spring-projects/spring-integration -java-dsl/релизы - person Artem Bilan; 23.01.2018
comment
Спасибо, это исправили. Но я получаю следующую ошибку при попытке запустить приложение: - Исключение, обнаруженное во время инициализации контекста - отмена попытки обновления: org.springframework.beans.factory.BeanCreationException: Ошибка создания bean-компонента с именем "inboundFileIntegration", определенным в ресурсе пути к классу [com /porterhead/integration/file/FilePollingIntegrationFlow.class]: не удалось создать экземпляр компонента с помощью фабричного метода; вложенным исключением является org.springframework.beans.BeanInstantiationException: не удалось создать экземпляр [org.springframework.integration.dsl.IntegrationFlow]: есть идеи, почему? - person Prerna; 23.01.2018
comment
@Bean public FileMessageToJobRequest fileMessageToJobRequest() { FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest(); fileMessageToJobRequest.setFileParameterName(input.file.name); // fileMessageToJobRequest.setJob(personJob()); вернуть файлMessageToJobRequest; } , я отладил и увидел, что fileMessageToJobRequest возвращает значение null, что я должен указать в personJob ()? Есть идеи? - person Prerna; 23.01.2018
comment
Точно сказать не могу. Это должно вернуть JobLaunchRequest. См. то же справочное руководство: docs.spring.io/spring-batch/4.0.x/reference/html/ - person Artem Bilan; 23.01.2018
comment
Это исправлено, пожалуйста, дайте мне знать, когда я помещаю, скажем, 5 файлов в каталог, опросчик генерирует 5 сообщений, я хочу, чтобы весеннее пакетное задание запускалось только один раз, а не пять раз, это возможно? - person Prerna; 23.01.2018
comment
??? Задача, которую мы здесь обсуждаем, как раз касается задания на файл. Во всяком случае, это уже звучит как другой вопрос. Не так ли? Если мы исправили это, это здорово. Давайте примем ответ, чтобы помочь другим людям, и перейдем к следующей теме! - person Artem Bilan; 23.01.2018

Ниже приведен мой код: -

@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                              @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                              TaskExecutor taskExecutor,
                                              MessageSource<File> fileReadingMessageSource,
                                              JobLaunchingGateway jobLaunchingGateway) {

    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)))
              .handle(fileMessageToJobRequest(),"toRequest")
                       .handle(jobLaunchingGateway)
                        .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                             .get();
}
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
  //  fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
  //  simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}
person Prerna    schedule 23.01.2018