Fluentd Публикация в stdout, но не в Elasticsearch

Используя Docker, я настроил три контейнера: один для Elasticsearch, один для Fluent и один для Kibana. Elasticsearch работает на порту 9200, fluentd — на 24224, а Kibana — на 5600. Мой конфигурационный файл fluentd:

# INJECTED VIA DOCKER COMPOSE
<source>
  @type forward
  port 24224
  format json
</source>

#<filter **>
#  @type stdout
#</filter>

#<filter **>
 # @type parser
 # format json
 # key_name log
 # hash_value_field log
 # reserve_data true
#</filter>

<match **>
  @type copy
  <store>
    @type elasticsearch
    hosts 172.18.0.1:9200
    logstash_format true
    logstash_prefix chris.risley
    logstash_dateformat %Y%m%d
    include_tag_key true
    flush_interval 1s
  </store>
  <store>
    @type stdout
  </store>
</match>

Используя эту библиотеку fluent-logger https://github.com/fluent/fluent-logger-python, я добавил обработчик fluentd к встроенной функции ведения журнала Python и смог успешно использовать регистратор для отправки информации в fluentd. После того, как он был отправлен в fluentd, он должен быть опубликован в эластичном поиске, а также отображаться в терминале. В терминале вроде все работает и я вижу следующее:

fluentd_1         | 2018-01-17 14:22:15.000000000 +0000 test-logger: "{\"json\":\"message\", \"log\":\"work dammit\"}"

Однако при проверке Kibana данные в Elasticsearch не отображаются. Как вы можете видеть выше в конфигурации fluentd, я пытался использовать фильтр, так как думал, что это может быть какая-то проблема с форматированием, и я обычно получаю следующее сообщение об ошибке, когда возюсь с конфигурацией:

2018-01-16 16:50:13.641919751 +0000 fluent.warn: {"retry_time":2,"next_retry_seconds":"2018-01-16 16:50:13 +0000","chunk":"562e785fe25f67fcd8919dfd02992af2","error":"#<Fluent::ElasticsearchOutput::ConnectionFailure: Can not reach Elasticsearch cluster ({:host=>\"docker.for.mac.localhost\", :port=>9200, :scheme=>\"http\"})!>","message":"failed to flush the buffer. retry_time=2 next_retry_seconds=2018-01-16 16:50:13 +0000 chunk=\"562e785fe25f67fcd8919dfd02992af2\" error_class=Fluent::ElasticsearchOutput::ConnectionFailure error=\"Can not reach Elasticsearch cluster ({:host=>\\"docker.for.mac.localhost\\", :port=>9200, :scheme=>\\"http\\"})!\""}'

2018-01-17 14:59:03 +0000 [error]: #0 Could not push log to Elasticsearch: {"took"=>0, "errors"=>true, "items"=>[{"index"=>{"_index"=>"chris.risley-20180117", "_type"=>"fluentd", "_id"=>"AWEEoW6ECtI_aQclhe0C", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse [log]", "caused_by"=>{"type"=>"illegal_state_exception", "reason"=>"Can't get text on a START_OBJECT at 1:439"}}}}]}

Есть идеи, что происходит? Я действительно хотел бы, чтобы было больше документации по проблемам.

И в случае, если это проблема, вот как я использую библиотеку журналирования fluentd в Python:

from fluent import handler
from src.directlog import DirectLog
import logging.config
import yaml
import datetime
import msgpack
from io import BytesIO


class ElasticLogger(object):

def __init__(self, tag, host, port, base_level, config=False, path='src/logging.yaml'):
    """
    Constructs and initializes a logging facility that has a fluentd handler
    :param tag: The logger's tag or name
    :param host: The host url/ip of fluentd
    :param port: The port of fluentd
    :param base_level: The base level logging priority
    :param config: Boolean to determine whether or not you configure with a dictionary
    :param path: Path to dictionary
    """
    self.tag = tag
    self.host = host
    self.port = port
    self.base_level = base_level
    self.logger = self.config(config, path)
    # TODO: Abstract Out Authentication
    self.back_up_logger = DirectLog(self.host, self.tag + str(datetime.datetime.now().strftime("%Y%M%d")),
                                    '#####', '#####', 10000, self.base_level, False)

# Public
def build(self)->logging:
    """
    Builds the logger and returns it for use
    :return: The logger
    """
    return self.logger

# Private
def config(self, dict_config=False, path='src/logging.yaml')->logging:
    """
    Configures the fluentd logger with the fields initialized in the constructor
    :param dict_config: Boolean to determine whether or not to configure it with a dictionary
    :param path: The path to the dictionary
    :return: The logger
    """
    logger = logging.getLogger(self.tag)
    if dict_config:
        with open(path) as fd:
            conf = yaml.load(fd)
        logging.config.dictConfig(conf['logging'])
    else:
        logging.basicConfig(level=self.base_level)
    h = handler.FluentHandler(self.tag, host=self.host, port=self.port, buffer_overflow_handler=self.overflow_handler)
    logger.addHandler(h)
    return logger

# Private
def overflow_handler(self, pendings):
    """
    Used to save data that overflowed the buffer
    :param pendings: The pending data?
    :return: Nothing
    """
    unpacker = msgpack.Unpacker(BytesIO(pendings))
    for unpacked in unpacker:
        print(unpacked)
        # Uncomment to have data that overflowed the buffer be posted directly to elasticsearch
        # self.back_up_logger.log(unpacked[2])


if __name__ == '__main__':
    local_host = '####'
    el = ElasticLogger('test-logger', local_host, 24224, logging.NOTSET, False).build()
    el.error('{"json":"message", "log":"work dammit"}')

person Chris Risley    schedule 17.01.2018    source источник


Ответы (1)


Изменение: хосты 172.18.0.1:9200 с хостом 172.18.0.1 порт 9200

person bitbull    schedule 17.05.2019