ExecutorService invokeall не прерывается

У меня есть следующий код:

public class Application1 {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        List<Callable<Boolean>> callableTasks = new ArrayList<>();
        callableTasks.add(new LogDownloadCallable());
        callableTasks.add(new LogDownloadCallable());
        callableTasks.add(new LogDownloadCallable());
        callableTasks.add(new LogDownloadCallable());
        callableTasks.add(new LogDownloadCallable());

        List<Future<Boolean>> futures =null;
        try {
          futures = executorService.invokeAll(callableTasks, 1090, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
          System.out.println("invokeall inturrupted !!");
        }
        executorService.shutdownNow();
    }
}


class LogDownloadCallable implements Callable<Boolean> {
    public Boolean call() throws Exception {
    try{
      //This for sure takes days to complete, so should through Cancellation exception because    timeout on invokeall set to 1 minute
      long val = 0;
      for (long i = 0; i < Long.MAX_VALUE - 5000; i++) {
        val += i;
      }
      System.out.println("complete");
    }catch(Exception e){
      System.out.println("Exception ! " +e.toString()+ Thread.currentThread().getId());
      return false;
    }
    return true;
  }
}

Я надеялся получить "java.lang.InterruptedException" после тайм-аута 1090 мс. Но этого не происходит. Может кто-нибудь помочь мне понять, почему? Если я помещу Thread.sleep(2000); в блок try в public Boolean call() throws Exception { перед циклом for, то я получу InterruptedException. Это поведение странно. PS: это просто фиктивный пример, который я придумал, чтобы продемонстрировать свою проблему.


person voidMainReturn    schedule 12.02.2018    source источник
comment
Вы получаете java.lang.InterruptedException, когда код блокируется при вводе-выводе/блокировке, спящем режиме и т. д. В других случаях вам нужно проверить, не прерван ли текущий поток, используя Thread.currentThread().isInterrupted()   -  person Venkata Raju    schedule 12.02.2018
comment
Тогда какой смысл иметь тайм-аут в invokeAll ?   -  person voidMainReturn    schedule 12.02.2018


Ответы (3)


Отмена происходит путем отправки прерывания потоку, выполняющему задачу. Однако, как всегда, отмена через прерывание является совместным усилием. Прерванная задача должна периодически проверять флаг прерывания потока, в котором она выполняется, а затем прекращать выполнение при первой же возможности. Распространенным заблуждением является мнение, что прерывание потока автоматически приведет к выдаче InterruptedException в выполняющемся в нем коде.

Если задача вызывает методы блокировки (вы можете распознать их, поскольку объявлено, что они вызывают InterruptedException), она будет вынуждена обрабатывать прерывание потока, обрабатывая исключение. Thread.sleep() является таким методом, поэтому вы видите работу тайм-аута, когда добавляете его в свой код.

Если задача не вызывает какие-либо блокирующие методы, например, когда вы не вызываете Thread.sleep() в своем коде, задача может узнать, что она прервана, только путем проверки самого флага прерывания потока. . Для кода, работающего в бесконечном (или длинном) цикле, это обычно делается один раз за итерацию.

Обратите внимание, что некоторые методы блокируют код, работающий в потоке, но также не реагируют на прерывание (например, блокируют ввод-вывод). Они не будут бросать InterruptedException и будут с радостью продолжать делать то, что они делают. Сделать такие задачи более чувствительными к отмене немного сложнее, и это может быть разным в каждом конкретном случае.

Остановка при первой же возможности может означать множество вещей. Для некоторых задач это может означать немедленную остановку. Для другого кода это может означать очистку флага прерывания, выполнение до завершения, затем повторную установку флага прерывания и возврат результата.

Для вашего примера кода это, очевидно, будет прежний вариант.

public Boolean call() throws Exception {
    long val = 0;
    for (long i = 0; i < Long.MAX_VALUE - 5000; i++) {
        if (Thread.currentThread().isInterrupted()) { // explicit check for cancellation
          System.out.println("Exception ! " +e.toString()+ Thread.currentThread().getId());
          return false;
        }
        val += i;
    }
    System.out.println("complete");
    return true;
}
person bowmore    schedule 13.02.2018

В двух словах нельзя прерывать задачу, которая не кинула java.lang.InterruptedException. Если бы у вас был, например, этот код

class LogDownloadCallable implements Callable<Boolean> {
public Boolean call() throws Exception {
    try {
        long val = 0;
        for (long i = 0; i < Long.MAX_VALUE - 5000; i++) {
            val += i;
            Thread.sleep(1); // throws java.lang.InterruptedException
        }
        System.out.println("complete");
    } catch (Exception e) {
        System.out.println("Exception ! " + e.toString() + Thread.currentThread().getId());
        return false;
    }
    return true;
}

}

ваше приложение будет работать так, как вы хотите. В вашем случае я рекомендую вам использовать Daemon потоков, которые можно немедленно остановить. Как настроить ExecutorService с Daemon потоками смотреть здесь. Более подробная информация о прерывании в Java доступна здесь.

person Lev Khruschev    schedule 12.02.2018

В этой ситуации мне помогло создание потоков с помощью класса ThreadFactoryBuilder в Guava.

ExecutorService executorService = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder().setDaemon(true).build());

Потоки, которые создаются, являются демонами, и они уничтожаются после тайм-аута. В случае прерванных потоков я получаю CancellationException, когда делаю future.get().

person voidMainReturn    schedule 12.02.2018