Самый производительный способ вычитания одного массива из другого

У меня есть следующий код, который является узким местом в одной части моего приложения. Все, что я делаю, это вычитаю массив из другого. Оба этих массива содержат более 100 000 элементов. Я пытаюсь найти способ сделать это более производительным.

var
  Array1, Array2 : array of integer;

..... 
// Code that fills the arrays
.....

for ix := 0 to length(array1)-1
  Array1[ix] := Array1[ix] - Array2[ix];

end;

У кого-нибудь есть предложение?


person Michael Küller    schedule 15.02.2011    source источник
comment
Удачи вам стать лучше!   -  person Gabe    schedule 15.02.2011
comment
Может быть, какая-то петля разворачивается, но я не знаю ничего лучшего прямо сейчас.   -  person The_Fox    schedule 15.02.2011
comment
Простые 100000 целочисленных вычитаний должны быть очень быстрыми. Я предполагаю, что вы выполняете этот код в узком цикле или что-то в этом роде, если чувствуете, что это узкое место. Тогда решение Дэвида @David единственное. (Ну, конечно, если нет способа вообще избавиться от вычитания массива, но это уже другой вопрос.)   -  person Andreas Rejbrand    schedule 15.02.2011
comment
($R-), чтобы отключить проверку диапазона, по крайней мере, в цикле for. Возможно, также в приведенном выше коде, который заполняет массивы, если вы тщательно управляете диапазоном индексов...   -  person RobertFrank    schedule 15.02.2011
comment
Ах, вернемся к {$R-} дебатам!   -  person David Heffernan    schedule 15.02.2011
comment
Завтра попробую отключить проверку диапазона. Спасибо за совет.   -  person Michael Küller    schedule 16.02.2011
comment
это может быть хорошим вопросом, чтобы задать его на refactormycode.com/tags/delphi   -  person Christopher Chase    schedule 16.02.2011
comment
Dec(Array1[ix], Array2[ix]) вместо Array1[ix] := Array1[ix] - Array2[ix]. Не уверен, что он будет работать быстрее, но вы, вероятно, сможете напечатать его быстрее, так что в некотором отношении это более производительный способ.   -  person Andriy M    schedule 16.02.2011
comment
У Хироюки Хори был кое-какой MMX-материал для Delphi: torry.net/authorsmore.php?id=1181   -  person Warren P    schedule 16.02.2011
comment
@Andriy Это не работает быстрее. Если бы это было так, вы бы разочаровались в компиляторе!   -  person David Heffernan    schedule 16.02.2011
comment
@David: Как я и подозревал. Таким образом, у нас остается только почти несуществующее повышение производительности за счет меньшего набора текста.   -  person Andriy M    schedule 16.02.2011
comment
@Worm С уважением: XMM лучше! Да, это хороший и простой случай, проверьте мой ответ.   -  person GJ.    schedule 16.02.2011


Ответы (5)


Меня очень интересовала оптимизация скорости в этом простом случае. Итак, я сделал 6 простых процедур и замерил такт процессора и время при размере массива 100000;

  1. Pascal-процедура с опцией компилятора Range and Overflow Checking On
  2. Процедура Pascal с опцией компилятора Range и Overflow Checking off
  3. Классическая процедура ассемблера x86.
  4. Процедура на ассемблере с инструкциями SSE и перемещением 16 байт без выравнивания.
  5. Процедура на ассемблере с инструкциями SSE и выровненным 16-байтовым перемещением.
  6. Ассемблер 8 раз развернул цикл с инструкциями SSE и выровнял 16-байтовое перемещение.

Проверьте результаты на изображении и коде для получения дополнительной информации. введите здесь описание изображения

Чтобы получить 16-байтовое выравнивание памяти, сначала удалите точку в директиве FastMM4Options.inc файла {$.define Align16Bytes}!

program SubTest;

{$APPTYPE CONSOLE}

uses
//In file 'FastMM4Options.inc' delite the dot in directive {$.define Align16Bytes}
//to get 16 byte memory alignment!
  FastMM4,
  windows,
  SysUtils;

var
  Ar1                   :array of integer;
  Ar2                   :array of integer;
  ArLength              :integer;
  StartTicks            :int64;
  EndTicks              :int64;
  TicksPerMicroSecond   :int64;

function GetCpuTicks: int64;
asm
  rdtsc
end;
{$R+}
{$Q+}
procedure SubArPasRangeOvfChkOn(length: integer);
var
  n: integer;
begin
  for n := 0 to length -1 do
    Ar1[n] := Ar1[n] - Ar2[n];
end;
{$R-}
{$Q-}
procedure SubArPas(length: integer);
var
  n: integer;
begin
  for n := 0 to length -1 do
    Ar1[n] := Ar1[n] - Ar2[n];
end;

procedure SubArAsm(var ar1, ar2; length: integer);
asm
//Length must be > than 0!
   push ebx
   lea  ar1, ar1 - 4
   lea  ar2, ar2 - 4
@Loop:
   mov  ebx, [ar2 + length * 4]
   sub  [ar1 + length * 4], ebx
   dec  length
   jnz  @Loop
@exit:
   pop  ebx
end;

procedure SubArAsmSimdU(var ar1, ar2; length: integer);
asm
//Prepare length
  push     length
  shr      length, 2
  jz       @Finish
@Loop:
  movdqu   xmm1, [ar1]
  movdqu   xmm2, [ar2]
  psubd    xmm1, xmm2
  movdqu   [ar1], xmm1
  add      ar1, 16
  add      ar2, 16
  dec      length
  jnz      @Loop
@Finish:
  pop      length
  push     ebx
  and      length, 3
  jz       @Exit
//Do rest, up to 3 subtractions...
  mov      ebx, [ar2]
  sub      [ar1], ebx
  dec      length
  jz       @Exit
  mov      ebx, [ar2 + 4]
  sub      [ar1 + 4], ebx
  dec      length
  jz       @Exit
  mov      ebx, [ar2 + 8]
  sub      [ar1 + 8], ebx
@Exit:
  pop      ebx
end;

procedure SubArAsmSimdA(var ar1, ar2; length: integer);
asm
  push     ebx
//Unfortunately delphi use first 8 bytes for dinamic array length and reference
//counter, from that reason the dinamic array address should start with $xxxxxxx8
//instead &xxxxxxx0. So we must first align ar1, ar2 pointers!
  mov      ebx, [ar2]
  sub      [ar1], ebx
  dec      length
  jz       @exit
  mov      ebx, [ar2 + 4]
  sub      [ar1 + 4], ebx
  dec      length
  jz       @exit
  add      ar1, 8
  add      ar2, 8
//Prepare length for 16 byte data transfer
  push     length
  shr      length, 2
  jz       @Finish
@Loop:
  movdqa   xmm1, [ar1]
  movdqa   xmm2, [ar2]
  psubd    xmm1, xmm2
  movdqa   [ar1], xmm1
  add      ar1, 16
  add      ar2, 16
  dec      length
  jnz      @Loop
@Finish:
  pop      length
  and      length, 3
  jz       @Exit
//Do rest, up to 3 subtractions...
  mov      ebx, [ar2]
  sub      [ar1], ebx
  dec      length
  jz       @Exit
  mov      ebx, [ar2 + 4]
  sub      [ar1 + 4], ebx
  dec      length
  jz       @Exit
  mov      ebx, [ar2 + 8]
  sub      [ar1 + 8], ebx
@Exit:
  pop      ebx
end;

procedure SubArAsmSimdAUnrolled8(var ar1, ar2; length: integer);
asm
  push     ebx
//Unfortunately delphi use first 8 bytes for dinamic array length and reference
//counter, from that reason the dinamic array address should start with $xxxxxxx8
//instead &xxxxxxx0. So we must first align ar1, ar2 pointers!
  mov      ebx, [ar2]
  sub      [ar1], ebx
  dec      length
  jz       @exit
  mov      ebx, [ar2 + 4]
  sub      [ar1 + 4], ebx
  dec      length
  jz       @exit
  add      ar1, 8                       //Align pointer to 16 byte
  add      ar2, 8                       //Align pointer to 16 byte
//Prepare length for 16 byte data transfer
  push     length
  shr      length, 5                    //8 * 4 subtructions per loop
  jz       @Finish                      //To small for LoopUnrolled
@LoopUnrolled:
//Unrolle 1, 2, 3, 4
  movdqa   xmm4, [ar2]
  movdqa   xmm5, [16 + ar2]
  movdqa   xmm6, [32 + ar2]
  movdqa   xmm7, [48 + ar2]
//
  movdqa   xmm0, [ar1]
  movdqa   xmm1, [16 + ar1]
  movdqa   xmm2, [32 + ar1]
  movdqa   xmm3, [48 + ar1]
//
  psubd    xmm0, xmm4
  psubd    xmm1, xmm5
  psubd    xmm2, xmm6
  psubd    xmm3, xmm7
//
  movdqa   [48 + ar1], xmm3
  movdqa   [32 + ar1], xmm2
  movdqa   [16 + ar1], xmm1
  movdqa   [ar1], xmm0
//Unrolle 5, 6, 7, 8
  movdqa   xmm4, [64 + ar2]
  movdqa   xmm5, [80 + ar2]
  movdqa   xmm6, [96 + ar2]
  movdqa   xmm7, [112 + ar2]
//
  movdqa   xmm0, [64 + ar1]
  movdqa   xmm1, [80 + ar1]
  movdqa   xmm2, [96 + ar1]
  movdqa   xmm3, [112 + ar1]
//
  psubd    xmm0, xmm4
  psubd    xmm1, xmm5
  psubd    xmm2, xmm6
  psubd    xmm3, xmm7
//
  movdqa   [112 + ar1], xmm3
  movdqa   [96 + ar1], xmm2
  movdqa   [80 + ar1], xmm1
  movdqa   [64 + ar1], xmm0
//
  add      ar1, 128
  add      ar2, 128
  dec      length
  jnz      @LoopUnrolled
@FinishUnrolled:
  pop      length
  and      length, $1F
//Do rest, up to 31 subtractions...
@Finish:
  mov      ebx, [ar2]
  sub      [ar1], ebx
  add      ar1, 4
  add      ar2, 4
  dec      length
  jnz      @Finish
@Exit:
  pop      ebx
end;

procedure WriteOut(EndTicks: Int64; Str: string);
begin
  WriteLn(Str + IntToStr(EndTicks - StartTicks)
    + ' Time: ' + IntToStr((EndTicks - StartTicks) div TicksPerMicroSecond) + 'us');
  Sleep(5);
  SwitchToThread;
  StartTicks := GetCpuTicks;
end;

begin
  ArLength := 100000;
//Set TicksPerMicroSecond
  QueryPerformanceFrequency(TicksPerMicroSecond);
  TicksPerMicroSecond := TicksPerMicroSecond div 1000000;
//
  SetLength(Ar1, ArLength);
  SetLength(Ar2, ArLength);
//Fill arrays
//...
//Tick time info
  WriteLn('CPU ticks per mikro second: ' + IntToStr(TicksPerMicroSecond));
  Sleep(5);
  SwitchToThread;
  StartTicks := GetCpuTicks;
//Test 1
  SubArPasRangeOvfChkOn(ArLength);
  WriteOut(GetCpuTicks, 'SubAr Pas Range and Overflow Checking On, Ticks: ');
//Test 2
  SubArPas(ArLength);
  WriteOut(GetCpuTicks, 'SubAr Pas, Ticks: ');
//Test 3
  SubArAsm(Ar1[0], Ar2[0], ArLength);
  WriteOut(GetCpuTicks, 'SubAr Asm, Ticks: ');
//Test 4
  SubArAsmSimdU(Ar1[0], Ar2[0], ArLength);
  WriteOut(GetCpuTicks, 'SubAr Asm SIMD mem unaligned, Ticks: ');
//Test 5
  SubArAsmSimdA(Ar1[0], Ar2[0], ArLength);
  WriteOut(GetCpuTicks, 'SubAr Asm with SIMD mem aligned, Ticks: ');
//Test 6
  SubArAsmSimdAUnrolled8(Ar1[0], Ar2[0], ArLength);
  WriteOut(GetCpuTicks, 'SubAr Asm with SIMD mem aligned 8*unrolled, Ticks: ');
//
  ReadLn;
  Ar1 := nil;
  Ar2 := nil;
end.

...

Самая быстрая процедура asm с 8-кратно развернутыми SIMD-инструкциями занимает всего 68 мкс и примерно в 4 раза быстрее, чем процедура Pascal.

Как мы видим, процедура цикла Паскаля, вероятно, не критична, она занимает всего около 277 мкс (проверка переполнения и диапазона) на процессоре 2,4 ГГц при 100000 вычитаний.

Значит, этот код не может быть узким местом?

person GJ.    schedule 17.02.2011
comment
Я знаю, что все остальные ответы верны, но я отметил этот, потому что он все вместе - person Michael Küller; 18.02.2011
comment
о большая ошибка на моей стороне. Я только что заметил, что в своем примере я объявил массивы целых чисел. Я работаю с массивами двойных. Это конечно большая разница - person Michael Küller; 18.02.2011
comment
Вы можете использовать SIMD-инструкцию SUBPD (вычитание упакованных значений с плавающей запятой двойной точности). Попробуйте выполнить новую процедуру, а затем откройте новый вопрос. :) Вы работаете с TChart? - person GJ.; 18.02.2011
comment
Обратите внимание, что в FastMM4 средние и большие блоки всегда выравниваются по 16 байтам, независимо от настройки. - person Johan; 22.07.2015

Выполнение этого в нескольких потоках с таким большим массивом приведет к линейному ускорению. Как говорится, это досадная параллель.

person David Heffernan    schedule 15.02.2011
comment
Хорошая идея. Попробуйте посмотреть параллельный цикл for в OmniThreadLibrary. - person Mason Wheeler; 15.02.2011
comment
Единственная проблема заключается в том, что обычно вам нужны фрагменты не менее 20 000, если не 50 000 итераций на задачу в параллельном цикле for. Если вы не будете осторожны, эта тривиальная вещь может быть медленнее, если вы не сможете переместить настройку потоковой передачи и отключение из областей, критически важных для производительности. Попробуйте это во что бы то ни стало, но я бы начал с разделения пополам, а не на 20 сегментов. - person ; 16.02.2011
comment
@moz Можно только представить, что этот код выполняется снова и снова. Если бы он запускался только один раз, проблем бы не было. Так что ясно, что вам нужно раскрутить все потоки и заблокировать их по событию до тех пор, пока они вам не понадобятся снова - классические вещи из пула потоков. - person David Heffernan; 16.02.2011
comment
Это, вероятно, повысит производительность больше всего, хотя и с наибольшими усилиями. Я попробую, если отключение проверки диапазона не повысит производительность в достаточной степени. - person Michael Küller; 16.02.2011
comment
@ Дэвид Хеффернан: мы определенно можем представить. Но мы не знаем. Таким образом, предложение об измерении является другой действительно ценной частью обсуждения. Удаление OmniThreadLibrary в соответствии с предложением Мейсона и измерение изменений, вероятно, является лучшим и самым простым решением. Создание потомка TThread и создание нескольких потоков для задачи может сработать, но это не очень хорошее решение. - person ; 16.02.2011
comment
@moz, как еще вы используете пул потоков без создания экземпляров потоков? - person David Heffernan; 16.02.2011
comment
@David Heffernan: дело не в том, создаете ли вы потоки, а в том, когда вы это делаете. если вы запускаете их на каком-то этапе вашего приложения с низким спросом и держите их в пределах стоимости, это совершенно не имеет значения. Если вы подождете, пока производительность не станет критической для их создания, вы можете замедлить выполнение вышеуказанной задачи. - person ; 16.02.2011
comment
@moz Очевидно, я выступаю за то, чтобы делать все, что ускоряет работу! - person David Heffernan; 17.02.2011

Запуск вычитания на большем количестве потоков звучит хорошо, но целочисленное солнечное вычитание 100 КБ не требует много процессорного времени, поэтому, возможно, пул потоков ... Однако потоки настроек также имеют много накладных расходов, поэтому короткие массивы будут иметь более медленную производительность в параллельных потоках, чем в только один (основной) поток!

Вы отключили в настройках компилятора проверку переполнения и диапазона?

Вы можете попробовать использовать asm рутину, это очень просто...

Что-то типа:

procedure SubArray(var ar1, ar2; length: integer);
asm
//length must be > than 0!
   push ebx
   lea  ar1, ar1 -4
   lea  ar2, ar2 -4
@Loop:
   mov  ebx, [ar2 + length *4]
   sub  [ar1 + length *4], ebx
   dec  length
//Here you can put more folloving parts of rutine to more unrole it to speed up.
   jz   @exit
   mov  ebx, [ar2 + length *4]
   sub  [ar1 + length *4], ebx
   dec  length
//
   jnz  @Loop
@exit:
   pop  ebx
end;


begin
   SubArray(Array1[0], Array2[0], length(Array1));

Можно гораздо быстрее...

EDIT: добавлена ​​процедура с инструкциями SIMD. Эта процедура запрашивает поддержку ЦП SSE. Он может принимать 4 целых числа в регистре XMM и вычитать их сразу. Также есть возможность использовать movdqa вместо movdqu это быстрее, но сначала нужно обеспечить выравнивание по 16 байтам. Вы также можете развернуть пар XMM, как в моем первом случае asm. (Меня интересует измерение скорости. :))

var
  array1, array2: array of integer;

procedure SubQIntArray(var ar1, ar2; length: integer);
asm
//prepare length if not rounded to 4
  push     ecx
  shr      length, 2
  jz       @LengthToSmall
@Loop:
  movdqu   xmm1, [ar1]          //or movdqa but ensure 16b aligment first
  movdqu   xmm2, [ar2]          //or movdqa but ensure 16b aligment first
  psubd    xmm1, xmm2
  movdqu   [ar1], xmm1          //or movdqa but ensure 16b aligment first
  add      ar1, 16
  add      ar2, 16
  dec      length
  jnz      @Loop
@LengthToSmall:
  pop      ecx
  push     ebx
  and      ecx, 3
  jz       @Exit
  mov      ebx, [ar2]
  sub      [ar1], ebx
  dec      ecx
  jz       @Exit
  mov      ebx, [ar2 + 4]
  sub      [ar1 + 4], ebx
  dec      ecx
  jz       @Exit
  mov      ebx, [ar2 + 8]
  sub      [ar1 + 8], ebx
@Exit:
  pop      ebx
end;

begin
//Fill arrays first!
  SubQIntArray(Array1[0], Array2[0], length(Array1));
person GJ.    schedule 15.02.2011
comment
это выглядит очень интересно, я не слишком знаком с asm. Каков тип параметра ar1 и ar2 вашей процедуры? - person Michael Küller; 16.02.2011
comment
Тип ar1 и ar2 — любой указатель на переменную. В нашем случае это адрес к первому элементу массива или Array1[0]. И да, тип Array1 (и Array2) может быть динамическим или статическим массивом! - person GJ.; 16.02.2011
comment
Добавлена ​​процедура с SIMD-инструкциями! - person GJ.; 16.02.2011
comment
Я бы дал еще +1, если это возможно. - person arthurprs; 16.02.2011
comment
@GJ. Как насчет 64 бит? :) Очень пожалуйста. - person user3764855; 04.07.2014
comment
@GJ. Не очень хорошо разбираюсь в ASM. Может быть, если бы это было 3 строки ASM. У этого есть SIMD, так что нет. - person user3764855; 04.07.2014
comment
@GJ. Какие инструкции нужно преобразовать, чтобы можно было работать на 64 bit ? - person user3764855; 09.07.2014
comment
@ user3764855: инструкции те же, только регистры 64-битные. Не забывайте, что и входные регистры как параметры функции не совпадают. - person GJ.; 09.07.2014

Я не эксперт по сборке, но я думаю, что следующее почти оптимально, если вы не принимаете во внимание SIMD-инструкции или параллельную обработку, последнюю можно легко выполнить, передав части массива функции.

like
Thread1: SubArray(ar1[0], ar2[0], 50);
Thread2: SubArray(ar1[50], ar2[50], 50);

procedure SubArray(var Array1, Array2; const Length: Integer);
var
  ap1, ap2 : PInteger;
  i : Integer;
begin
  ap1 := @Array1;
  ap2 := @Array2;
  i := Length;
  while i > 0 do
  begin
    ap1^ := ap1^ - ap2^;
    Inc(ap1);
    Inc(ap2);
    Dec(i);
  end;
end;

// similar assembly version
procedure SubArrayEx(var Array1, Array2; const Length: Integer);
asm
  // eax = @Array1
  // edx = @Array2
  // ecx = Length
  // esi = temp register for array2^
  push esi
  cmp ecx, 0
  jle @Exit
  @Loop:
  mov esi, [edx]
  sub [eax], esi
  add eax, 4
  add edx, 4
  dec ecx
  jnz @Loop
  @Exit:
  pop esi
end;


procedure Test();
var
  a1, a2 : array of Integer;
  i : Integer;
begin
  SetLength(a1, 3);
  a1[0] := 3;
  a1[1] := 1;
  a1[2] := 2;
  SetLength(a2, 3);
  a2[0] := 4;
  a2[1] := 21;
  a2[2] := 2;
  SubArray(a1[0], a2[0], Length(a1));

  for i := 0 to Length(a1) - 1 do
    Writeln(a1[i]);

  Readln;
end;
person arthurprs    schedule 16.02.2011
comment
Небольшая оптимизация... Вам не нужен cmp ecx, 0, потому что dec ecx должен установить нулевой флаг! - person GJ.; 16.02.2011

Это не настоящий ответ на ваш вопрос, но я хотел бы выяснить, могу ли я уже выполнять вычитание в какой-то момент во время заполнения массивов значениями. При желании я бы даже рассмотрел третий массив в памяти для хранения результата вычитания. В современных вычислениях «стоимость» памяти значительно ниже «стоимости» времени, необходимого для выполнения дополнительного действия с памятью.

Теоретически вы выиграете хотя бы немного в производительности, когда вычитание может быть выполнено, пока значения все еще находятся в регистрах или кэше процессора, но на практике вы просто можете наткнуться на несколько приемов, которые могут повысить производительность всего алгоритма.

person Stijn Sanders    schedule 16.02.2011