grokparsefailure с несколькими if [type] — конфигурация logstash

Хорошо - я ломал голову над этим конфигурационным файлом в течение нескольких дней без особого успеха (я очень новичок в стеке logstash/ELK). Проблема, с которой я сталкиваюсь, заключается в том, что когда я помещаю две конфигурации logstash в один и тот же каталог, я получаю ошибку grok во второй конфигурации. Это означает, что 001 будет работать, а 002 выдаст ошибку. Если я запускаю logstash только с одной конфигурацией (неважно, какой), все работает отлично. В сочетании одно работает, другое не работает. Я объединил два файла conf в один файл conf, но та же проблема не устранена. Ниже представлена ​​объединенная версия конфигурации и образец системных журналов. Будем очень благодарны любой помощи!

input {
  file {
    path => ["/var/log/pantraffic.log"]
    #start_position => "beginning"
    type => "pantraffic"
  }
   file {
    path => ["/var/log/panthreat.log"]
    #start_position => "beginning"
    type => "panthreat"
  } 
}

filter {
  if [type] == "pantraffic" {
    grok {
      #patterns_dir => "/opt/logstash/patterns"
      match => [ "message_traffic", "%{TIMESTAMP_ISO8601:@timestamp} %       {HOSTNAME:syslog_host} %{GREEDYDATA:traffic_message}"]
    }
    syslog_pri { }
  }
   csv {
      source => "traffic_message"
columns => [ "PaloAltoDomain","ReceiveTime","SerialNum","Type","Threat-   ContentType","ConfigVersion","GenerateTime","SourceAddress","DestinationAddress","NATSourceIP","NATDestinationIP","Rule","SourceUser","DestinationUser","Application","VirtualSystem","SourceZone","DestinationZone","InboundInterface","OutboundInterface","LogAction","TimeLogged","SessionID","RepeatCount","SourcePort","DestinationPort","NATSourcePort","NATDestinationPort","Flags","IPProtocol","Action","Bytes","BytesSent","BytesReceived","Packets","StartTime","ElapsedTimeInSec","Category","Padding","seqno","actionflags","SourceCountry","DestinationCountry","cpadding","pkts_sent","pkts_received","sessionEndReason" ]
}

 date {
      #timezone => "America/Chicago"
      match => [ "GenerateTime", "YYYY/MM/dd HH:mm:ss" ]    
}

mutate {
      convert => [ "Bytes", "integer" ]
      convert => [ "BytesReceived", "integer" ]
      convert => [ "BytesSent", "integer" ]
      convert => [ "ElapsedTimeInSec", "integer" ]
      convert => [ "geoip.area_code", "integer" ]
      convert => [ "geoip.dma_code", "integer" ]
      convert => [ "geoip.latitude", "float" ]
      convert => [ "geoip.longitude", "float" ]
      convert => [ "NATDestinationPort", "integer" ]
      convert => [ "NATSourcePort", "integer" ]
      convert => [ "Packets", "integer" ]
      convert => [ "pkts_received", "integer" ]
      convert => [ "pkts_sent", "integer" ]
      convert => [ "seqno", "integer" ]
      gsub => [ "Rule", " ", "_",
                "Application", "( |-)", "_" ]
      remove_field => [ "message_traffic", "traffic_message" ]
    }

if [SourceAddress] and [SourceAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-   9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
           database => "/opt/logstash/GeoLiteCity.dat"
           source => "SourceAddress"
           target => "SourceGeo"
      }
      if ([SourceGeo.location] and [SourceGeo.location] =~ "0,0") {
        mutate {
          replace => [ "SourceGeo.location", "" ]
        }
      }
    }
if [DestinationAddress] and [DestinationAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
           database => "/opt/logstash/GeoLiteCity.dat"
           source => "DestinationAddress"
           target => "DestinationGeo"
      }

      if ([DestinationGeo.location] and [DestinationGeo.location] =~ "0,0") {
        mutate {
          replace => [ "DestinationAddress.location", "" ]
        }
      }
    }

  if [SourceAddress] and [DestinationAddress] {
    fingerprint {
      concatenate_sources => true
      method => "SHA1"
      key => "logstash"
      source => [ "SourceAddress", "SourcePort", "DestinationAddress",  "DestinationPort", "IPProtocol" ]
    }
  }
###########################################################################
if [type] == "panthreat" {
    grok {

      match => [ "message", "%{TIMESTAMP_ISO8601:@timestamp} % {HOSTNAME:syslog_host} %{GREEDYDATA:threat_message}"]
    }
        syslog_pri { }
  }
   csv {
      source => "threat_message"
columns => [ "Domain","ReceiveTime","Serial","Type","ThreatContentType","ConfigVersion","GenerateTime","SourceAddress","DestinationAddress","NATSourceIP","NATDestinationIP","Rule","SourceUser","DestinationUser","Application","VirtualSystem","SourceZone","DestinationZone","InboundInterface","OutboundInterface","LogAction","TimeLogged","SessionID","RepeatCount","SourcePort","DestinationPort","NATSourcePort","NATDestinationPort","Flags","IPProtocol","Action","URL","ThreatContentName","Category","Severity","Direction","seqno","actionflags","SourceCountry","DestinationCountry","cpadding","contenttype","pcap_id","filedigest","cloud","url_idx","user_agent","filetype","xff","referer","sender","subject","recipient","reportid" ]
}

date {
      #timezone => "America/Chicago"
      match => [ "GenerateTime", "YYYY/MM/dd HH:mm:ss" ]
}

mutate {
      #convert => [ "Bytes", "integer" ]
      #convert => [ "BytesReceived", "integer" ]
      #convert => [ "BytesSent", "integer" ]
      #convert => [ "ElapsedTimeInSec", "integer" ]
      convert => [ "geoip.area_code", "integer" ]
      convert => [ "geoip.dma_code", "integer" ]
      convert => [ "geoip.latitude", "float" ]
      convert => [ "geoip.longitude", "float" ]
      convert => [ "NATDestinationPort", "integer" ]
      convert => [ "NATSourcePort", "integer" ]
      #convert => [ "Packets", "integer" ]
      #convert => [ "pkts_received", "integer" ]
      #convert => [ "pkts_sent", "integer" ]
      #convert => [ "seqno", "integer" ]
      gsub => [ "Rule", " ", "_",
                "Application", "( |-)", "_" ]
      remove_field => [ "message", "threat_message" ]
    }

if [SourceAddress] and [SourceAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6- 9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
           database => "/opt/logstash/GeoLiteCity.dat"
           source => "SourceAddress"
           target => "SourceGeo"
      }
      if ([SourceGeo.location] and [SourceGeo.location] =~ "0,0") {
        mutate {
          replace => [ "SourceGeo.location", "" ]
        }
      }
    }
 if [DestinationAddress] and [DestinationAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
           database => "/opt/logstash/GeoLiteCity.dat"
           source => "DestinationAddress"
           target => "DestinationGeo"
      }

      if ([DestinationGeo.location] and [DestinationGeo.location] =~ "0,0") {
        mutate {
          replace => [ "DestinationAddress.location", "" ]
         }
       }
    }

  if [SourceAddress] and [DestinationAddress] {
    fingerprint {
      concatenate_sources => true
      method => "SHA1"
      key => "logstash"
      source => [ "SourceAddress", "SourcePort", "DestinationAddress",  "DestinationPort", "IPProtocol" ]
    }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

Образцы журналов:

panthreat log:
015-11-13T04:53:28-06:00 PA-200 1,2015/11/13    04:53:28,0011122223333,THREAT,vulnerability,1,2015/11/13 04:53:28,73.222.111.1,4.4.4.4,0.0.0.0,0.0.0.0,rule1,,,dns,vsys1,trust,untrust,ethernet1/2,ethernet1/1,Default_Forwarder,2015/11/13 04:53:28,3602,1,34830,53,0,0,0x0,udp,drop-all-packets,"",Test(41000),0,any,high,client-to-server,37,0x0,US,US,0,,0,,,0,,,,,,,

pantraffic log:
2015-11-13T07:34:22-06:00 PA-200 1,2015/11/13 07:34:21,001112223334,TRAFFIC,end,1,2015/11/13 07:34:21,73.22.111.1,4.3.2.1,0.0.0.0,0.0.0.0,rule1,,,facebook-base,vsys1,trust,untrust,ethernet1/2,ethernet1/1,Default_Forwarder,2015/11/13 07:34:21,6385,1,63121,443,0,0,0x53,tcp,allow,6063,2285,3778,29,2015/11/13 07:34:05,2,social-networking,0,15951,0x0,US,IE,0,17,12,tcp-fin

person whistler585    schedule 13.11.2015    source источник


Ответы (1)


Я думаю, вы перепутали закрывающие скобки. Проверьте этот блок (ваш первый if), например:

  if [type] == "pantraffic" {
    grok {
      #patterns_dir => "/opt/logstash/patterns"
      match => [ "message_traffic", "%{TIMESTAMP_ISO8601:@timestamp} %       {HOSTNAME:syslog_host} %{GREEDYDATA:traffic_message}"]
    }
    syslog_pri { }
  }

Последняя закрывающая скобка, скорее всего, здесь неверна. Вы не хотите закрывать свой блок if здесь, но непосредственно перед тем, как вы начнете свой блок «panthreat» ниже. Та же проблема с блоком "panthreat" if.

person markus    schedule 19.11.2015
comment
@ whistler585 привет, пожалуйста, примите мой ответ, если он был вам полезен. спасибо - person markus; 12.03.2016