Как запустить подчиненные приложения Spring Batch, использующие разбиение?

Мы используем разбиение Spring Batch для параллельной обработки нескольких входных файлов на двух JVM. На одной JVM работает один ведущий и один подчиненный, а на другой JVM работает еще один подчиненный.

Запуск ведущего и ведомого устройств на первой JVM выполняется путем запуска приложения загрузки Spring с передачей имени задания, как при запуске любого другого пакетного задания.

Мы запускаем ведомое устройство на второй JVM, запуская загрузочное приложение Spring, передавая имя фиктивного задания. У подчиненного устройства нет конфигурации задания, у него есть только входящий поток для получения сообщения, stepExecutionRequestHandler и код шага.

Результат: Все подчиненные компоненты успешно инициализированы, подчиненный потребитель получает сообщение и запускает stepExecutionRequestHandler, который не может создать соединение с БД без каких-либо ошибок. Если я добавлю конфигурацию задания в ведомое устройство и запущу задание, передавая правильное имя задания, проблема не возникает, что заставляет меня думать, что проблема может быть связана с тем, что не запущено настоящее задание Spring Batch, которое должно инициализировать некоторые необходимые Ресурсы. Я проверил, что bean-компоненты datasourceConfiguration и datasource были инициализированы, что делается как часть отдельного модуля.

Поэтому мне интересно, правильно ли я запускаю рабов или есть лучший способ запустить их.

Вот конфигурация слейва:


  /*
   * Configure inbound flow (requests coming from the master)
   */

  @Bean
  public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
  }

  @Bean
  public StepLocator stepLocator() {
    BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator();
    beanFactoryStepLocator.setBeanFactory(beanFactory);

    return beanFactoryStepLocator;
  }

 @Bean
  @ServiceActivator(inputChannel = "inboundRequests")
  public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
  }

  @Bean
  public MessageChannel inboundRequests() {
    return new DirectChannel();
  }

  @Bean
  public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
                                           @Qualifier("inboundRequests") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
  }

  @Bean
  public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
      new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames(this.requestQueue);
    container.setPrefetchCount(1);

    return container;
  }

combineReleaseJobNormalStep CODE ... 

Вот конфигурация мастера:


  @Bean
  public Job combineReleaseJob() throws Exception {
    return jobBuilderFactory.get("CombineReleaseJob")
      .incrementer(new RunIdIncrementer())
      .listener(resourceLoader)
      .listener(combineReleaseJobJobContextPreparer())
      .flow(combineReleaseJobCL31401())
      .from(combineReleaseJobCL31401()).on("N").to(combineReleaseJobNormalStepManager())
      .from(combineReleaseJobCL31401()).on("R").end()
      .from(combineReleaseJobNormalStepManager()).on("COMPLETED").to(combineReleaseJobAddressTableCheck())
      .from(combineReleaseJobNormalStepManager()).on("FAILED").fail()
      .end().build();
  }

  @Bean
  public Step combineReleaseJobNormalStepManager() throws Exception {
    return stepBuilderFactory.get("combineReleaseJobNormalStep.Manager")
      .partitioner("combineReleaseJobNormalStep",partitioner())
      .partitionHandler(partitionHandler())
      .build();
  }


  @Bean
  public PartitionHandler partitionHandler() throws Exception {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();

    partitionHandler.setStepName("combineReleaseJobNormalStep");
    partitionHandler.setGridSize(GRID_SIZE);
    partitionHandler.setMessagingOperations(messageTemplate());
    //partitionHandler.setPollInterval(5000l);
    partitionHandler.setJobExplorer(this.jobExplorer);

    partitionHandler.afterPropertiesSet();

    return partitionHandler;
  }

  @Bean
  public MessagingTemplate messageTemplate() {
    MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());

    messagingTemplate.setReceiveTimeout(60000000l);

    return messagingTemplate;
  }

  /*
   * Configure outbound flow (requests going to slaves)
   */
  @Bean
  public MessageChannel outboundRequests() {
    return new DirectChannel();
  }

  @Bean
  public IntegrationFlow outboundFlow(AmqpTemplate amqpTemplate) {
    return IntegrationFlows
      .from(outboundRequests())
      .handle(Amqp.outboundAdapter(amqpTemplate).routingKey(this.requestQueue))
      .get();
  }

person Bassel Haddad    schedule 24.02.2020    source источник
comment
We are starting the slave on the second JVM by starting the Spring boot app passing a dummyjob name: зачем нужно запускать раб с фиктивным именем задания? Это признак того, что вам не нужен этот параметр в рабочей части.   -  person Mahmoud Ben Hassine    schedule 25.02.2020
comment
Вы правы, мы использовали одно и то же пакетное приложение, для которого требовалось имя пакетного задания. Когда мы запускаем непакетное приложение, подчиненное устройство работает нормально. см. мой ответ ниже. Спасибо за вашу помощь!   -  person Bassel Haddad    schedule 27.02.2020
comment
Рад, что вы исправили свою проблему! В этом случае примите ответ: stackoverflow.com/help/someone-answers.   -  person Mahmoud Ben Hassine    schedule 27.02.2020


Ответы (2)


Если я добавлю конфигурацию задания в ведомое устройство и запущу задание, передав правильное имя задания, проблема не возникнет, что заставляет меня думать, что проблема может быть связана с тем, что не запущено настоящее задание Spring Batch,

Вам не нужно запускать все задание Spring Batch на рабочей стороне. Задание обычно запускается на главной стороне, и на рабочей стороне требуются только рабочие шаги. См. Удаленное разбиение на разделы" справочной документации.

Как запустить подчиненные приложения Spring Batch, использующие разбиение?

Рабочие процессы могут быть запущены как обычное приложение Spring (загрузочное), где StepExecutionRequestHandler (обычно настроенный как активатор службы Spring Integration) прослушивает входящие StepExecutionRequest и выполняет рабочий шаг (расположенный с StepLocator).

Вы можете найти полный набор примеров в докладе Высокопроизводительная пакетная обработка который я совместно с Майклом представил на SpringOne 2018. Исходный код примеров можно найти здесь: https://github.com/mminella/scaling-demos/tree/sp1-2018

person Mahmoud Ben Hassine    schedule 25.02.2020

Мы обнаружили, что запускаем ведомое устройство как пакетное задание, поскольку изначально оно было добавлено в то же пакетное приложение, что и ведущее. Имя задания было обязательным параметром в пакетном приложении, и у него было много зависимостей от пакетных инструментов и jar-файлов Spring, поэтому каким-то образом оно пыталось запустить пакетное задание, и, поскольку конфигурация задания не существует, оно потерпит неудачу в середине обработки и закрыть все bean-компоненты и ресурсы, что вызвало описанную выше проблему с подключением к БД. Когда мы запускаем ведомое устройство как демон или обычное загрузочное приложение Spring, оно запускается нормально и выполняет шаг до завершения.

Чтобы избежать большого количества переделок и удалить все пакетные зависимости из приложения, чтобы оно работало как обычное загрузочное приложение Spring, мы использовали этот код, чтобы заставить его работать как демон:

@SpringBootApplication
@IntegrationComponentScan
public class Application implements CommandLineRunner {

  public static void main(String[] args) {
    System.exit(SpringApplication.exit(SpringApplication.run(Application.class, args)));
  }

  @Override
  public void run(String... args) throws Exception {
    System.out.println("Joining thread, you can press Ctrl+C to shutdown application");
    Thread.currentThread().join();
  }

}

``

person Bassel Haddad    schedule 27.02.2020