Высокопроизводительная буферизация для потока рандов

У меня есть код, который потребляет большое количество (миллионы в настоящее время, в конечном итоге миллиарды) относительно коротких (5-100 элементов) массивов случайных чисел и выполняет с ними не очень напряженную математику. Случайные числа, ну, случайные, в идеале я хотел бы генерировать их на нескольких ядрах, поскольку генерация случайных чисел составляет> 50% моего времени выполнения при профилировании. Однако у меня возникли трудности с распределением большого количества небольших задач таким образом, чтобы это было не медленнее, чем однопоточный подход.

Мой код в настоящее время выглядит примерно так:

for(int i=0;i<1000000;i++){
    for(RealVector d:data){
        while(!converged){
            double[] shortVec = new double[5];
            for(int i=0;i<5;i++) shortVec[i]=rng.nextGaussian();
            double[] longerVec = new double[50];
            for(int i=0;i<50;i++) longerVec[i]=rng.nextGaussian();
            /*Do some relatively fast math*/
        }
    }
}

Подходы, которые я использовал, но которые не сработали:

  • 1+ потоков, заполняющих ArrayBlockingQueue, и мой основной цикл, потребляющий и заполняющий массив (упаковка/распаковка здесь была убийцей)
  • Генерация векторов с помощью Callable (получение будущего) при выполнении независимых частей математики (похоже, накладные расходы на косвенность перевешивают любые преимущества параллелизма, которые я получил)
  • Использование 2 ArrayBlockingQueue, каждая из которых заполняется потоком, один для коротких и один для длинных массивов (все еще примерно в два раза медленнее, чем прямой однопоточный случай).

Я не столько ищу «решения» своей конкретной проблемы, сколько как справиться с общим случаем параллельной генерации больших потоков небольших независимых примитивов и их потребления из одного потока.


person Bryce    schedule 03.08.2012    source источник


Ответы (2)


Это более эффективно, чем использование очереди, потому что;

  • полезная нагрузка представляет собой массив double[], что означает, что фоновый поток может генерировать больше данных, прежде чем передавать их.
  • все объекты перерабатываются.

.

public class RandomGenerator {
    private final ExecutorService generator = Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "generator");
            t.setDaemon(true);
            return t;
        }
    });
    private final Exchanger<double[][]> exchanger = new Exchanger<>();
    private double[][] buffer;
    private int nextRow = Integer.MAX_VALUE;

    public RandomGenerator(final int rows, final int columns) {
        buffer = new double[rows][columns];
        generator.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                Random random = new Random();
                double[][] buffer2 = new double[rows][columns];
                while (!Thread.interrupted()) {
                    for (int r = 0; r < rows; r++)
                        for (int c = 0; c < columns; c++)
                            buffer2[r][c] = random.nextGaussian();
                    buffer2 = exchanger.exchange(buffer2);
                }
                return null;
            }
        });
    }

    public double[] nextArray() throws InterruptedException {
        if (nextRow >= buffer.length) {
            buffer = exchanger.exchange(buffer);
            nextRow = 0;
        }
        return buffer[nextRow++];
    }
}

Random является потокобезопасным и синхронизированным. Это означает, что каждому потоку требуется собственный Random для одновременного выполнения.

как справиться с общим случаем параллельной генерации больших потоков небольших независимых примитивов и их потребления из одного потока.

Я бы использовал Exchanger<double[][]> для заполнения значения в фоновом режиме, так как они эффективно передаются (без больших накладных расходов GC)

person Peter Lawrey    schedule 03.08.2012
comment
@Грей тоже верно. Приближаемся к ответу на вопрос. - person Peter Lawrey; 03.08.2012
comment
Добавлен пример использования Exchanger для генерации случайных чисел пакетами в фоновом режиме. - person Peter Lawrey; 03.08.2012
comment
Комбинация обменника для связи и разделения более длинных потоков рандов значительно повысила производительность. Спасибо. - person Bryce; 05.08.2012

Проблема с вашей производительностью, по-видимому, заключается в том, что отдельные задания слишком малы, поэтому большая часть времени тратится на синхронизацию и постановку в очередь самих заданий. Следует учитывать одну вещь: не генерировать большой поток небольших заданий, а доставлять каждому рабочему потоку набор заданий среднего размера, который он будет комментировать ответом.

Например, вместо повторения вашего цикла с первым потоком, выполняющим итерацию № 0, следующим потоком, выполняющим итерацию № 1, ... я бы сделал первый поток, выполняющий итерации с № 0 до № 999 или что-то в этом роде. Они должны работать независимо и аннотировать класс Job ответом своих вычислений. Затем в конце они могут вернуть всю коллекцию заданий, которые были завершены как Future.

Ваш класс Job может выглядеть примерно так:

public class Job {
    Collection<RealVector> dataCollection;
    Collection<SomeAnswer> answerCollection = new ArrayList<SomeAnswer>();
    public void run() {
        for (RealVector d : dataCollection) {
           // do the magic work on the vector
           while(!converged){
              ...
           }
           // put the associated "answer" in another collection
           answerCollection.add(someAnswer);
        }
    }
}
person Gray    schedule 03.08.2012
comment
Похоже, что разделение по этим линиям позволит избежать некоторых проблем с накладными расходами. Я надеялся не заходить слишком далеко по этому пути, потому что количество векторов, необходимых для внутреннего цикла, на самом деле несколько недетерминировано, а это означает, что должен быть механизм, чтобы задание запрашивало или генерировало еще один кусок ранда, если он заканчивается, и некоторые из предварительно сгенерированных рандов могут быть потрачены впустую. - person Bryce; 03.08.2012
comment
Опять же, цель здесь состоит в том, чтобы свести к минимуму объем синхронизации @Bryce. Может быть, каждый поток мог бы иметь ThreadLocal со своими рандами, чтобы они не тратились впустую? Или просто увеличьте размер фрагмента, чтобы уменьшить потери рандов на задание до такой степени, что это не имеет значения во время выполнения. - person Gray; 03.08.2012
comment
Я должен сделать резервную копию @Gray на этих 5-100 элементах, что безнадежно мало для эффективной связи между потоками. Если вы можете увеличить это значение, скажем, в 1000 раз, ситуация должна улучшиться. - person Martin James; 03.08.2012