Способ отправки половины журнала в RabbitMQ перед отправкой полного журнала в logstash/elasticsearch

У меня есть пара функций, и каждая функция создает журналы, относящиеся к одной транзакции; Это многопоточное приложение, поэтому запись функции в func1 может быть случайной для выполненной транзакции, но для одиночной транзакции она будет проходить только через порядок func1, func2 и func3.

func1(transactionId) {
     log("%d Now in func1", transactionId);
}

func2(transactionId) {
     log("%d Now in func2", transactionId);
}

func3(transactionId) {
     log("%d Now in func3", transactionId);
}

Теперь я хочу записывать в logstash сразу для каждой транзакции ТОЛЬКО за раз; то есть

 1 Now in func1 Now in func2 Now in fun3

а затем нужно перейти, наконец, к elasticsearch;

Я думал о том, чтобы записать половину журнала транзакций во временную очередь RabbitMQ, а затем, по завершении полной транзакции, я передам его в очередь производителя RabbitMQ, чтобы отправить сообщение в logstash;

Нравиться

func1(transactionId) {
     add2RMQ(transactionId, "Now in func1");
}

func2(transactionId) {
     add2RMQ("transactionId, "Now in func2");
}

func3(transactionId) {
      add2RMQ("transactionId, "Now in func3");
      /* Last point of transaction */
      commit2RMQ(transactionId);
}

Во время выполнения commit2RMQ logstash должен получить полное сообщение, относящееся к транзакции, для записи в elasticsearch.

Вопрос:

  1. Как правильно решить эту проблему, чтобы сразу отправлять данные, относящиеся к транзакции, в elasticsearch?
  2. Можем ли мы решить эту проблему с помощью RabbitMQ? Если да, то какой правильный API мне нужно использовать для этого?
  3. Есть ли способ добиться того же без RabbitMQ, но только с logstash и elasticsearch?
  4. Я не хочу использовать API обновления elasticsearch, поскольку он может потребовать много операций поиска для каждого сообщения журнала, относящегося к транзакции.

person Viswesn    schedule 27.09.2015    source источник


Ответы (1)


Попытка агрегировать разные строки журнала, относящиеся к одной транзакции, — непростая задача, особенно если вы добавите к смеси систему очередей сообщений в качестве промежуточного хранилища журналов для агрегирования. Я бы пошел другим путем, который не включает другую подсистему, такую ​​​​как RabbitMQ.

Кроме того, если вы попытаетесь объединить несколько строк журнала в одну, вы потеряете детализированную информацию, которую может предоставить каждая строка журнала, например, количество времени, которое потребовалось для выполнения каждой функции. Кроме того, что произойдет, если func2, соответственно func3, выдаст исключение? Следует ли хранить неполный журнал, состоящий только из func1, соответственно только из func1 и func2?

То, что я собираюсь написать, вероятно, можно перенести на любой язык и любое решение для ведения журналов, но для иллюстрации я предполагаю, что ваша программа написана на Java и вы используете Log4J.

Поэтому я бы использовал сопоставленный диагностический контекст Log4J ( MDC), чтобы сохранить идентификатор вашей транзакции (и, возможно, другие данные, такие как имя пользователя и т. д.) в каждой из строк журнала. Таким образом, вы можете легко получить все строки журнала, относящиеся к одной транзакции. Преимущество этого в том, что вам не нужно ничего агрегировать, вы просто предоставляете достаточно контекстной информации, чтобы Kibana могла сделать это за вас позже.

В своем псевдокоде вы добавляете идентификатор транзакции непосредственно в свое сообщение. Преимущество использования для этого MDC вместо регистрации идентификатора в вашем сообщении заключается в том, что он избавляет вас от синтаксического анализа всех ваших сообщений в Logstash для повторного обнаружения идентификатора транзакции, который вы уже знали при создании строки журнала.

Итак, идея состоит в том, что в вашем коде, как только у вас есть идентификатор транзакции, вы добавляете его в текущий контекст ведения журнала для каждого потока следующим образом:

import org.apache.log4j.MDC;

...
func1(transactionId) {
     // add the transaction ID to the logging context
     MDC.put("transactionID", transactionId);
     log("Now in func1");
}

func2(transactionId) {
     log("Now in func2");
}

func3(transactionId) {
     log("Now in func3");
}

Затем в вашем файле конфигурации Log4J вы можете указать добавление, используя шаблон %X{transactionID}, чтобы сохранить его, в этом случае я добавляю его сразу после имени потока, но вы можете поместить его где угодно:

log4j.appender.consoleAppender.layout.ConversionPattern = %d [%t] [%X{transactionID}] %5p %c - %m%n

Ваши журналы будут выглядеть примерно так:

2015-09-28T05:07:28.425Z [http-8084-2] [625562271762]  INFO YourClass - Now in func1
2015-09-28T05:07:29.776Z [http-8084-2] [625562271762]  INFO YourClass - Now in func2
2015-09-28T05:07:30.652Z [http-8084-2] [625562271762]  INFO YourClass - Now in func3
                                              ^
                                              |
                                  the transaction ID is here

Когда у вас есть такие строки журнала, проще всего получить идентификатор транзакции с помощью фильтра Logstash grok и сохранить его в собственном поле transactionID в вашем индексе logstash. В Kibana вы можете затем искать идентификатор транзакции и сортировать по отметке времени desc, и у вас будет отображаться весь контекст этой транзакции.

Дать ему шанс!

person Val    schedule 28.09.2015
comment
Большое спасибо; Я действительно ценю то, как вы решили проблему с точки зрения предоставления мне решения, характерного для случаев сбоя. Я проведу этот эксперимент. - person Viswesn; 02.10.2015