Мой код с Disruptor-Net работает медленнее, чем BlockingCollection

Disruptor должен быть намного быстрее, чем BlockingCollection.

В моем предыдущем вопросе Почему мой пример разрушителя такой медленный? Я написал два теста. Disruptor потратил около 1 микросекунды (или меньше), в то время как BlockingCollection потратил около 14 микросекунд.

Итак, я решил использовать Disruptor в своей программе, но когда я его реализовал, я обнаружил, что теперь Disruptor тратит около 50 микросекунд, в то время как BlockingCollection по-прежнему тратит 14-18 микросекунд.

Я изменил свой производственный код, сделав его «автономным тестом», и Disruptor по-прежнему тратит 50 микросекунд. Почему?

Ниже приведен упрощенный тест. В этом тесте у меня есть два варианта. Первый вариант — Sleep for 1 ms. Затем Disruptor тратит 30-50 микросекунд на доставку. Второй вариант — имитировать деятельность. Затем Disruptor тратит 7 микросекунд на доставку. Тот же тест с BlockingCollection дает 14-18 микросекунд. Так почему же Disruptor не быстрее, чем BlockingCollection?

В моем реальном приложении Disruptor тратит 50 микросекунд, чтобы доставить то, что слишком много! Я ожидаю, что он должен доставлять сообщения намного быстрее, чем 1 микросекунда.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        internal int Id { get; set; }
    }

    class DisruptorTest
    {

        public class MyHandler : IEventHandler<ValueEntry>
        {
            private DisruptorTest _parent;

            public MyHandler(DisruptorTest parent)
            {
                this._parent = parent;
            }

            public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
            {
                _parent.sw.Stop();
                long microseconds = _parent.sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));

                // Filter out abnormal delays > 1000
                if (microseconds < 1000)
                {
                    _parent.sum += (int)microseconds;
                    _parent.count++;
                    if (_parent.count % 1000 == 0)
                    {
                        Console.WriteLine("average disruptor delay (microseconds) = {0}", _parent.sum / _parent.count);
                    }
                }
            }
        }

        private RingBuffer<ValueEntry> _ringBuffer;
        private const int RingSize = 64;

        static void Main(string[] args)
        {
            new DisruptorTest().Run();
        }

        public void Run()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingSize, TaskScheduler.Default);
            disruptor.HandleEventsWith(new MyHandler(this));

            _ringBuffer = disruptor.Start();

            for (int i = 0; i < 10001; i++)
            {
                Do();

                // We need to simulate activity to allow event to deliver

                // Option1. just Sleep. Result 30-50 microseconds.
                Thread.Sleep(1);

                // Option2. Do something. Result ~7 microseconds.
                //factorial = 1;
                //for (int j = 1; j < 100000; j++)
                //{
                //    factorial *= j;
                //}
            }
        }

        public static int factorial;

        private Stopwatch sw = Stopwatch.StartNew();
        private int sum;
        private int count;

        public void Do()
        {
            long sequenceNo = _ringBuffer.Next();
            _ringBuffer[sequenceNo].Id = 0;
            sw.Restart();
            _ringBuffer.Publish(sequenceNo);
        }

    }
}

СТАРЫЙ код. Теперь следует игнорировать:

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        internal int Id { get; set; }
    }

    class DisruptorTest
    {

        public class MyHandler : IEventHandler<ValueEntry>
        {
            private readonly int _ordinal;
            private readonly int _consumers;
            private DisruptorTest _parent;

            public MyHandler(int ordinal, int consumers, DisruptorTest parent)
            {
                _ordinal = ordinal;
                _consumers = consumers;
                this._parent = parent;
            }

            public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
            {
                if ((sequence % _consumers) == _ordinal)
                {
                    var id = data.Id;
                    _parent.sw[id].Stop();
                    long microseconds = _parent.sw[id].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
                    // filter out abnormal delays > 1000
                    if (microseconds < 1000)
                    {
                        _parent.sum[id] += (int)microseconds;
                        _parent.count[id]++;
                        if (_parent.count[id] % 10 == 0)
                        {
                            Console.WriteLine("Id = {0} average disruptor delay (microseconds) = {1}",
                                id, _parent.sum[id] / _parent.count[id]);
                        }
                    }
                }
            }
        }

        private const int NumberOfThreads = 1;
        private RingBuffer<ValueEntry> _ringBuffer;
        private const int RingSize = 64;

        static void Main(string[] args)
        {
            new DisruptorTest().Run();
        }

        public void Run()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingSize, TaskScheduler.Default);
            for (int i = 0; i < NumberOfThreads; i++)
                disruptor.HandleEventsWith(new MyHandler(i, NumberOfThreads, this));

            for (int i = 0; i < sw.Length; i++)
            {
                sw[i] = Stopwatch.StartNew();
            }

            _ringBuffer = disruptor.Start();

            //var rnd = new Random();
            for (int i = 0; i < 1000; i++)
            {
                //Do(rnd.Next(MaxId));
                Do(i % MaxId);
                Thread.Sleep(1);
            }
        }

        private const int MaxId = 100;

        private Stopwatch[] sw = new Stopwatch[MaxId];
        private int[] sum = new int[MaxId];
        private int[] count = new int[MaxId];

        public void Do(int id)
        {
            long sequenceNo = _ringBuffer.Next();
            _ringBuffer[sequenceNo].Id = id;
            sw[id].Restart();
            _ringBuffer.Publish(sequenceNo);
        }

    }
}

Выход:

......
Id = 91 average disruptor delay (microseconds) = 50
Id = 92 average disruptor delay (microseconds) = 48
Id = 93 average disruptor delay (microseconds) = 35
Id = 94 average disruptor delay (microseconds) = 35
Id = 95 average disruptor delay (microseconds) = 51
Id = 96 average disruptor delay (microseconds) = 55
Id = 97 average disruptor delay (microseconds) = 38
Id = 98 average disruptor delay (microseconds) = 37
Id = 99 average disruptor delay (microseconds) = 45

person Oleg Vazhnev    schedule 15.11.2012    source источник
comment
Пожалуйста, объясните, что, по вашему мнению, вы делаете в своем тесте. Очень трудно следить за вашим тестом.   -  person Kiril    schedule 15.11.2012
comment
я просто публикую числа от 1 до MaxId для потребителей NumberOfThreads. Я измеряю среднюю задержку одного элемента для каждого идентификатора (для этого я использую массивы). Я думаю, что тест можно упростить, я попробую это.   -  person Oleg Vazhnev    schedule 16.11.2012


Ответы (2)


Вы по-прежнему делаете то же самое: измеряете, сколько времени уходит на публикацию одного элемента.

public void Do(int id)
{
    long sequenceNo = _ringBuffer.Next();
    _ringBuffer[sequenceNo].Id = id;
    sw[id].Restart(); // <--- You're doing this EVERY TIME YOU PUBLISH an item!
    _ringBuffer.Publish(sequenceNo);
}

В предыдущем вопросе вам сообщили, что вы должны измерять ТЫСЯЧИ публикаций, чтобы правильно использовать точность Stopwatch.

Кроме того, вы все еще пишете в консоль посреди теста. Избегайте этого:

if (_parent.count[id] % 10 == 0)
{
    Console.WriteLine("Id = {0} average disruptor delay (microseconds) = {1}",
        id, _parent.sum[id] / _parent.count[id]);
}

Очистите свой код

По крайней мере, вы должны попытаться немного почистить свой код; Я немного реорганизовал его, чтобы он не был таким беспорядочным: http://pastie.org/5382971.

Disrpputers не так просты для начала, теперь нам нужно разобраться с вашим кодом И попытаться рассказать вам, как это исправить. Что еще более важно: вы не можете оптимизировать производительность или тестировать, когда у вас есть спагетти-код. Старайтесь, чтобы все было просто и чисто. На данном этапе ваш код не является ни простым, ни чистым.

Давайте начнем с ужасающих соглашений об именах для ваших частных переменных-членов:

private const int NumberOfThreads = 1;
private RingBuffer<ValueEntry> _ringBuffer;
private const int RingSize = 64;
private const int MaxId = 100
private Stopwatch[] sw = new Stopwatch[MaxId];
private int[] sum = new int[MaxId];
private int[] count = new int[MaxId];

Быть последовательным:

private const int _numberOfThreads = 1;
private RingBuffer<ValueEntry> _ringBuffer;
private const int _ringSize = 64;
private const int _maxId = 100
private Stopwatch[] _sw = new Stopwatch[MaxId];
private int[] _sum = new int[MaxId];
private int[] _count = new int[MaxId];

Некоторые другие указатели:

  • Избавьтесь от вложенных классов.
  • Переместите main в отдельный класс (например, Program).

Создайте хороший тест

Одна из первых вещей, которые Мартин и Майкл говорят вам, это то, что тестирование производительности также должно быть очень хорошим, поэтому они потратили довольно много времени на создание среда тестирования.

  • Я бы рекомендовал вам попробовать пару миллионов событий, а не 1000 событий.
  • Убедитесь, что вы используете только один таймер для ВСЕХ событий.
  • Запустите таймер, когда вы начнете обрабатывать элементы, и остановите его, когда больше не будет элементов для обработки.
  • Эффективный способ узнать, когда вы закончили обработку элементов, — использовать CountDownEvent.

Обновлять

Итак, давайте разберемся с первым спором: точность секундомера действительно должно быть достаточно.

Int64 frequency = Stopwatch.Frequency;
Console.WriteLine( "  Timer frequency in ticks per second = {0}", frequency );
Int64 nanosecPerTick = (1000L * 1000L * 1000L) / frequency;
Console.WriteLine( "  Timer is accurate within {0} nanoseconds", nanosecPerTick );

На моей машине разрешение находится в пределах 320 наносекунд. Таким образом, ОП прав, что разрешение на таймере не должно быть проблемой.

Я понимаю, что ОП хочет измерить среднюю доставку одного предмета, но есть (по крайней мере) два способа сделать это.

Мы должны исследовать разницу. На концептуальном уровне вы делаете то же самое, что и код ниже:

  1. Вы запускаете кучу итераций.
  2. Измерьте каждый из них.
  3. Вы подсчитываете итог.
  4. Вы вычисляете среднее значение в конце.

В коде:

Stopwatch sw = new Stopwatch();
long totalMicroseconds = 0;
int numItems = 1000;
for(int i = 0; i < numItems; i++)
{
    sw.Reset();
    sw.Start();
    OneItemDelivery();
    sw.Stop();
    totalMicroseconds += sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
}
long avgOneItemDelivery = totalMicroseconds/numItems;

Альтернативный способ измерения производительности заключается в следующем:

  1. Запустите таймер.
  2. Запустите все итерации.
  3. Остановите таймер.
  4. Вычислите среднее время.

В коде:

sw.Start();
for(int i = 0; i < numItems; i++)
{
    OneItemDelivery();    
}
sw.Stop();
totalMicroseconds = sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
long avgOneItemDelivery = totalMicroseconds/numItems;

У каждого могут быть свои проблемы:

  • Первый метод может быть менее точным, и вам нужно доказать на своей системе, что секундомер может точно справиться с этим небольшим объемом работы (помимо простого расчета с точностью до наносекунды).
  • Второй метод также будет включать время вычислений, необходимое для выполнения итерации. Это вносит небольшую погрешность в ваши измерения, но может решить проблемы с точностью, которые обычно возникают при использовании первого метода.

Вы уже заметили, что оператор Sleep снижает производительность, поэтому я бы рекомендовал вам выполнить простой расчет. Вычисление факториала кажется хорошей идеей, просто сделайте это очень маленьким вычислением: не нужно 100000, 100 тоже подойдет.

Конечно, вам не нужно ждать 2 минуты теста, но 10-20 секунд не должны быть проблемой.

person Kiril    schedule 15.11.2012
comment
я измеряю среднюю доставку одного предмета, потому что это то, что мне нужно измерить! и точности секундомера более чем достаточно! в моем реальном приложении я доставляю одно сообщение, а не миллионы сообщений! я пишу в консоль, когда Stopwatch уже остановлен, так что это ничего не меняет. Тест немного сложен, потому что я только что взял класс из своего реального приложения и изменил его. Так что этот тест довольно близок к тому, что у меня есть в реальной жизни. Я бы сказал, что это именно то приложение, которое мне нужно оптимизировать. - person Oleg Vazhnev; 16.11.2012
comment
изменение числа с 1000 на миллионы ничего не меняет, но в размещенном коде я использовал 1000, поэтому людям не нужно ждать 1-2 минуты, пока тест выполняется. Тот же тест, но с BlockingCollection быстрее в 2-3 раза. Почему в моем тесте BlockingCollection работает быстрее, чем Disruptor? - person Oleg Vazhnev; 16.11.2012
comment
относительно кодовых соглашений. Я назвал переменные, как предлагает Resharper. поэтому const должно быть Uppercased. - person Oleg Vazhnev; 16.11.2012
comment
@javapowered мне кажется, что вы действительно не понимаете тестирование производительности ... ваш способ измерения в основном не подходит и дает вам ненадежные / неправильные результаты ... пожалуйста, что указано в ответе ... - person Yahia; 16.11.2012
comment
@Yahia для меня, ты не понимаешь, что мне не нужно измерять тесты. Я измерил реальное приложение. И я обнаружил, что использование Disruptor реального приложения работает медленнее! И мой тест — это упрощенная версия моего реального приложения. Это абсолютно правильно, и я ожидаю, что Disruptor будет быстрее, чем BlockingCollection. - person Oleg Vazhnev; 16.11.2012
comment
@Yahia, вы можете переписать мой тест (теперь очень просто, пожалуйста, обратитесь к вопросу) :) - person Oleg Vazhnev; 16.11.2012
comment
+1 за полезный и хорошо сформулированный ответ. OP также должен отметить, что нет особых причин предполагать, что disruptor-net быстрее, чем параллельная очередь. Сам автор намекнул, что так оно и есть. - person avishayp; 16.11.2012
comment
@javapowered Я обновил свой ответ. Технически точности секундомера должно быть достаточно, но вам все равно нужно ее подтвердить (помимо простого вычисления точности в наносекундах). - person Kiril; 16.11.2012
comment
@Lirik большое спасибо за ваши усилия. вы предложили использовать CountdownEvent. обратите внимание, что создание и обработка событий, вероятно, является дорогостоящей операцией. выполнение этого для каждой итерации может значительно изменить результаты. Я думаю, что мой первоначальный тест более или менее верен, и проблема на самом деле в том, что я не использую Disruptor правильно. - person Oleg Vazhnev; 17.11.2012
comment
@javapowered CountdownEvent не вызывает событие, пока внутренний счетчик не достигнет 0. Счетчик уменьшается атомарно, поэтому стоимость практически незначительна. Ваше использование Disruptor может быть не оптимальным, но трудно сказать. - person Kiril; 17.11.2012

Я прочитал код BlockingCollecton, который вы написали из Почему мой пример разрушителя такой медленный?, вы добавляете много Console.WriteLine в Disruptor, но ни одного в BlockingCollection, Console.WriteLine медленный, у него есть блокировка внутри.

Ваш RingBufferSize слишком мал, это влияет на производительность, должно быть 1024 или больше.

и while (!dataItems.IsCompleted) может иметь некоторые проблемы, BlockCollection всегда находится в состоянии добавления, это приведет к преждевременному завершению потока.

Task.Factory.StartNew(() => {
    while (!dataItems.IsCompleted)
    {

        ValueEntry ve = null;
        try
        {
    ve = dataItems.Take();
    long microseconds = sw[ve.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
    results[ve.Value] = microseconds;

    //Console.WriteLine("elapsed microseconds = " + microseconds);
    //Console.WriteLine("Event handled: Value = {0} (processed event {1}", ve.Value, ve.Value);
        }
        catch (InvalidOperationException) { }
    }
}, TaskCreationOptions.LongRunning);


for (int i = 0; i < length; i++)
{
    var valueToSet = i;

    ValueEntry entry = new ValueEntry();
    entry.Value = valueToSet;

    sw[i].Restart();
    dataItems.Add(entry);

    //Console.WriteLine("Published entry {0}, value {1}", valueToSet, entry.Value);
    //Thread.Sleep(1000);
}

Я переписал вам код, Disruptor в 10 раз быстрее, чем BlockingCollection с несколькими производителями (10 параллельных продуктов), в 2 раза быстрее, чем BlockingCollection с одним производителем:

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;
using NUnit.Framework;

namespace DisruptorTest.Ds
{
    public sealed class ValueEntry
    {
        internal int Id { get; set; }
    }

    class MyHandler : IEventHandler<ValueEntry>
    {
        public void OnEvent(ValueEntry data, long sequence, bool endOfBatch)
        {
        }
    }

    [TestFixture]
    public class DisruptorPerformanceTest
    {
        private volatile bool collectionAddEnded;

        private int producerCount = 10;
        private int runCount = 1000000;
        private int RingBufferAndCapacitySize = 1024;

        [TestCase()]
        public async Task TestBoth()
        {
            for (int i = 0; i < 1; i++)
            {
                foreach (var rs in new int[] {64, 512, 1024, 2048 /*,4096,4096*2*/})
                {
                    Console.WriteLine($"RingBufferAndCapacitySize:{rs}, producerCount:{producerCount}, runCount:{runCount} of {i}");
                    RingBufferAndCapacitySize = rs;
                    await DisruptorTest();
                    await BlockingCollectionTest();
                }
            }
        }

        [TestCase()]
        public async Task BlockingCollectionTest()
        {
            var sw = new Stopwatch();
            BlockingCollection<ValueEntry> dataItems = new BlockingCollection<ValueEntry>(RingBufferAndCapacitySize);

            sw.Start();

            collectionAddEnded = false;

            // A simple blocking consumer with no cancellation.
            var task = Task.Factory.StartNew(() =>
            {
                while (!collectionAddEnded && !dataItems.IsCompleted)
                {
                    //if (!dataItems.IsCompleted && dataItems.TryTake(out var ve))
                    if (dataItems.TryTake(out var ve))
                    {
                    }
                }
            }, TaskCreationOptions.LongRunning);


            var tasks = new Task[producerCount];
            for (int t = 0; t < producerCount; t++)
            {
                tasks[t] = Task.Run(() =>
                {
                    for (int i = 0; i < runCount; i++)
                    {
                        ValueEntry entry = new ValueEntry();
                        entry.Id = i;
                        dataItems.Add(entry);
                    }
                });
            }

            await Task.WhenAll(tasks);

            collectionAddEnded = true;
            await task;

            sw.Stop();

            Console.WriteLine($"BlockingCollectionTest Time:{sw.ElapsedMilliseconds/1000d}");
        }


        [TestCase()]
        public async Task DisruptorTest()
        {
            var disruptor =
                new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingBufferAndCapacitySize, TaskScheduler.Default,
                    producerCount > 1 ? ProducerType.Multi : ProducerType.Single, new BlockingWaitStrategy());
            disruptor.HandleEventsWith(new MyHandler());

            var _ringBuffer = disruptor.Start();

            Stopwatch sw = Stopwatch.StartNew();

            sw.Start();


            var tasks = new Task[producerCount];
            for (int t = 0; t < producerCount; t++)
            {
                tasks[t] = Task.Run(() =>
                {
                    for (int i = 0; i < runCount; i++)
                    {
                        long sequenceNo = _ringBuffer.Next();
                        _ringBuffer[sequenceNo].Id = 0;
                        _ringBuffer.Publish(sequenceNo);
                    }
                });
            }


            await Task.WhenAll(tasks);


            disruptor.Shutdown();

            sw.Stop();
            Console.WriteLine($"DisruptorTest Time:{sw.ElapsedMilliseconds/1000d}s");
        }
    }
}

BlockingCollectionTest с общим экземпляром ValueEntry (без нового ValueEntry() в цикле for)

  • RingBufferAndCapacitySize: 64, productCount: 10, runCount: 1000000 из 0

    Время тестирования Disruptor: 16,962 с.

    Время тестирования BlockingCollection: 18.399

  • RingBufferAndCapacitySize: 512, productCount: 10, runCount: 1000000 из 0 DisruptorTest Time: 6,101 с

    Время тестирования BlockingCollection: 19,526

  • RingBufferAndCapacitySize: 1024, productCount: 10, runCount: 1000000 из 0

    РазрушительВремя тестирования: 2,928 с

    Время тестирования BlockingCollection: 20,25

  • RingBufferAndCapacitySize: 2048, productCount: 10, runCount: 1000000 из 0

    Время тестирования Disruptor: 2,448 с

    Время тестирования BlockingCollection: 20,649

BlockingCollectionTest создает новый ValueEntry() в цикле for

  • RingBufferAndCapacitySize: 64, productCount: 10, runCount: 1000000 из 0

    Время тестирования Disruptor: 27,374 с.

    Время тестирования BlockingCollection: 21,955

  • RingBufferAndCapacitySize: 512, productCount: 10, runCount: 1000000 из 0

    РазрушительВремя тестирования: 5,011 с

    Время тестирования BlockingCollection: 20.127

  • RingBufferAndCapacitySize: 1024, productCount: 10, runCount: 1000000 из 0

    Время тестирования Disruptor: 2,877 с

    Время тестирования BlockingCollection: 22,656

  • RingBufferAndCapacitySize: 2048, productCount: 10, runCount: 1000000 из 0

    Время тестирования Disruptor: 2,384 с

    Время тестирования BlockingCollection: 23,567

https://www.cnblogs.com/darklx/p/11755686.html

person shine    schedule 28.10.2019