Отмена запроса производителя-потребителя

public class MainClass {

private static final int size = 5;

private ExecutorService prodExec = Executors.newFixedThreadPool(size);
private ExecutorService consExec = Executors.newFixedThreadPool(size);

//main method here

public void start(String[] args) {

    for (int index = 0; index < size; index++) {
        Runnable producer = new Producer(consExec, listOfIds);
        prodExec.execute(producer);
    }

    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            prodExec.shutdown();
            try {
                prodExec.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException ignore) {
            }

            consExec.shutdown();
            try {
                consExec.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException ignore) {
            }
        }
    });
    }
}
public class Producer implements Runnable {

private ExecutorService consExec;
private List<Long> list;

public Producer(ExecutorService exec, List<Long> list) {
    this.consExec = exec;
    this.list = list;
}

public void run() {
    for (Long id: list) {
        data = get data from db for the id
        consExec.execute(new Consumer(data));
    }
}
}
public class Consumer implements Runnable {

public void run() {
    // call web service
}
}

Я хотел бы обработать сценарий, когда пользователь запрашивает отключение, нажав Ctrl+C. Я думаю, что это можно сделать в хуке выключения. Однако, как и в приведенном выше коде, каждый производитель получает список идентификаторов (может быть, 250?) для обработки, т. е. вызывает базу данных для получения данных для идентификатора и отправки данных в поток-потребитель, который затем обращается к сети. услуга.

Как выйти из цикла for в каждом потоке Producer, если было запрошено отключение, чтобы каждый поток не обрабатывал идентификаторы, которые еще не были обработаны? Мне удалось заставить ShutDownHook работать, но я не уверен, как каждый поток должен включать логику в методе run для выхода из метода run() в случае запроса на завершение работы. Может быть, установив логическую переменную (AtomicBoolean) извне, чтобы каждый поток проверял цикл for перед обработкой каждого идентификатора?

Насколько я понимаю, если я вызываю shutdown(), он выполняет все отправленные задачи, а затем завершает работу. В этом случае невозможно остановить обработку, так как задачи уже поставлены в очередь на сервис-исполнитель.

Если я вызову shutdownNow() вместо shutdown(), это может привести к неожиданным результатам?


person Oxford    schedule 16.08.2011    source источник


Ответы (2)


shutdownNow vs shutdown зависит от того, хотите ли вы, чтобы текущие задачи выполнялись в первую очередь.

Если вы хотите, чтобы они немедленно остановились, вы должны сделать две вещи. Сначала вызовите shutdownNow. Во-вторых, в методе run вы проверяете состояние прерывания потока.

public class Producer implements Runnable {

private ExecutorService consExec;
private List<Long> list;

public Producer(ExecutorService exec, List<Long> list) {
    this.consExec = exec;
    this.list = list;
}

    public void run() {
        for (Long id: list) {
            if(Thread.currentThread().isInterrupted()){
               //the shutdownNow method has been called (or may a future.cancel(true))
            }
            data = get data from db for the id
            consExec.execute(new Consumer(data));
        }
    }
}

Здесь вы можете видеть, что метод run теперь знает, что текущий поток был прерван. Затем этот метод запуска может очистить любые данные и выйти

person John Vint    schedule 16.08.2011
comment
Я попытался добавить оператор sysout(interrupted) в блок if и попытался выполнить shutdownNow(), как вы предложили, но для некоторых потоков операторы не печатались. Например, я породил 1000 потоков, присвоил каждому уникальное имя, а затем вызвал shutdownNow() в ShutDownHook(), нажал Ctrl+C. Прерванный оператор был напечатан только для нескольких потоков, т. е. общее количество потоков, вышедших из run(), и количество прерванных потоков не суммируются с общим количеством потоков, вошедших в run. Могу ли я с уверенностью предположить, что прерывание() будет вызываться для потоков в shutdownNow()? - person Oxford; 17.08.2011
comment
Вероятно, это связано с тем, что из 1000 созданных вами потоков лишь немногие из них выполняли какую-либо работу. Это правильно? Если это так, остальные ожидают в BlockingQueue. Прерывание в этот момент будет обрабатываться ExecutorService. Короткий ответ заключается в том, что разница заключается в том, что другие потоки M (M равно 1000 - N фактически выполняемых задач) не были в методе public void run() - person John Vint; 17.08.2011
comment
И когда я говорю об обработке ExecutorService, я имею в виду, что ExecutorService должным образом закроет остальные потоки. - person John Vint; 17.08.2011

Если вы используете Java SE 6+, у вас есть доступ к классам JMX. . Мы сочли полезным использовать это для «закрытия» запущенных служб.

По сути, вы регистрируете свою службу как службу JMX. И используйте класс драйвера, чтобы запустить его.

В вашем реальном классе обслуживания реализуйте свою логику, которая, по сути, использует бесконечный цикл для проверки условия, чтобы убедиться, что оно истинно (что будет по умолчанию).

Создайте еще один класс драйвера, который подключается к вашей службе JMX и изменяет значение условия цикла на false. Затем, когда вы нажмете условие цикла, оно (изящно) закроется и не будет обрабатывать следующий набор значений.

person Chris Aldrich    schedule 16.08.2011