Я пытаюсь безопасно передать параметры от тасклета к шагу в той же работе.
Моя работа состоит из 3 тасклетов (шаг 1, шаг 2, шаг 3) один за другим и, в конце концов, шаг 4 (процессор, считыватель, писатель)
эта работа выполняется много раз параллельно.
На шаге 1 внутри тасклета я оцениваю param (hashId) через веб-службу), чем я передаю его по всей своей цепочке до моего читателя (который на шаге 4)
На шаге 3 я создаю новый параметр с именем: filePath, основанный на hashid, и отправляю его на шаг 4 (читатель) в качестве местоположения файлового ресурса.
Я использую stepExecution для передачи этого параметра (hashId и filePath).
Я попробовал 3 способа сделать это с помощью тасклета:
чтобы передать параметр (hashId с шага 1 на шаг 2 и с шага 2 на шаг 3), я делаю следующее:
chunkContext.getStepContext()
.getStepExecution()
.getExecutionContext()
.put("hashId", hashId);
На шаге 4 я заполняю filePath на основе hashId и передаю его таким образом моему последнему шагу (который является процессором чтения и писателем)
public class DownloadFileTasklet implements Tasklet, StepExecutionListener {
..
@Override
public RepeatStatus execute(ChunkContext chunkContext, ExecutionContext
executionContext) throws IOException {
String hashId = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().get("hashId");
...
filepath="...hashId.csv";
//I used here executionContextPromotionListener in order to promote those keys
chunkContext.getStepContext()
.getStepExecution()
.getExecutionContext()
.put("filePath", filePath);
}
logger.info("filePath + "for hashId=" + hashId);
}
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
Обратите внимание, что я печатаю значения hashId и filePath прямо перед тем, как закончить этот шаг (шаг 3). по журналам они согласованы и заполнены, как ожидалось
Я также добавил журналы в свой ридер, чтобы увидеть в журнале полученные параметры.
@Bean
@StepScope
public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) {
logger.info("test filePath="+filePath+");
return itemReader;
}
Когда я выполняю это задание ~ 10 раз, я вижу, что значение param filePath заполняется другими значениями filePath заданий при параллельном выполнении.
Вот как я продвигаю ключи задания с помощью executionContextPromotionListener:
определение должности:
@Bean
public Job processFileJob() throws Exception {
return this.jobs.get("processFileJob").
start.(step1).
next(step2)
next(downloadFileTaskletStep()). //step3
next(processSnidFileStep()).build(); //step4
}
шаг 3 определение
public Step downloadFileTaskletStep() {
return this.steps.get("downloadFileTaskletStep").tasklet(downloadFileTasklet()).listener(executionContextPromotionListener()).build();
}
@Bean
public org.springframework.batch.core.listener.ExecutionContextPromotionListener executionContextPromotionListener() {
ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
executionContextPromotionListener.setKeys(new String[]{"filePath"});
return executionContextPromotionListener;
}
Те же потоки результатов испортили параметры
Я могу отслеживать результаты с помощью таблицы базы данных Spring Batch: batch_job_execution_context.short_context:
здесь вы можете увидеть, что filePatch, созданный с помощью hashid, не идентичен исходному hashId // неправильная запись ///
{"map": [{"entry": [{"string": "totalRecords", "int": 5}, {"string": "segmentId", "long": 13}, {"string": [ filePath, / etc / mydir / services / notification_processor / files / 2015_04_22 / f1c7b0f2180b7e266d36f87fcf6fb7aa .csv "]}, {" string ": [" hashId "," 20df39d201fffc744f4423c > "]}]}]}
Теперь, если мы проверим другие записи, они кажутся хорошими. но всегда один или два напортачили
// правильные записи
{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**c490c8282628b894727fc2a4d6fc0cb5**.csv"]},{"string":["hashId","**c490c8282628b894727fc2a4d6fc0cb5**"]}]}]}
{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**2b21d3047208729192b87e90e4a868e4**.csv"]},{"string":["hashId","**2b21d3047208729192b87e90e4a868e4**"]}]}]}
Есть идеи, почему у меня проблемы с потоками?
@Bean @Scope("prototype")
- person Palcente   schedule 21.04.2015filePath
в качествеjobParameter
s? таким образом, параметр filePath должен быть безопасно распределен по шагам - person Luca Basso Ricci   schedule 22.04.2015