Quasar Fiber эквивалент Java ThreadPoolExecutor?

Мне было любопытно узнать о Quasar и его легких волокнах в качестве замены потоков. Изучив их документацию по API, я не смог понять, как перейти к преобразованию типичного ThreadPoolExecutor в пул волокон.

int maxThreadPoolSize = 10;

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        maxThreadPoolSize,
        maxThreadPoolSize,
        10, TimeUnit.MINUTES,
        new ArrayBlockingQueue<Runnable>(maxThreadPoolSize),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy()
);

for (int i = 0; i < 100; i++) {
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // run some code
        }
    });
}

Приведенный выше код создает пул с 10 потоками, очередь перед пулом, которая может содержать 10 элементов, и политику отклонения (когда очередь заполнена), чтобы основной поток выполнял сам задачу Runnable. Поскольку цикл for создает 100 исполняемых объектов, они будут выполняться по 10 за раз в пуле, 10 ставятся в очередь, а основной поток сам выбирает исполняемый объект до тех пор, пока другие не будут завершены, после чего основной поток возвращается к добавлению исполняемых модулей в исполняющую программу.

Как бы вы сделали это с помощью Quasar's Fibers? Он предназначен для использования в качестве такового в первую очередь?


РЕДАКТИРОВАТЬ: мой первоначальный вопрос был плохо сформулирован. По сути, я пытался найти механизм ограничения количества одновременно работающих волокон. Например, не запускайте больше волокон, если уже запущено 200 волокон. Если запущено максимальное количество волокон, дождитесь окончания работы одного из них, прежде чем запускать новый.


person Vlad Poskatcheev    schedule 08.10.2016    source источник


Ответы (4)


Файберы очень дешевы, поэтому вам вообще не нужен пул (и его асинхронная модель диспетчеризации заданий): просто запустите файбер и дайте ему выполнять обычный последовательный код каждый раз, когда вам нужно запустить новый последовательный процесс одновременно с другими.

person circlespainter    schedule 09.10.2016
comment
Я развлекаюсь с пулом Fibre, потому что с их введением узкое место моего приложения может сместиться с # потоков на память/процессор. Например, если пул из 100 потоков обрабатывает события из очереди, то верхняя граница устанавливается для памяти/процессора. Вы никогда не можете обрабатывать более 100 событий одновременно. При переключении на волокна я хотел бы иметь возможность применять аналогичную верхнюю границу. Какой-то механизм для ограничения скорости сервера и никогда не запускает более 2000 волокон. - person Vlad Poskatcheev; 11.10.2016
comment
Я понял, что моя первоначальная терминология плоха и вводит в заблуждение. Пул волокон не имеет смысла (так как их так дешево запускать). Мне нужен существующий (чтобы не изобретать колесо) механизм ограничения скорости моего серверного приложения, чтобы он никогда не запускал одновременно более X волокон. - person Vlad Poskatcheev; 11.10.2016

Каждое волокно, запланированное FiberScheduler, когда вы создаете волокно без планировщика, будет создано FiberForkJoinScheduler и назначено этому волокну.

Короче говоря, если вы хотите управлять своими волокнами в пуле потоков, используйте FiberExecutorScheduler: Документ Quasar о планировании волокон
Ваш код может быть таким

    int maxThreadPoolSize = 10;
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            maxThreadPoolSize,
            maxThreadPoolSize,
            10, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(maxThreadPoolSize),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    FiberExecutorScheduler scheduler = new FiberExecutorScheduler("FibersInAPool", executor);
    for (int i = 0; i < 100; i++) {
        Fiber fiber = new Fiber<Void>(scheduler
                , new SuspendableCallable<Void>() {
            @Override
            public Void run() throws SuspendExecution, InterruptedException {
                // run some code
                return null;
            }
        });
        fiber.start();
    }
person tiboo    schedule 30.11.2018
comment
Возможно, я что-то упускаю, но это, похоже, не решает проблему. А именно, как реализовать механизм ограничения количества одновременно работающих волокон. В приведенном выше коде используется ThreadPoolExecutor, что кажется озадачивающим, потому что именно его пытаются заменить и избежать. Идея состоит в том, чтобы максимально отказаться от использования потоков и вместо этого использовать волокна. - person Vlad Poskatcheev; 01.12.2018
comment
afaik, у quasar есть монитор в планировщике (у него есть защищенный модификатор), вы пытались расширить планировщики для доступа к их мониторам? github. com/puniverse/quasar/blob/master/quasar-core/src/main/ - person tiboo; 03.12.2018

java.util.concurrent.Semaphore хорошо работал в моей конкретной настройке.

Общий смысл моего решения:

  • создать семафор с желаемым максимальным количеством разрешений (он же максимальное количество одновременных волокон)
  • основной поток отвечает за сбор задач для обработки из очереди
  • main thread calls semaphore.acquire():
    • if a permit is available, then launch new Fiber to process task
    • если все разрешения приняты, то семафор заблокирует основной поток и будет ждать, пока разрешение не станет доступным
  • после запуска Fiber основной поток повторяет свою логику. Берет новую задачу из очереди и пытается запустить новый Fiber.

Бонус: стандартный семафор Java исправлен, и количество разрешений не может быть изменено динамически. Чтобы сделать его динамичным, пригодилась эта ссылка: http://blog.teamlazerbeez.com/2009/04/20/javas-semaphore-resizing/

person Vlad Poskatcheev    schedule 08.12.2016

мы только что сделали предварительный выпуск kilim 2.0. он обеспечивает реализацию волокна и актора (аналогично quasar) и поддерживается ThreadPoolExecutor.

наиболее эффективным способом ограничения количества одновременных задач было бы использование одной задачи в качестве контроллера и прослушивание почтового ящика (я думаю, квазар вызывает эти каналы) и ведение подсчета запущенных задач. когда каждая задача завершится, отправьте сообщение в почтовый ящик

как правило, нет смысла использовать больше потоков, чем ядер.

person nqzero    schedule 09.12.2016