LMAX Disruptor — от чего зависит размер партии?

Недавно я узнал о LMAX Disruptor и провел несколько экспериментов. Одна вещь, которая меня озадачивает, это параметр endOfBatch метода onEvent обработчика EventHandler. Рассмотрим мой следующий код. Во-первых, фиктивное сообщение и потребительские классы, которые я называю Test1 и Test1Worker:

public class Test1 {

}

public class Test1Worker implements EventHandler<Test1>{
    public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
        try{
            Thread.sleep(500);
        }
        catch(Exception e){
            e.printStackTrace();
        }
        System.out.println("Received message with sequence " + sequence + ". "
                + "EndOfBatch = " + endOfBatch);
    }
}

Обратите внимание, что я установил задержку в 500 миллисекунд просто как замену некоторой реальной работе. Я также печатаю в консоли порядковый номер

И затем мой класс драйвера (который действует как производитель) вызвал DisruptorTest:

public class DisruptorTest {

    private static Disruptor<Test1> bus1;

    private static ExecutorService test1Workers;

    public static void main(String[] args){             
        test1Workers = Executors.newFixedThreadPool(1);

        bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);           
        bus1.handleEventsWith(new Test1Worker());
        RingBuffer<Test1> buf1 = bus1.start();

        for (int i = 0; i < 10; i++){
            long a = System.currentTimeMillis();
            long next = buf1.next();
            long b = System.currentTimeMillis();
            System.out.println("Delay for claiming slot " + i + " is "+ (b - a));
            try {
                Test1 message = buf1.get(next);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                buf1.publish(next);
            }
        }
    }

    public static class Test1Factory implements EventFactory<Test1> {
        public Test1 newInstance() {
            return new Test1();
        }

    }   
}

Здесь, после инициализации необходимых материалов, я загружаю 10 сообщений в RingBuffer (размер буфера 8) и пытаюсь отслеживать пару вещей — задержку производителя для запроса следующего слота в RingBuffer и сообщения с их порядковыми номерами. со стороны потребителя, а также о том, рассматривается ли конкретная последовательность как конец партии.

Теперь, что интересно, с задержкой в ​​​​500 мс, связанной с обработкой каждого сообщения, это то, что я получаю в качестве вывода:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Delay for claiming slot 2 is 0
Delay for claiming slot 3 is 0
Delay for claiming slot 4 is 0
Delay for claiming slot 5 is 0
Delay for claiming slot 6 is 0
Delay for claiming slot 7 is 0
Received message with sequence 0. EndOfBatch = true
Delay for claiming slot 8 is 505
Received message with sequence 1. EndOfBatch = false
Received message with sequence 2. EndOfBatch = false
Received message with sequence 3. EndOfBatch = false
Received message with sequence 4. EndOfBatch = false
Received message with sequence 5. EndOfBatch = false
Received message with sequence 6. EndOfBatch = false
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 9 is 3519
Received message with sequence 8. EndOfBatch = true
Received message with sequence 9. EndOfBatch = true

Однако, если я уберу время ожидания 500 мс, вот что я получу:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Received message with sequence 0. EndOfBatch = true
Received message with sequence 1. EndOfBatch = true
Delay for claiming slot 2 is 0
Received message with sequence 2. EndOfBatch = true
Delay for claiming slot 3 is 0
Received message with sequence 3. EndOfBatch = true
Delay for claiming slot 4 is 0
Received message with sequence 4. EndOfBatch = true
Delay for claiming slot 5 is 0
Received message with sequence 5. EndOfBatch = true
Delay for claiming slot 6 is 0
Received message with sequence 6. EndOfBatch = true
Delay for claiming slot 7 is 0
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 8 is 1
Delay for claiming slot 9 is 0
Received message with sequence 8. EndOfBatch = false
Received message with sequence 9. EndOfBatch = true  

Таким образом, похоже, что на то, считается ли определенное сообщение концом пакета (т. е. на размер пакета), влияет задержка обработки сообщения потребителем. Может я туплю, но так ли это должно быть? Что стоит за этим? Что вообще определяет размер партии? Заранее спасибо. Дайте мне знать, если что-то в моем вопросе неясно.


person Asif Iqbal    schedule 15.11.2015    source источник
comment
вам, вероятно, следует посмотреть несколько видеороликов Алексея Шипилева о бенчмаркинге на JVM, если вы хотите углубиться в детали - shipilev.net< /а>   -  person jasonk    schedule 16.11.2015


Ответы (1)


Размер партии определяется исключительно количеством доступных элементов. Таким образом, если в данный момент доступно больше элементов, то они будут включены в партию. Например, если Disruptor вызывает ваш код, а в очереди только один элемент, вы получите один вызов с endOfBatch=true. Если в очереди 8 элементов, то он соберет все 8 и отправит их одним пакетом.

Вы можете видеть в приведенном ниже коде, что извлекается количество записей, «доступных» в очереди, и это может быть намного больше, чем «следующий» элемент. Так, например, вас сейчас 5, вы ждете слот 6, а затем приходят 3 события, доступных будет 8, и вы получите несколько вызовов (для 6,7,8) в пакете.

https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/BatchEventProcessor.java#L124

final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
    event = dataProvider.get(nextSequence);
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}

Что касается паузы в 500 мс в элементе 9, обратите внимание, что Disruptor построен с кольцевым буфером, и вы указали количество слотов в буфере как 8 (см. Второй параметр здесь):

bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);  

Если не все потребители использовали элемент, а кольцевой буфер заполнен (все 8 элементов заполнены), производитель будет заблокирован от отправки новых событий в буфер. Вы можете попробовать увеличить размер буфера, скажем, на 2 миллиона объектов или убедиться, что ваш потребитель работает быстрее, чем производитель, чтобы очередь не заполнялась (удалите сон, который вы уже продемонстрировали).

person jasonk    schedule 15.11.2015
comment
Спасибо за Ваш ответ. Теперь у меня есть четкое представление о размере партии. Однако сейчас меня озадачивает время ожидания производителя. Обратите внимание, что в моем примере (с задержкой 500 мс) производитель ждал примерно 3500 мс (500 * 7), прежде чем заявить о слоте 9 (который является слотом 2 после модуля). Мне интересно, почему производитель просто не потребовал слот 9 сразу после того, как потребитель получил последовательность 2? Почему он ждал, пока последовательность 7 не будет использована? Я знаю, что этот вопрос немного не по теме исходного вопроса, но мне все еще любопытно. - person Asif Iqbal; 16.11.2015
comment
@AsifIqbal, вот где появляется WaitStrategies. По умолчанию (в вашем коде) есть BlockingWaitStrategy и он так себя ведет. Вы можете изменить WaitStrategy, чтобы изменить поведение. - person Denys; 05.10.2018