Параллелизм Java: выполнение многих бесконечных задач с несколькими потоками

Я создаю (параллельный) симулятор для набора N частиц, которые движутся в пространстве в соответствии с законами Ньютона. Моя идея состоит в том, чтобы смоделировать каждую частицу как задачу, которая взаимодействует с другими частицами (задачами), чтобы получить их положение и массу, чтобы рассчитать результирующую силу, которой она подвергается. Каждая частица-задача является чем-то вроде

while(true){
   force = thisParticle.calculateNetForce(allTheParticles);
   thisParticle.waitForAllTheParticlesToCalculateNetForce(); // synchronization
   thisParticle.updatePosition(force);
   thisParticle.waitForAllTheParticlesToUpdateTheirState(); // synchronization
}

У меня может быть много частиц (100 и более), поэтому я не могу создать такое количество потоков Java (которые отображаются на физические потоки). Моя идея состоит в том, чтобы использовать Runtime.getRuntime().availableProcessors()+1 потока, в которых можно выполнять множество задач.

Однако я не могу использовать FixedThreadExecutor, потому что задачи частиц не заканчиваются. Я хотел бы использовать FixedThreadExecutor, который также должен иметь возможность выполнять внутреннее планирование. Вы знаете что-нибудь для этой цели?

Или не могли бы вы предложить мне лучшие подходы для моделирования такой системы с точки зрения параллелизма (например, другую декомпозицию задач)?

P.s.: я ограничен "классическими" механизмами параллелизма, не включая акторов или аналогичные архитектуры.


person metaphori    schedule 05.08.2013    source источник
comment
Если thisParticle.waitForAllTheParticlesToCalculateNetForce(); фактически ожидает чего-то (через фактическое ожидание или CountdownLatch/CyclicBarrier/Phaser и т. д.), поток, в котором выполняется этот метод, будет возвращен в пул и станет доступным для других задач. Не уверен, что понимаю, почему ваш подход FixedThreadPool не сработает.   -  person assylias    schedule 05.08.2013
comment
То, что вы пытаетесь сделать, - это моделирование на основе агента, возможно, стоит добавить тег.   -  person Robadob    schedule 05.08.2013
comment
Вы также можете создать столько потоков, поскольку поток является абстракцией за пределами ядер процессора и работает даже на одноядерных машинах. Однако иметь 100 или более таких тем — плохая идея.   -  person zapl    schedule 05.08.2013
comment
@assylias Такой метод фактически выполняет await () для экземпляра CyclicBarrier. Однако система зависает, так как никакие другие задачи частиц не выполняются. Выполняются только первые PROCESSORS+1 задачи.   -  person metaphori    schedule 05.08.2013
comment
Тогда у вас, вероятно, есть проблема в вашем коде (например, вы держите блокировку, которая не позволяет другим потокам выполняться, когда вы не должны).   -  person assylias    schedule 05.08.2013
comment
@assylias Я так не думаю. Взгляните на: pastebin.com/rpVDZtML   -  person metaphori    schedule 05.08.2013
comment
@RobertoCasadei await - это неблокирующий вызов: поток становится бездействующим до тех пор, пока все стороны не вызовут await(). Поэтому, если вы не удерживаете блокировку при вызове await, другие задачи должны иметь возможность использовать этот незанятый поток. Возможно, стоит опубликовать часть вашего кода.   -  person assylias    schedule 05.08.2013
comment
@RobertoCasadei Для работы вашего опубликованного кода требуется не менее 10 (10 переданных в качестве аргумента конструктора) потоков. await будет ждать/блокировать, пока все стороны не будут ждать. В вашем случае только два потока преодолеют барьер и будут ждать бесконечно. Как только все десять потоков вызовут await, остальная часть метода будет продолжена.   -  person John Vint    schedule 05.08.2013
comment
@JohnVint Это было именно то, что я хотел показать, то есть ожидание на барьере не приводит к приостановке текущей задачи и выполнению другой задачи в освобожденном потоке. Моя проблема в том, что я хочу иметь несколько физических потоков, но много длительных задач; Executor с средствами планирования сделает эту работу.   -  person metaphori    schedule 05.08.2013
comment
@RobertoCasadei Я полагаю, это то, что вы имели в виду, просто хотел уточнить. Посредством «разделяй и властвуй» вилочное соединение — лучший способ заставить Java выполнять эти задачи за вас (на мой взгляд).   -  person John Vint    schedule 05.08.2013


Ответы (7)


Самым большим убийцей производительности, вероятно, будут проверки безопасности потоков, которые вы выполняете, чтобы убедиться, что все частицы взаимодействуют безопасным образом. Я предлагаю вам использовать один поток на ядро ​​и попытаться свести к минимуму взаимодействие между потоками. Это можно сделать, разделив ваше пространство на потоки, например. половина X, половина Y, половина Z делит пространство на 8. Вы можете смотреть на все взаимодействия в каждом пространстве одновременно и независимо, и вам нужно только беспокоиться, когда частица переходит из одного пространства/потока в другое.

person Peter Lawrey    schedule 05.08.2013
comment
Я не думаю, что такое разбиение подходит для гравитационного моделирования. - person Marko Topolnik; 05.08.2013
comment
Я почти уверен, что чистая масса и направление для частицы за пределами области представляют собой сумму масс и местоположений в любом заданном пространстве. Это становится более сложным для частиц внутри области. - person Peter Lawrey; 05.08.2013
comment
не нужно делить частицы по их расположению, лучше сразу разбить их изначально на постоянные наборы одинакового размера, каждый набор обрабатывается одним потоком. Действительно, наибольшая оптимизация состоит в том, чтобы суммировать гравитацию из каждого набора, заменяя каждый набор одной частицей с суммированной массой, расположенной в центре тяжести. - person Alexei Kaigorodov; 23.08.2013
comment
@AlexeiKaigorodov Причина разделения частицы в том, что вы хотите, чтобы у них были столкновения. Если вы этого не сделаете, вы можете разделить их, как вы предлагаете. - person Peter Lawrey; 24.08.2013
comment
Поскольку проверка столкновений обычно требует n ^ 2 вычислений. Разделить пространство на 8 — хорошая идея. Однако, по сути, у вас все еще есть проблема O (n ^ 2). Эта проблема плохо претендует на матричную близость. - person rdllopes; 12.02.2014
comment
@rdllopes Это правда, однако константа намного ниже, что делает ее в 8–80 раз быстрее. (Дополнительное 10-кратное увеличение связано с тем, что не требуется блокировка доступа и работа в кеше L1 вместо общего кеша L3) - person Peter Lawrey; 12.02.2014
comment
@peterlawrey, ваша идея (или использование таких вещей, как BSP) особенно хороша с несколькими объектами (десятки или, может быть, до нескольких сотен). Когда у вас есть огромное количество объектов, я должен использовать ДОПОЛНИТЕЛЬНО некоторые другие методы. Моделирование 10-метровых объектов: youtube.com/watch?v=Qve54Z71VYU - person rdllopes; 12.02.2014

Я бы предположил, что вы храните все свои частицы, возможно, в массиве двумерного массива? Это был бы отличный кандидат для Fork-Join Framework.

Вы бы разделили вычисление частей массива на более мелкие части. Вы продолжаете разделять до определенного размера. Наконец вы вычисляете и возвращаетесь. Затем возвращаемое значение будет объединено и рассчитано с другой стороной дерева.

person John Vint    schedule 05.08.2013

Вместо потока на частицу я бы создал ExecutorService с соответствующим количеством потоков. Я бы сохранил частицы в списке (или в коллекции какого-либо другого типа). Я бы создал отдельные части работы, которые будут выполняться (либо как Runnable, либо как Callable) для каждого шага расчета и обновления частиц. Когда вы отправляете часть работы исполнителю, вы получаете обратно Future. Поместите эти фьючерсы в коллекцию. После того, как вы отправили все части работы, которые хотите выполнять параллельно, вы перебираете свой список фьючерсов и вызываете get() для каждого из них, чтобы реализовать шаги синхронизации.

В конечном итоге вы можете создать небольшой POJO, чтобы связать частицу и ее рассчитанную силу (или спрятать рассчитанную силу в экземпляре частицы).

person Rob    schedule 05.08.2013
comment
Это подход, который я рассматривал. Однако с концептуальной точки зрения он показался мне менее эффективным (поскольку совершенно прямолинейно думать об отображении между частицами и параллельными действиями — определенно не потоками), и я заметил, что его реализация требует создания большого количество объектов и множество вызовов методов. - person metaphori; 05.08.2013
comment
Да, вы должны сбалансировать преимущества контролируемого количества потоков, достаточного параллелизма для загрузки нескольких ядер, накладных расходов на сборку мусора и т. д. По моему опыту, количество вызовов методов является проблемой производительности. Учитывая n-квадратный характер ваших вычислений и тот факт, что накладные расходы на мусор дополнительных объектов растут линейно с количеством частиц, в данном случае это кажется хорошим компромиссом. - person Rob; 05.08.2013

Для каждой частицы вы называете calculateNetForce(allTheParticles), что, как я полагаю, делает ваши вычисления пропорциональными O(N^2) (квадрату числа всех частиц). Это главный убийца производительности, и вам лучше найти алгоритм со сложностью O(N), а уже потом пытаться распараллелить. Навскидку могу предложить сначала вычислить суммарную массу и центр тяжести для всех частиц. Затем для каждой частицы рассчитайте массу и центр остальных частиц. Это можно сделать, взяв общую массу и центр и добавив «дырку» с отрицательной массой вместо текущей частицы. Затем вычислите силу между частицей и остальными. Вычисления для каждой частицы независимы и могут быть распараллелены любым из способов, предложенных другими комментаторами.

person Alexei Kaigorodov    schedule 23.08.2013

Почему бы вам не делать расчеты дискретными шагами?

while(true){


for(Particle p : allParticles){
   force = p.calculateNetForce(allParticles);   
   p.setNextPosition(force); //Remembers, but doesn't change the current position
}

for(Particle p : allParticles){
    p.nextState(); //Change the position
}

}

Сначала вычислите силу для каждой частицы, но не меняйте ее текущее состояние. После того, как вы рассчитали его для каждой частицы, обновите ее внутреннее состояние в соответствии с вашими предыдущими расчетами. Таким образом, даже одного потока будет достаточно, и, конечно, вы можете разделить вычисления на несколько потоков, но вам потребуется дополнительная синхронизация.

ОБНОВЛЕНИЕ ЯВА 8

Используя Java 8, вы можете воспользоваться преимуществами многоядерных систем, не заботясь о потоках, синхронизации и т. д.

 while(true){
       allParticles.parallelStream().forEach(p -> {
           double force = p.calculateNetForce(allParticles);
           p.setNextPosition(force)
       });

       allParticles.parallelStream().forEach(p ->   p.nextState());      
 }
person Svetlin Zarev    schedule 05.08.2013
comment
Действительно, мои проблемы возникают из-за необходимости идентифицировать параллелизм, который можно использовать. - person metaphori; 05.08.2013
comment
Тогда просто создайте N потоков и разделите работу между ними! Поместите все объекты Particle в ConcurrentQueue и для каждого потока: code while(true){ while(!queue.isEmpty()){ 1. получить частицу 2. вычислить силу } 3. обновить состояние //синхронизировать или поменять местами очереди }` - person Svetlin Zarev; 05.08.2013
comment
Вопрос, по сути, в том, как разделить работу с технической/концептуальной точек зрения. - person metaphori; 05.08.2013
comment
Что ж, моя идея состоит в том, чтобы иметь 2 параллельные очереди (CQ) и набор, содержащий все ваши объекты Particle (PO) и N потоков. Сначала первый CQ (CQ1) также содержит все PO. Затем каждый поток начинает принимать PO от CQ1, вычисляет чистую силу и устанавливает следующее состояние (позицию) без изменения текущего состояния PO, а затем помещает PO в CQ2. Когда CQ1 становится пустым, поток, который последним обращался к нему, устанавливает флаг, а затем все потоки stat получают PO от CQ2 и вызывают po.nextState(), а затем отправляют его на CQ1. Когда CQ2 становится пустым, все начинается сначала... - person Svetlin Zarev; 05.08.2013
comment
другим более эффективным подходом будет не перемещать PO из CQ1 в CQ2, а затем из CQ2 в CQ1, а менять местами CQ1 и CQ2, но тогда вам потребуется внешний механизм синхронизации (подойдет простой логический флаг) и механизм выбора состояния для доступа (используя внешний логический флаг) - person Svetlin Zarev; 05.08.2013
comment
@svetlin-zarev, ты прав. Основная проблема заключается в том, как разделить проблему на небольшие задачи. Использование OpenCL, Fork n Join и т. д. не имеет значения, потому что на самом деле вам нужен умный способ разделить проблему. Модель агента или актера - это ужасная идея в этой ситуации. - person rdllopes; 12.02.2014
comment
Прошло много времени с тех пор, как я ответил на этот вопрос. Теперь в Java 8 есть lambdas и stream-API, и очень легко воспользоваться преимуществами многоядерных систем. Например, он может использовать allParticles.paralellSream().forEach(p -> p.calculateNetForce(allTheParticles)), что позволит использовать преимущества всех ЦП, при этом ему не нужно будет вручную заботиться о потоках, синхронизации и т. д. - person Svetlin Zarev; 12.02.2014

Частица сама должна быть Runnable и Callable, это позволит вам не создавать много лишних объектов и синхронизировать разные шаги. Вот SSCCEE:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Particle implements Callable<Void> {

  private enum ParticleState {
    POSITION_UPDATED, FORCE_CALCULATED
  }

  private int id;
  private int calculatedForce;
  private ParticleState particleState = ParticleState.POSITION_UPDATED;
  private List<Particle> allTheParticles;

  public Particle(int id, List<Particle> allTheParticles) {
    this.id = id;
    this.allTheParticles = allTheParticles;
  }

  private void calculateNetForce() {
    System.out.println("calculation in " + id);
    String someIntenseOperation = "";
    for (int i = 0; i < 10000; i++) {
      someIntenseOperation += allTheParticles.size();
    }
    calculatedForce = 0;
    particleState = ParticleState.FORCE_CALCULATED;
  }

  private void updatePosition() {
    System.out.println("updating position of " + id);
    particleState = ParticleState.POSITION_UPDATED;
  }

  @Override
  public Void call() throws Exception {
    switch (particleState) {
      case FORCE_CALCULATED:
        updatePosition();
        break;
      case POSITION_UPDATED:
        calculateNetForce();
        break;
    }
    return null;
  }

  public static void main(String[] args) throws InterruptedException {
    final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    final List<Particle> allTheParticles = new ArrayList<>();
    for (int i = 0; i < 20; i++) {
      allTheParticles.add(new Particle(i, allTheParticles));
    }
    while (true) {
      executor.invokeAll(allTheParticles);
      executor.invokeAll(allTheParticles);
    }
  }
}
person Yurii Shylov    schedule 05.08.2013
comment
Как вы гарантируете, что все частицы синхронизированы относительно их состояния? - person metaphori; 05.08.2013
comment
executor.invokeAll делает это за вас. Попробуйте запустить этот пример. - person Yurii Shylov; 05.08.2013

Поскольку для проверки столкновений обычно требуется n^2 вычислений, разделение пространства является хорошей идеей. хотя, по сути, это будет проблема O (n ^ 2).

Эта проблема плохо подходит для матричного приближения (но загляните в раздел Параллельные вычисления, чтобы узнать о лучших решениях). идеи, чтобы справиться с этим) Вы можете использовать некоторые методы, указанные здесь: способ симулировать много столкновений частиц?

Обратите внимание, что использовать модель Актера не рекомендуется, поскольку потоки будут проблематичными после определенного количество.

Теперь есть Java OpenCL lib (например: Aparapi), а Java 9 должна включать openCL с Sumatra. проект. Таким образом, вы можете использовать библиотеку Fork and Join, а JVM будет использовать OpenCL под капотом.

person rdllopes    schedule 12.02.2014