Как безопасно передавать параметры из тасклета в шаг при выполнении параллельных заданий

Я пытаюсь безопасно передать параметры от тасклета к шагу в той же работе.

Моя работа состоит из 3 тасклетов (шаг 1, шаг 2, шаг 3) один за другим и, в конце концов, шаг 4 (процессор, считыватель, писатель)

эта работа выполняется много раз параллельно.

На шаге 1 внутри тасклета я оцениваю param (hashId) через веб-службу), чем я передаю его по всей своей цепочке до моего читателя (который на шаге 4)

На шаге 3 я создаю новый параметр с именем: filePath, основанный на hashid, и отправляю его на шаг 4 (читатель) в качестве местоположения файлового ресурса.

Я использую stepExecution для передачи этого параметра (hashId и filePath).

Я попробовал 3 способа сделать это с помощью тасклета:

чтобы передать параметр (hashId с шага 1 на шаг 2 и с шага 2 на шаг 3), я делаю следующее:

chunkContext.getStepContext()
        .getStepExecution()
        .getExecutionContext()
        .put("hashId", hashId);

На шаге 4 я заполняю filePath на основе hashId и передаю его таким образом моему последнему шагу (который является процессором чтения и писателем)

public class DownloadFileTasklet implements Tasklet, StepExecutionListener {
..

    @Override
     public RepeatStatus execute(ChunkContext chunkContext, ExecutionContext    
     executionContext) throws IOException {

    String hashId = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().get("hashId");

          ...

filepath="...hashId.csv";
//I used here executionContextPromotionListener in order to promote those keys

        chunkContext.getStepContext()
        .getStepExecution()
        .getExecutionContext()
        .put("filePath", filePath);
    } 

logger.info("filePath + "for hashId=" + hashId);

}
@Override
public void beforeStep(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}

Обратите внимание, что я печатаю значения hashId и filePath прямо перед тем, как закончить этот шаг (шаг 3). по журналам они согласованы и заполнены, как ожидалось

Я также добавил журналы в свой ридер, чтобы увидеть в журнале полученные параметры.

@Bean
    @StepScope
    public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) {
              logger.info("test filePath="+filePath+");

        return itemReader;
    }

Когда я выполняю это задание ~ 10 раз, я вижу, что значение param filePath заполняется другими значениями filePath заданий при параллельном выполнении.

Вот как я продвигаю ключи задания с помощью executionContextPromotionListener:

определение должности:

 @Bean
    public Job processFileJob() throws Exception {
        return this.jobs.get("processFileJob").
                start.(step1).
                next(step2)
                next(downloadFileTaskletStep()). //step3
                next(processSnidFileStep()).build();  //step4

    }

шаг 3 определение

  public Step downloadFileTaskletStep() {
        return this.steps.get("downloadFileTaskletStep").tasklet(downloadFileTasklet()).listener(executionContextPromotionListener()).build();
    }


  @Bean
    public org.springframework.batch.core.listener.ExecutionContextPromotionListener executionContextPromotionListener() {
        ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
        executionContextPromotionListener.setKeys(new String[]{"filePath"});
        return executionContextPromotionListener;
    }

Те же потоки результатов испортили параметры

Я могу отслеживать результаты с помощью таблицы базы данных Spring Batch: batch_job_execution_context.short_context:

здесь вы можете увидеть, что filePatch, созданный с помощью hashid, не идентичен исходному hashId // неправильная запись ///

{"map": [{"entry": [{"string": "totalRecords", "int": 5}, {"string": "segmentId", "long": 13}, {"string": [ filePath, / etc / mydir / services / notification_processor / files / 2015_04_22 / f1c7b0f2180b7e266d36f87fcf6fb7aa .csv "]}, {" string ": [" hashId "," 20df39d201fffc744f4423c > "]}]}]}

Теперь, если мы проверим другие записи, они кажутся хорошими. но всегда один или два напортачили

// правильные записи

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**c490c8282628b894727fc2a4d6fc0cb5**.csv"]},{"string":["hashId","**c490c8282628b894727fc2a4d6fc0cb5**"]}]}]}

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**2b21d3047208729192b87e90e4a868e4**.csv"]},{"string":["hashId","**2b21d3047208729192b87e90e4a868e4**"]}]}]}   

Есть идеи, почему у меня проблемы с потоками?


person rayman    schedule 21.04.2015    source источник
comment
Как именно вы одновременно выполняете свою работу? Каждое задание будет иметь отдельный контекст, поэтому параметры будут уникальными для каждого выполнения задания.   -  person Palcente    schedule 21.04.2015
comment
Я просто выполняю одно и то же задание по несколько раз одновременно через jobLauncher. Начальные параметры безопасны. но я создаю новый параметр (filePath) внутри тасклета, и я должен отправить его моему читателю, который на следующем шаге. и это значение несовместимо, когда я выполняю много заданий параллельно. как не потокобезопасный   -  person rayman    schedule 21.04.2015
comment
просто из любопытства попробуй @Bean @Scope("prototype")   -  person Palcente    schedule 21.04.2015
comment
не удается найти символ @Scope (прототип). кстати: используя Spring-boot   -  person rayman    schedule 21.04.2015
comment
Внутри тасклета я на самом деле печатаю правильные параметры, чтобы убедиться. но когда я печатаю внутри шага читателя (при выполнении многих заданий в параллельном режиме), в результатах вы можете видеть, что читатель получил значение filePath дважды, что означает, что параметр испортился с другим потоком   -  person rayman    schedule 21.04.2015
comment
да, потому что это синглтон, а не прототип, и он используется повторно, как насчет добавления слушателя шагов, чтобы установить это значение перед шагом   -  person Palcente    schedule 21.04.2015
comment
Я тоже пробовал это. Я продвинул ключи (filePath), а затем использовал @beforestep. Опять же, я вижу, что параметры передаются, но при работе в параллельном режиме он перепутался с другими потоками (возможно)   -  person rayman    schedule 21.04.2015
comment
Отредактировал свой вопрос с помощью impl   -  person rayman    schedule 21.04.2015
comment
Можете ли вы попробовать обойти проблему, используя filePath в качестве jobParameters? таким образом, параметр filePath должен быть безопасно распределен по шагам   -  person Luca Basso Ricci    schedule 22.04.2015
comment
Это было бы здорово, но как передать параметр из Tasklet в Step с помощью jobParameters? Я пробовал. так и не удалось. не могли бы вы ответить на этот вопрос примером, как это сделать?   -  person rayman    schedule 22.04.2015
comment
как Майкл Минелла, они неизменны, вы не можете установить их во время работы. и я устанавливаю это значение только во время моей цепочки должностей. Я не могу знать это с самого начала, я могу разделить задание на два, а затем установить его во второй части .. но это плохой дизайн   -  person rayman    schedule 22.04.2015


Ответы (1)


Чтобы просмотреть использованные вами методы:

  • Метод 1. Редактирование JobParameters JobParameters неизменяемо в задании, поэтому попытки изменить их во время выполнения задания предприниматься не должны.
  • Метод 2 - Редактирование JobParameters v2 Метод 2 действительно такой же, как метод 1, вы только собираетесь получить ссылку на JobParameters другим способом.
  • Метод 3 - Использование ExecutionContextPromotionListener. Это правильный способ, но вы делаете что-то неправильно. ExecutionContextPromotionListener просматривает шаг ExecutionContext и копирует указанные вами ключи в ExecutionContext задания. Вы добавляете ключи непосредственно в контекст выполнения задания, что является плохой идеей.

Короче говоря, метод 3 наиболее близок к правильному, но вы должны добавить свойства, которыми хотите поделиться, в ExecutionContext шага, а затем настроить ExecutionContextPromotionListener для продвижения соответствующих ключей в ExecutionContext задания.

Код будет обновлен следующим образом:

chunkContext.getStepContext()
            .getStepExecution()
            .getExecutionContext()
            .put("filePath", filePath);
person Michael Minella    schedule 21.04.2015
comment
@ Спасибо, что поделились со мной! только одно уточнение: вы должны добавить свойства, которыми хотите поделиться, в ExecutionContext шага. Не уверен, что я понял, как реализовать ваше предложение. Я настраиваю ExecutionContextPromotionListener с ключами и добавляю его в качестве слушателя в downloadFileTaskletStep. Как бы вы изменили эту реализацию? Спасибо!! - person rayman; 21.04.2015
comment
Поэтому я должен использовать chunkContext, а не stepExecution, который я инициализировал на beforeStep. - person rayman; 21.04.2015
comment
Привет, Майкл, я попробовал. Я выполнил эти задания сразу 14 раз и вижу, что одна запись противоречива. Это означает, что внутри тасклета я печатаю hashid = x, и когда я перехожу к шагу (в рамках того же задания), я печатаю hashid = y (выполнения другого задания). Я могу отследить это через таблицу базы данных: batch_job_execution_context и coulmn short_context - person rayman; 22.04.2015
comment
Привет, Майкл, интересно, смотрели ли вы на мой отредактированный вопрос. Я предоставил более подробную информацию. Эта ошибка действительно останавливает наше производственное развертывание. любые идеи, если это проблема в рамках, или я что-то пропустил? Спасибо. - person rayman; 24.04.2015
comment
У вас есть проект или содержание github, иллюстрирующее проблему, на которую я могу обратить внимание? - person Michael Minella; 24.04.2015
comment
Я добавил вам суть с соответствующими файлами. дайте мне знать, если требуются дополнительные пояснения: gist.github.com/IdanFridman/f35249dd4299518ffb93 - person rayman; 26.04.2015
comment
Я надеялся на исполняемую суть, чтобы я мог протестировать. Однако я заметил одну вещь: на ваших ступенях нет @Bean. Это означает, что для ваших шагов нет определений bean-компонентов. Учитывая, что Spring Batch использует несколько BeanPostProcessors для обработки определенных функций, я бы не рекомендовал этот подход. - person Michael Minella; 27.04.2015