Оптимизация общего буфера в многопоточной среде производитель/потребитель

У меня есть проект, в котором у меня есть один поток-производитель, который записывает события в буфер, и дополнительный одиночный поток-потребитель, который берет события из буфера. Моя цель — оптимизировать эту штуку для одного двухъядерного компьютера, чтобы добиться максимальной пропускной способности.

В настоящее время я использую простой кольцевой буфер без блокировки (без блокировки возможен, поскольку у меня есть только один поток потребителя и один поток производителя, и поэтому указатели обновляются только одним потоком).

#define BUF_SIZE 32768

struct buf_t { volatile int writepos; volatile void * buffer[BUF_SIZE]; 
    volatile int readpos;) };

void produce (buf_t *b, void * e) {
    int next = (b->writepos+1) % BUF_SIZE;
    while (b->readpos == next); // queue is full. wait
    b->buffer[b->writepos] = e; b->writepos = next;
}

void * consume (buf_t *b) {
    while (b->readpos == b->writepos); // nothing to consume. wait
    int next = (b->readpos+1) % BUF_SIZE;
    void * res = b->buffer[b->readpos]; b->readpos = next;
    return res;
}

buf_t *alloc () {
    buf_t *b = (buf_t *)malloc(sizeof(buf_t));
    b->writepos = 0; b->readpos = 0; return b;
}

Однако эта реализация еще недостаточно быстра и нуждается в дальнейшей оптимизации. Я пробовал с разными значениями BUF_SIZE и получил некоторое ускорение. Кроме того, я переместил writepos перед buffer и readpos после buffer, чтобы убедиться, что обе переменные находятся в разных строках кэша, что также привело к некоторой скорости.

Мне нужно ускорение около 400%. У вас есть идеи, как я могу добиться этого, используя такие вещи, как отступы и т. Д.?


person Etan    schedule 05.05.2010    source источник
comment
без блокировки возможен, поскольку у меня есть только один поток-потребитель и один поток-производитель. Что произойдет, если поток-потребитель и производитель столкнутся?   -  person Mitch Wheat    schedule 05.05.2010
comment
Сколько процессора сжигается в режиме ожидания?   -  person Marcelo Cantos    schedule 05.05.2010
comment
Занят ждет??? Вы не спрашиваете, как сделать это быстрее, если у вас много ожиданий.   -  person sharptooth    schedule 05.05.2010
comment
@mitch: когда потоки потребителя и производителя сталкиваются, один поток вынужден находиться в спин-блокировке while. Согласованность обеспечивается, поскольку буфер считывается/записывается до обновления указателя позиции чтения/записи. @marcelo: В моей тестовой среде производитель рассылает случайные значения внутри очереди, в то время как потребитель ничего не делает, кроме их потребления. Следовательно, процессы не должны ждать так долго, о чем следует беспокоиться. Кроме того, два процесса выполняются на разных ядрах ЦП, поэтому они не должны влиять на время ЦП друг друга.   -  person Etan    schedule 05.05.2010
comment
Поправьте меня, если я ошибаюсь, но если у вас несколько процессоров, вам, вероятно, понадобится барьер памяти между производством и потреблением. Это замедлит вас (возможно, сильно), но без этого потребитель может увидеть перемещение индекса записи до того, как данные будут записаны в буфер, и получить неверные данные.   -  person Aidan Cully    schedule 05.05.2010
comment
Что вы имеете в виду под одной машиной? Потоки находятся внутри процесса, и, как правило, процесс выполняется на одной и только одной машине. Обычно вопрос заключается в том, сколько ядер ЦП у вас есть, поскольку ошибки MT часто проявляются только в многоядерном сценарии, где работа действительно выполняется параллельно, и у вас есть проблемы с кэшированием памяти.   -  person sbi    schedule 05.05.2010
comment
@Aidan Cully: это зависит от архитектуры. Например, на платформе x86 гарантируется, что записи не будут переупорядочены по отношению к другим операциям записи, выполняемым тем же ЦП.   -  person caf    schedule 05.05.2010
comment
Это на двухъядерной машине.   -  person Etan    schedule 06.05.2010
comment
Сколько времени на самом деле тратится в буфере и сколько времени занято ожиданием? Возможно, вы могли бы легко получить свои 400% в другом месте.   -  person nos    schedule 06.05.2010
comment
измеренная производительность предназначена только для реализации производства/потребления. занятое ожидание не должно создавать проблем, поскольку потоки выполняются на разных ядрах.   -  person Etan    schedule 06.05.2010


Ответы (3)


Вот одна оптимизация, которую я вижу: в consume() вам не нужно постоянно получать b->readpos, поскольку поток, вызывающий consume(), является единственным, который в любом случае может его обновить. Поскольку это volatile, компилятор не может оптимизировать все эти выборки, поэтому вам нужно будет сделать это явно:

void * consume (buf_t *b) {
    int rp = b->readpos;
    while (rp == b->writepos); // nothing to consume. wait
    int next = (rp + 1) % BUF_SIZE;
    void * res = b->buffer[rp]; b->readpos = next;
    return res;
}

Вы также должны пройти через свой буфер шагами, по крайней мере, на одну кэш-линию каждый, иначе вы получите кэш-линии, пинг-понгящие между двумя ЦП (поскольку один ЦП хочет, чтобы кэш-строка считывала b->buffer[n] и 15 раз из 16, другой делает его недействительным для напишите b->buffer[n+1]). Например:

#define STRIDE 16
#define STEPS 2048
#define BUF_SIZE (STRIDE * STEPS)

#define TO_INDEX(n) (STRIDE * (((n) + 1) % STEPS) + (((n) + 1) / STEPS))

void produce (buf_t *b, void * e) {
    unsigned wp = b->writepos;
    unsigned next = (wp + 1) % BUF_SIZE;
    while (b->readpos == next); // queue is full. wait
    b->buffer[TO_INDEX(wp)] = e; b->writepos = next;
}

void * consume (buf_t *b) {
    unsigned rp = b->readpos;
    while (rp == b->writepos); // nothing to consume. wait
    unsigned next = (rp + 1) % BUF_SIZE;
    void * res = b->buffer[TO_INDEX(rp)]; b->readpos = next;
    return res;
}

В любом случае, стоит попробовать. (Обратите внимание, что пока STRIDE и STEPS являются степенями числа 2, страшно выглядящие деление и модуль в TO_INDEX() могут быть оптимизированы для сдвига и побитового-и, но только если операнды unsigned - поэтому я предлагаю изменить эти типы соответствующим образом. ).

person caf    schedule 05.05.2010

Я предполагаю, что вы используете машину с более чем одним процессором или ядром. Если нет, то ваше занятое ожидание повредит вещам. В любом случае они могут повредить, если вы работаете под ОС, которая решает, что вы недостаточно спите, и снижает ваш динамический приоритет, и есть другие запущенные программы.

Вам необходимо собрать данные о том, насколько заполнен ваш буфер. В какой-то момент слишком большой размер начинает вредить и вашему кешу.

Если вы используете глобальный массив, а не выделяете его из кучи, то указатель на буфер становится литералом указателя, и обоим потокам не нужно будет считывать это значение указателя из одного и того же места в кеше, потому что оно будет просто вставлено в код .

Если для вас важна пропускная способность (за счет задержки), а кэш действительно играет большую роль, то вы можете позволить потребителю отставать от производителя, чтобы они не пытались читать и писать из одного и того же места в памяти. буфер.

Возможно, вы захотите изменить интерфейс своей потребительской функции, чтобы она могла потреблять фрагменты размера кеша (или кратные) (это хорошо работает с потребителем, отстающим от предложения производителя, которое я сделал выше) в дополнение к отдельным или частичным фрагментам строки кэша. . Старайтесь выровнять кеш потребления. Если вы думаете о доступных данных как о змее, то голова и хвост могут быть не выровнены. Вы должны использовать невыровненный хвост только тогда, когда нет других данных для использования. Если вы можете использовать какие-либо другие данные в вызове для потребления, вам следует просто оставить хвост для следующего вызова.

Помимо этого и того, что было упомянуто caf, я должен подозревать, что все, что происходит за пределами этого кода, должно играть большую роль.

person nategoose    schedule 05.05.2010

Я реализовал оптимизацию в первом блоке кода ответа кафе. На самом деле они немного прибавили скорости (спасибо), но этого пока недостаточно. Вторая оптимизация, которая приводит к заполнению кеша по столбцам, а не по строкам, приводит к худшей производительности.

Идея отставания потребителя от производителя не давала никакого ускорения.

Сейчас я на 300%.

Дополнительным изменением, которое я сделал, был хак в отношении volatile переменных writepos и readpos:

void produce (void * e) {
    unsigned int oldpos = buffer.writepos;
    unsigned int next = (oldpos+1) % BUF_SIZE;
    while (next == buffer.rpos) { // rpos is not volatile
        buffer.rpos = buffer.readpos;
        usleep(1);
    }
    buffer.buffer[oldpos] = e; buffer.writepos = next;
}

и аналогично для потребления().

Дополнительные изменения в структуре приводят к следующей новой структуре буфера (в глобальной области, как было предложено в одном ответе, а не в куче).

#define STRIDE 16
#define STEPS 524288

struct buf_t {
    volatile unsigned int writepos;
    int offset [STRIDE - 1];
    unsigned int wpos;
    int offset2 [STRIDE - 1];
    volatile void * buffer[BUF_SIZE];
    int offset4 [STRIDE];
    volatile unsigned int readpos;
    int offset3 [STRIDE - 1];
    unsigned int rpos;
}

что дало отсутствующее ускорение на 300% и опустило его ниже предела производительности, которого я должен был достичь.

Если у вас есть дополнительные хаки, которые можно использовать для дальнейшего повышения производительности, не стесняйтесь опубликовать их :-)

Спасибо за помощь.

person Etan    schedule 06.05.2010