Недавно я узнал о LMAX Disruptor и провел несколько экспериментов. Одна вещь, которая меня озадачивает, это параметр endOfBatch
метода onEvent
обработчика EventHandler
. Рассмотрим мой следующий код. Во-первых, фиктивное сообщение и потребительские классы, которые я называю Test1
и Test1Worker
:
public class Test1 {
}
public class Test1Worker implements EventHandler<Test1>{
public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
try{
Thread.sleep(500);
}
catch(Exception e){
e.printStackTrace();
}
System.out.println("Received message with sequence " + sequence + ". "
+ "EndOfBatch = " + endOfBatch);
}
}
Обратите внимание, что я установил задержку в 500 миллисекунд просто как замену некоторой реальной работе. Я также печатаю в консоли порядковый номер
И затем мой класс драйвера (который действует как производитель) вызвал DisruptorTest
:
public class DisruptorTest {
private static Disruptor<Test1> bus1;
private static ExecutorService test1Workers;
public static void main(String[] args){
test1Workers = Executors.newFixedThreadPool(1);
bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);
bus1.handleEventsWith(new Test1Worker());
RingBuffer<Test1> buf1 = bus1.start();
for (int i = 0; i < 10; i++){
long a = System.currentTimeMillis();
long next = buf1.next();
long b = System.currentTimeMillis();
System.out.println("Delay for claiming slot " + i + " is "+ (b - a));
try {
Test1 message = buf1.get(next);
} catch (Exception e) {
e.printStackTrace();
} finally {
buf1.publish(next);
}
}
}
public static class Test1Factory implements EventFactory<Test1> {
public Test1 newInstance() {
return new Test1();
}
}
}
Здесь, после инициализации необходимых материалов, я загружаю 10 сообщений в RingBuffer
(размер буфера 8) и пытаюсь отслеживать пару вещей — задержку производителя для запроса следующего слота в RingBuffer
и сообщения с их порядковыми номерами. со стороны потребителя, а также о том, рассматривается ли конкретная последовательность как конец партии.
Теперь, что интересно, с задержкой в 500 мс, связанной с обработкой каждого сообщения, это то, что я получаю в качестве вывода:
Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Delay for claiming slot 2 is 0
Delay for claiming slot 3 is 0
Delay for claiming slot 4 is 0
Delay for claiming slot 5 is 0
Delay for claiming slot 6 is 0
Delay for claiming slot 7 is 0
Received message with sequence 0. EndOfBatch = true
Delay for claiming slot 8 is 505
Received message with sequence 1. EndOfBatch = false
Received message with sequence 2. EndOfBatch = false
Received message with sequence 3. EndOfBatch = false
Received message with sequence 4. EndOfBatch = false
Received message with sequence 5. EndOfBatch = false
Received message with sequence 6. EndOfBatch = false
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 9 is 3519
Received message with sequence 8. EndOfBatch = true
Received message with sequence 9. EndOfBatch = true
Однако, если я уберу время ожидания 500 мс, вот что я получу:
Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Received message with sequence 0. EndOfBatch = true
Received message with sequence 1. EndOfBatch = true
Delay for claiming slot 2 is 0
Received message with sequence 2. EndOfBatch = true
Delay for claiming slot 3 is 0
Received message with sequence 3. EndOfBatch = true
Delay for claiming slot 4 is 0
Received message with sequence 4. EndOfBatch = true
Delay for claiming slot 5 is 0
Received message with sequence 5. EndOfBatch = true
Delay for claiming slot 6 is 0
Received message with sequence 6. EndOfBatch = true
Delay for claiming slot 7 is 0
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 8 is 1
Delay for claiming slot 9 is 0
Received message with sequence 8. EndOfBatch = false
Received message with sequence 9. EndOfBatch = true
Таким образом, похоже, что на то, считается ли определенное сообщение концом пакета (т. е. на размер пакета), влияет задержка обработки сообщения потребителем. Может я туплю, но так ли это должно быть? Что стоит за этим? Что вообще определяет размер партии? Заранее спасибо. Дайте мне знать, если что-то в моем вопросе неясно.