Logstash: чтение многострочных данных из необязательных строк

У меня есть файл журнала, который содержит строки, начинающиеся с метки времени. За каждой такой строкой с отметкой времени может следовать неопределенное количество дополнительных строк:

SOMETIMESTAMP some data
extra line 1 2
extra line 3 4

Дополнительные строки будут предоставлять дополнительную информацию для строки с отметкой времени. Я хочу извлечь 1, 2, 3 и 4 и сохранить их как переменные. Я могу разобрать лишние строки на переменные, если знаю, сколько их. Например, если я знаю, что есть две лишние строки, фильтр grok ниже будет работать. Но что мне делать, если я заранее не знаю, сколько дополнительных строк будет существовать? Есть ли способ проанализировать эти строки одну за другой перед применением многострочного фильтра? Это может помочь.

Кроме того, даже если я знаю, что у меня будет только 2 дополнительные строки, является ли фильтр ниже лучшим способом получить к ним доступ?

filter {
    multiline {
        pattern => "^%{SOMETIMESTAMP}"
        negate => "true"
        what => "previous"
    }

    if "multiline" in [tags] {
        grok {
            match => { "message" => "(?m)^%{SOMETIMESTAMP} %{DATA:firstline}(?<newline>[\r\n]+)%{DATA:secondline}(?<newline>[\r\n]+)%{DATA:thirdline}$" }
        }
    }
    # After this would be grok filters to process the contents of
    # 'firstline', 'secondline', and 'thirdline'. I would then remove
    # these three temporary fields from the final output.
}

(Я разделил строки на отдельные переменные, так как это позволяет мне выполнять дополнительное сопоставление с образцом для содержимого строк по отдельности, без необходимости снова обращаться ко всему образцу. Например, на основе содержимого первой строки я может захотеть представить поведение ветвления для других строк.)


person Pat Flegit    schedule 17.06.2015    source источник


Ответы (2)


Зачем тебе это нужно?

Собираетесь ли вы вставлять одно событие со всеми значениями или это действительно отдельные события, которые просто должны иметь одну и ту же отметку времени?

Если все они должны появиться в одном и том же событии, вам, возможно, придется прибегнуть к фильтру ruby, чтобы разделить лишние строки на поля события, над которыми вы сможете работать дальше.

Например:

if "multiline" in [tags] {
    grok {
        match => { "message" => "(?m)^%{SOMETIMESTAMP} %{DATA:firstline}(?<newline>[\r\n]+)" }
    }
    ruby {
       code => '
         event["lines"] = event["message"].scan(/[^\r\n]+[\r\n]*/);
       '
    }
}

Если это действительно отдельные события, вы можете использовать подключаемый модуль memorize для logstash 1.5 и более поздних версий.

person Alcanzar    schedule 17.06.2015
comment
отредактировано, чтобы добавить * вместо + --, чтобы новая строка в конце была необязательной - person Alcanzar; 17.06.2015
comment
Разделив строки на массив lines, есть ли способ применить фильтр grok к каждой строке в этом массиве? Таким образом, я могу получить 1, 2, 3 и 4 из входных данных. - person Pat Flegit; 17.06.2015
comment
Похоже, вы должны иметь возможность использовать %{[lines][1]} (см. groups.google.com/forum/#!topic/logstash-users/mlfMs-2iuVI). Также вы, вероятно, могли бы использовать сплит-фильтр вместо рубинового фильтра. - person Alcanzar; 17.06.2015
comment
Хорошая находка... тем не менее, есть ли способ применить это к произвольному количеству строк? Таким образом, я мог бы запустить фильтр grok для %{[lines][1]} и %{[lines][2]} и т. д. -- столько дополнительных строк, сколько у меня есть, поскольку это число произвольный. - person Pat Flegit; 17.06.2015
comment
У вас есть вся мощь рубина в рубиновом фильтре, поэтому вы можете делать практически все, что захотите. Но в языке конфигурации logstash нет циклической конструкции. - person Alcanzar; 17.06.2015
comment
Ах... так что лучший способ сделать это, вероятно, было бы написать Ruby-скрипт для выполнения простой работы с регулярными выражениями, а затем использовать event[] для создания новых переменных. (Вместо использования grok.) - person Pat Flegit; 17.06.2015
comment
Вот что я бы сделал. - person Alcanzar; 17.06.2015

Это изменилось по сравнению с версиями ссылок на поля событий ELK Direct (например, событие ['поле']) были отключены в пользу использования методов получения и установки события (например, event.get('поле')).

filter {
    grok {
        match => { "message" => "%{TIMESTAMP_ISO8601:logtime} %{LOGLEVEL:level}%{DATA:firstline}" }
    }
    ruby { code => "event.set('message', event.get('message').scan(/[^\r\n]+[\r\n]*/))" }
}
person IMPRENABLE AUTOMATION    schedule 11.08.2018