Фиксированный пул потоков ThreadPoolExecutor с настраиваемым поведением

я новичок в этой теме... я использую ThreadPoolExecutor, созданный с помощью Executors.newFixedThreadPool(10), и после заполнения пула я начинаю получать RejectedExecutionException. Есть ли способ «заставить» исполнителя перевести новую задачу в состояние «ожидания» вместо того, чтобы отклонять ее и запускать ее при освобождении пула?

Спасибо

Проблема с этим https://github.com/evilsocket/dsploit/issues/159

Задействована строка кода https://github.com/evilsocket/dsploit/blob/master/src/it/evilsocket/dsploit/net/NetworkDiscovery.java#L150


person Simone Margaritelli    schedule 08.11.2012    source источник
comment
Удалось ли вам найти решение этой проблемы?   -  person g00glechr0me    schedule 16.03.2017


Ответы (4)


Если вы используете Executors.newFixedThreadPool(10);, он ставит задачи в очередь и ждет, пока поток не будет готов.

Этот метод

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Как видите, используемая очередь не ограничена (что само по себе может быть проблемой), но это означает, что очередь никогда не будет заполнена, и вы никогда не получите отказ.

Кстати: если у вас есть задачи, связанные с процессором, оптимальное количество потоков может быть

int processors = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(processors);

Тестовый класс, который может проиллюстрировать ситуацию

public static void main(String... args) {
    ExecutorService es = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 1000 * 1000; i++)
        es.submit(new SleepOneSecond());

    System.out.println("Queue length " + ((ThreadPoolExecutor) es).getQueue().size());
    es.shutdown();
    System.out.println("After shutdown");
    try {
        es.submit(new SleepOneSecond());
    } catch (Exception e) {
        e.printStackTrace(System.out);
    }
}

static class SleepOneSecond implements Callable<Void> {
    @Override
    public Void call() throws Exception {
        Thread.sleep(1000);
        return null;
    }
}

отпечатки

Queue length 999998
After shutdown
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@e026161 rejected from java.util.concurrent.ThreadPoolExecutor@3e472e76[Shutting down, pool size = 2, active threads = 2, queued tasks = 999998, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2013)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:816)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1337)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:132)
    at Main.main(Main.java:17)
person Peter Lawrey    schedule 08.11.2012
comment
тем не менее, я получаю исключение RejectedExecutionException, как вы можете видеть здесь github.com/evilsocket/dsploit/issues /159 - person Simone Margaritelli; 08.11.2012
comment
@SimoneMargaritelli единственная причина, по которой вы должны получить RejectedExecutionException при использовании newFixedThreadPool, заключается в том, что вы пытаетесь отправить задачи исполнителю после вызова его метода shutdown. - person Ian Roberts; 08.11.2012
comment
Учитывая, что в пуле выполнялась только 31 задача, я подозреваю, что он не заполнен. Кроме того, corePoolSize равен 0, что означает, что он был отключен, что вызовет отклонение. - person Peter Lawrey; 08.11.2012
comment
@SimoneMargaritelli Кстати, вам не нужно приводить к ThreadPoolExecutor, я бы оставил его как интерфейс ExecutorService. - person Peter Lawrey; 08.11.2012

Очень возможно, что поток вызывает exit, что устанавливает mStopped в false и останавливает исполнителя, но:

  • ваш запущенный поток может находиться в середине цикла while (!mStopped) и пытается отправить задачу исполнителю, который был остановлен exit
  • условие в while возвращает true, потому что изменение, внесенное в mStopped, не видно (вы не используете какую-либо форму синхронизации вокруг этого флага).

Я бы предложил:

  • сделать mStopped изменчивым
  • обрабатывать случай, когда исполнитель выключается, пока вы находитесь в середине цикла (например, перехватив RejectedExecutionException или, возможно, лучше: выключите исполнителя после цикла while вместо его закрытия в методе выхода).
person assylias    schedule 08.11.2012

Основываясь на более ранних предложениях, вы можете использовать блокирующую очередь для создания фиксированного размера ThreadPoolExecutor. Если вы затем предоставите свой собственный RejectedExecutionHandler, который добавляет задачи в очередь блокировки, он будет вести себя, как описано.

Вот пример того, как вы можете создать такой исполнитель:

int corePoolSize    = 10;
int maximumPoolSize = 10;
int keepAliveTime   =  0;
int maxWaitingTasks = 10;

ThreadPoolExecutor blockingThreadPoolExecutor = new ThreadPoolExecutor(
        corePoolSize, maximumPoolSize,
        keepAliveTime, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(maxWaitingTasks),
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    throw new RuntimeException("Interrupted while submitting task", e);
                }
            }
        });
person andreer    schedule 25.09.2013

Если я правильно понимаю, у вас есть ThreadPool, созданный с фиксированным количеством потоков, но у вас может быть больше задач для отправки в пул потоков. Я бы рассчитал keepAliveTime на основе запроса и установил его динамически. Таким образом, у вас не будет RejectedExecutionException.

Например

long keepAliveTime = ((applications.size() * 60) / FIXED_NUM_OF_THREADS) * 1000; threadPoolExecutor.setKeepAliveTime (keepAliveTime, TimeUnit.MILLISECONDS);

где приложение — это набор задач, которые каждый раз могут быть разными.

Это должно решить вашу проблему, если вы знаете среднее время выполнения задачи.

person Paresh Dhabalia    schedule 12.04.2016