java Multithreading - отправка дросселя в ExecutorService

У меня есть файл данных с тысячей строк. Я читаю их и сохраняю в базе данных. Я хочу, чтобы этот процесс выполнялся в несколько потоков партиями, скажем, по 50 строк. Когда я читаю файл, 10 строк отправляются в ExecutorService.

ExecutorService executor = Executors.newFixedThreadPool(5);`

Я могу сделать ниже в цикле while, пока мои строки не закончатся....

 Future<Integer> future = executor.submit(callableObjectThatSaves10RowsAtOneTime);

Но я не хочу читать весь файл в память, если обработка 10 строк занимает время. Я хочу отправить только 5 до тех пор, пока не вернется один из потоков, а затем я отправлю следующий.

Допустим, потоку требуется 20 секунд, чтобы сохранить 10 записей, я не хочу, чтобы ExecutorService заполняли тысячу строк, так как процесс чтения продолжает читать и отправлять в ExecutorService

Каков наилучший способ добиться этого?


person Giovanny    schedule 28.10.2015    source источник
comment
Возможный дубликат stackoverflow.com/questions/1250643/   -  person Cratylus    schedule 29.10.2015
comment
@Cratylus, это определенно не дубликат вопроса, на который вы ссылались. ОП спрашивает, как ограничить количество отправленных задач, чтобы избежать одновременного чтения огромного файла, а не как узнать, когда все задачи выполнены.   -  person CodeBlind    schedule 29.10.2015


Ответы (1)


Вы можете сделать это с помощью LinkedList<Future<?>>, который хранит фьючерсы до тех пор, пока вы не достигнете заранее определенного размера. Вот некоторый скелетный код, который должен помочь вам в этом:

int threads = 5;
ExecutorService service = Executors.newFixedThreadPool(threads);
LinkedList<Future<?>> futures = new LinkedList<>();

//As long as there are rows to save:
while(moreRowsLeft()){
    //dump another callable onto the queue:
    futures.addLast(service.submit(new RowSavingCallable());

    //if the queue is "full", wait for the next one to finish before
    //reading any more of the file:
    while(futures.size() >= 2*threads) futures.removeFirst().get();
}

//All rows have been submitted but some may still be writing to the DB:
for(Future<?> f : futures) future.get();

//All rows have been saved at this point

Вы можете удивиться, почему я допустил, чтобы число фьючерсов в два раза превышало количество потоков на машине — это позволяет потокам службы исполнителя работать над сохранением базы данных, в то время как основной поток создает дополнительную работу. Это может помочь скрыть любые затраты на ввод-вывод, связанные с предоставлением большего количества вызываемых объектов для обработки, пока рабочие потоки заняты записью в базу данных.

person CodeBlind    schedule 28.10.2015
comment
@CodeBlind- Спасибо! У меня есть вопрос. Разве мы не должны начать удаление с первого элемента LinkList. Первый, который был добавлен в LinkedList, будет иметь более высокий шанс быть возвращенным первым после выполнения задания? Можем ли мы подключить ExecutorService.take() к вашему алгоритму и оптимизировать его? - person Giovanny; 29.10.2015
comment
@ Джованни, да, ты прав, опечатка с моей стороны :) Я исправил. Что касается подключения ExecutorCompletionService.take() - мне кажется разумным. Вы можете просто использовать счетчик, чтобы отслеживать, сколько вы отправили, и вызывать take(), когда вы превысили некоторый порог. - person CodeBlind; 29.10.2015
comment
Последний цикл вызовет «get» для фьючерсов, для которых мы уже вызывали «get». Это не будет выполнять какой-либо дополнительный процесс, верно? - person Giovanny; 13.11.2015