Как определить, когда все задачи в конвейере выполнены в одной службе Java Executor

У меня есть конвейер задач, которые нужно выполнить с файлами, каждый отдельный тип задачи выполняется внутри другой службы-исполнителя. После инициализации каждой службы-исполнителя я запускаю первую задачу, она гарантированно не завершится, пока не закончит обработку всех файлов, поскольку она обрабатывает папку, либо больше не требуется никакой работы, либо она отправляет вызываемую задачу в service2. Итак, когда вызов shutdown () для первой задачи завершился успешно, все файлы теперь будут обрабатываться в задаче 2 или другой задаче, находящейся ниже по конвейеру, и так далее. Когда мы сможем завершить работу последней службы, мы закончим.

Loader loader = Loader.getInstanceOf();
List<ExecutorService> services = new ArrayList<ExecutorService>();
ExecutorService es = Executors.newSingleThreadExecutor();

//Init Services
services.add(es);
services.add(task1.getService());
services.add(task2.getService());
services.add(task3.getService());
services.add(task4.getService());

//Start Loading Files
es.submit(loader);

int count = 0;
for (ExecutorService service : services)
{
    service.shutdown();
    count++;
    //Now wait for all submitted tasks to complete, for upto one day per task
    service.awaitTermination(10, TimeUnit.DAYS);
    MainWindow.logger.severe("Shutdown Task:" + count);
}

public class AnalyserService
{
    protected String threadGroup;
    public AnalyserService(String threadGroup)
    {
        this.threadGroup=threadGroup;
    }

    protected  ExecutorService      executorService;
    protected  CompletionService    completionService;

    protected void initExecutorService()
    {
        int workerSize = Runtime.getRuntime().availableProcessors();
        executorService
                = Executors.newFixedThreadPool(workerSize, new SongKongThreadFactory(threadGroup));
    }

    public ExecutorService getService()
    {
        if (executorService == null || executorService.isShutdown())
        {
            initExecutorService();
        }
        return executorService;
    }
}

Так что все работает нормально За исключением, у меня неправильная логика загрузки процессора. Каждая служба использует пул, равный количеству процессоров, имеющихся на компьютере. Итак, если у компьютера 4 процессора и у нас 5 служб, тогда у нас может быть 20 потоков, которые пытаются работать одновременно, перегружая процессор. Я думаю, что в этом случае у меня должно быть только 4 потока одновременно.

Если я ограничил каждую службу одним потоком, тогда у Id одновременно будет запущено только 5 потоков, но это все равно неправильно, потому что

  1. Больше не будет прав, если будет больше сервисов или больше процессоров
  2. Это неэффективно, так как основная часть работы будет выполняться с помощью задачи 1, если я ограничу ее одним процессором, она будет медленнее, чем необходимо, позже большинство потоков будет выполняться более поздними задачами, а задача 1 не будет иметь ничего сделать.

Я думаю, что мне нужно, чтобы все задачи разделяли одну службу исполнителя и устанавливали ее размер пула, равный количеству cput, которое имеет компьютер. Но тогда как мне узнать, когда услуга завершена?

Я использую Java 7, поэтому есть ли что-нибудь новое в Java 7, что может помочь, в настоящее время просто использую функции параллелизма Java 5


person Paul Taylor    schedule 17.10.2012    source источник
comment
Вы уверены, что ваша теория использования ЦП верна? AFAIK, java не поддерживает это напрямую. JVM в зависимости от количества собственных потоков решает использовать более одного процессора, даже если они доступны.   -  person NiranjanBhat    schedule 17.10.2012
comment
Извините, я не понимаю ваш вопрос. Насколько я понимаю, если мои службы-исполнители настроены так, что общий пул потоков составляет двадцать, и у нас есть только 4 процессора, которые будут работать хуже, чем если бы у нас был общий пул потоков, настроенный на четыре потока?   -  person Paul Taylor    schedule 17.10.2012
comment
Так не работает :) Обратитесь к этому красивому QA по переполнению стека: stackoverflow.com/questions/1223072/. Вы также можете использовать любой профилировщик java, чтобы проверить это.   -  person NiranjanBhat    schedule 17.10.2012
comment
Хорошо, это интересно, и я буду работать над этим. Но если мы упростим мой пример до ситуации с одним процессором, не правда ли, что если у нас есть пул потоков, настроенный на использование двадцати потоков, он будет работать медленнее, чем если бы он был настроен на меньшее число (например, 4), потому что компьютеры ЦП будет вынужден постоянно сокращать время своего ЦП между этими 20 потоками, а не 4 потоками (при условии, что у нас достаточно работы, чтобы все 20 пулов потоков были заняты)   -  person Paul Taylor    schedule 17.10.2012
comment
Не обязательно ... Это зависит от того, какую задачу выполняет ваш поток. В этом случае ваш поток выполняет задачу ввода-вывода и, следовательно, будет медленнее. Таким образом, чем больше потоков в вашей системе, тем больше пользы от этого будет для вашего приложения. Если, скажем, ваши потоки выполняли только операции, связанные с процессором, такие как некоторые вычисления, в этом случае наличие большего количества потоков не принесет вам большой пользы.   -  person NiranjanBhat    schedule 17.10.2012
comment
Некоторые задачи в значительной степени основаны на процессоре, я не знаю, почему вы думаете, что все они основаны на вводе-выводе. Но мой вопрос заключался не в том, ускорит ли процесс настройки дополнительных пулов потоков, а в том, замедлит ли он их.   -  person Paul Taylor    schedule 17.10.2012
comment
Вам придется в основном настраивать систему. Очевидно, наступит переломный момент, при котором увеличивающееся количество потоков, превышение которого не улучшит скорость программного обеспечения или, что еще хуже, может снизить скорость программного обеспечения. Это то, что делается при тестировании программного обеспечения, когда ваше программное обеспечение ориентировано на ЦП и с определенной конфигурацией программного обеспечения, такой как размер кеша, размер пула потоков и т. Д.   -  person NiranjanBhat    schedule 17.10.2012
comment
Это подводит меня к исходному вопросу, мне кажется, что если все в одной службе исполнителя нет преимущества в настройке большего пула потоков, тогда нет доступных ЦП, поэтому я могу это сделать, но меня останавливает то, что у меня все еще нет возможности обнаружение, когда закончено.   -  person Paul Taylor    schedule 17.10.2012
comment
Кажется, я прав stackoverflow.com/questions/12951112/   -  person Paul Taylor    schedule 18.10.2012


Ответы (1)


Суть вашей проблемы: «[...] перегрузка процессора». Если это проблема, просто правильно запланируйте приоритет вашего приложения. Кстати, у вас больше шансов увеличить нагрузку ввода-вывода, чем увеличить нагрузку на процессор; много разных потоков на самом деле хорошо :-)

Однако ваш вопрос: «Но тогда как я собираюсь определить, когда служба завершена?» Очень простой ответ: submit() вместо invokeAll() и проверьте метод isDone() полученного объекта Future.

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ExecutorService.html#submit(java.util.concurrent.Callable)

person parasietje    schedule 17.10.2012
comment
Ваше право. Я думаю, что загрузка ввода-вывода - это проблема, поскольку постоянное переключение задач позволяет запускать все, что находится в очереди, я просто предлагаю одну службу исполнителя как способ правильно планировать вещи. Я расставляю приоритеты просто так, чтобы у нас был конвейер задач, и я хочу воспользоваться преимуществами всех процессоров, чтобы обрабатывать их как можно быстрее, не заставляя машину останавливаться и не заставляя процессор, используемый моей программой, сильно разниться. Я уже использую submit, но проблема в том, что task1 отправляет задачи в task2 (и task3), task2 отправляет задачи в задачу 3 и так далее. Но все это скрыто от основного управляющего класса. - person Paul Taylor; 17.10.2012