Очередь потоков Java

У нас есть приложение, которое обрабатывает элементы и на каждой итерации запускает поток для обновления другой базы данных — не так уж важно, что происходит в этом другом потоке, это очень простое обновление.

Наше первоначальное намерение состояло в том, чтобы (используя поток) убедиться, что основная обработка не задерживается, инициализируя соединение с этой другой базой данных и запуская обновление.

Вчера у нас была проблема, из-за которой (по пока неизвестной причине) база данных замедлилась, и количество параллельных потоков взлетело до небес, в результате чего в этой БД было более 1000 подключений. Итак, мы поняли, что нам нужно больше контроля над потоками.

Мне нужна библиотека или инструмент для нашего программного обеспечения, который может:

1) Поместите потоки/задания/задачи (что угодно — мы можем переписать код, если это необходимо, у нас есть объекты Thread) в очередь, такую ​​​​как система 2) мы можем определить, сколько потоков выполняется максимально одновременно 3) После завершения потока поток удаляется из очереди, чтобы сборщик мусора мог удалить все задействованные объекты.

Я много читал и нашел ExecutorService (Executors.newFixedThreadPool(5);), но может проблема в том, что он не работает с 3), потому что согласно javadocs:

Потоки в пуле будут существовать до тех пор, пока он не будет явно отключен.

Что, я полагаю, означает, что если приложение продолжает добавлять потоки, потоки будут существовать до тех пор, пока приложение не будет перезапущено (или если я выключу и повторно создам ExecutorService, но мне это кажется хаком).

Правильно ли я считаю, что Executors.newFixedThreadPool(5) не соответствует моему 3) требованию? Действительно ли я получаю проблему с хорошего конца? Мне нужны потоки или что-то другое?


person Zsolt János    schedule 04.08.2014    source источник
comment
Я думаю, что мог бы легко написать инструмент для своих нужд, я просто чувствую, что это был бы не лучший ход, должно быть что-то, что соответствует нашим потребностям на 100%.   -  person Zsolt János    schedule 04.08.2014
comment
Вы опасно близки к тому, чтобы запросить рекомендации сторонней библиотеки, которая не по теме. Возможно, вы захотите перефразировать свой вопрос до начала закрытого голосования.   -  person Tim B    schedule 04.08.2014
comment
Здесь вам нужно понять разницу между Thread и Runnable/Callable Task.   -  person Kuldeep Jain    schedule 04.08.2014
comment
Я отредактировал свой вопрос, спасибо.   -  person Zsolt János    schedule 04.08.2014
comment
Я управляю проектом с открытым исходным кодом, который управляет пулами/очередями потоков, который может быть ответом, который вы ищете. Естественно, без подробностей я не могу быть уверен, но вы можете взглянуть на эту статью, которую я написал, и посмотреть, будет ли она вам полезна: coopsoft.com/ar/j2searticle.html   -  person edharned    schedule 04.08.2014


Ответы (5)


Здесь вам нужно понять разницу между Thread и Runnable/Callable Task. поэтому значение The threads in the pool will exist until it is explicitly shutdown. заключается в том, что в любой момент времени в пуле потоков будет 5 потоков, если вы используете Executors.newFixedThreadPool(5); . И работа, которую вы хотите, чтобы эти потоки выполняли, будет представлена ​​как Tasks (Runnable/Callable). Таким образом, по существу, в любой момент времени через этот пул потоков будет выполняться 5 потоков, что в вашем случае будет 5 соединениями.

person Kuldeep Jain    schedule 04.08.2014
comment
Спасибо, это было. Я создал новую реализацию Runnable и продолжаю добавлять экземпляры этого класса в ExecutorService. - person Zsolt János; 04.08.2014

Предложение, которого вы боитесь:

The threads in the pool will exist until it is explicitly shutdown

описывает только вызовы Executors.newFixedThreadPool(). Чтобы лучше контролировать поведение пула потоков, используйте конструктор ThreadPoolExecutor явно, например

new ThreadPoolExecutor(1, //minimal Pool Size, 10, // maximal Pool Size, 30, TimeUnit.SECONDS // idle thread dies in 30 seconds new ArrayBlockingQueue<Runnable>())

person Alexei Kaigorodov    schedule 05.08.2014

Потоки останутся там (ожидая выполнения других задач), но не будут хранить все данные, которые вы туда поместили. Когда поток в пуле потоков выполнит задачу, он возьмет на себя следующую задачу и больше не будет ссылаться на существующую задачу.

Так что ваши опасения беспочвенны, если вы явно не держите ссылки на задачи.

person Kayaman    schedule 04.08.2014

Используйте ScheduledExecutorService с фиксированным пулом потоков для любого необходимого количества подключений.

Имейте BlockingQueue, в который вы помещаете запросы, рабочие потоки ждут в очереди и обрабатывают запросы по мере их появления.

person Tim B    schedule 04.08.2014

ExecutorService — это то, что вам нужно . Он предоставляет вам интерфейс (Future) в состояние основного потока, способность обнаруживать исключения и возвращать значение из завершенного потока.

Вот простой пример использования ExecutorSerivce и интерфейса Future.

public class Updater implements Future< boolean > {

    public Updater() { }

    public boolean call() throws Exception {
        System.out.println( "Hello, World!" );
        return true;
    }
}

public class Main {
    public Main() { }

    public static void main( String[] args ) {
        ExecutorService pool = Executors.newFixedThreadPool( 1 );


        boolean again = true;
        do {
            if ( again ) { 
                Future< ? > update = pool.submit( new Updater() );
            }

            /* Do other work while waiting for update to finish */

            if( update.isDone() ) { //may be because of completion or an exception
                try {
                    again = update.get(); // This would block if the Updater was still running
                } catch( ExecutionException ee ) { // This is thrown by get() if an exception occurred in Updater.call()
                    again = false;
                    ee.printStackTrace();
                }
            }
        } while ( true );
    }
}

Приведенный выше пример запустит обновление вашей базы данных, если последнее обновление прошло успешно без каких-либо исключений. Таким образом вы контролируете количество потоков, пытающихся подключиться, и отлавливаете любые ошибки, вызывающие сбой обновления.

person walsht    schedule 04.08.2014
comment
Разве это не будущее, в котором вы продолжаете упоминать интерфейс Callable? - person Zsolt János; 04.08.2014
comment
Future — это отдельный интерфейс для взаимодействия с Callable объектами. - person walsht; 06.08.2014
comment
Я имел в виду, что я должен реализовывать Callable, а не Future, я считаю. - person Zsolt János; 06.08.2014
comment
Да, вместо реализации Runnable вы реализуете Callable‹T›. Вам нужно будет переименовать свой метод run() в call() и заставить его возвращать T, чтобы он работал. - person walsht; 07.08.2014