Используя Docker, я настроил три контейнера: один для Elasticsearch, один для Fluent и один для Kibana. Elasticsearch работает на порту 9200, fluentd — на 24224, а Kibana — на 5600. Мой конфигурационный файл fluentd:
# INJECTED VIA DOCKER COMPOSE
<source>
@type forward
port 24224
format json
</source>
#<filter **>
# @type stdout
#</filter>
#<filter **>
# @type parser
# format json
# key_name log
# hash_value_field log
# reserve_data true
#</filter>
<match **>
@type copy
<store>
@type elasticsearch
hosts 172.18.0.1:9200
logstash_format true
logstash_prefix chris.risley
logstash_dateformat %Y%m%d
include_tag_key true
flush_interval 1s
</store>
<store>
@type stdout
</store>
</match>
Используя эту библиотеку fluent-logger https://github.com/fluent/fluent-logger-python, я добавил обработчик fluentd к встроенной функции ведения журнала Python и смог успешно использовать регистратор для отправки информации в fluentd. После того, как он был отправлен в fluentd, он должен быть опубликован в эластичном поиске, а также отображаться в терминале. В терминале вроде все работает и я вижу следующее:
fluentd_1 | 2018-01-17 14:22:15.000000000 +0000 test-logger: "{\"json\":\"message\", \"log\":\"work dammit\"}"
Однако при проверке Kibana данные в Elasticsearch не отображаются. Как вы можете видеть выше в конфигурации fluentd, я пытался использовать фильтр, так как думал, что это может быть какая-то проблема с форматированием, и я обычно получаю следующее сообщение об ошибке, когда возюсь с конфигурацией:
2018-01-16 16:50:13.641919751 +0000 fluent.warn: {"retry_time":2,"next_retry_seconds":"2018-01-16 16:50:13 +0000","chunk":"562e785fe25f67fcd8919dfd02992af2","error":"#<Fluent::ElasticsearchOutput::ConnectionFailure: Can not reach Elasticsearch cluster ({:host=>\"docker.for.mac.localhost\", :port=>9200, :scheme=>\"http\"})!>","message":"failed to flush the buffer. retry_time=2 next_retry_seconds=2018-01-16 16:50:13 +0000 chunk=\"562e785fe25f67fcd8919dfd02992af2\" error_class=Fluent::ElasticsearchOutput::ConnectionFailure error=\"Can not reach Elasticsearch cluster ({:host=>\\"docker.for.mac.localhost\\", :port=>9200, :scheme=>\\"http\\"})!\""}'
2018-01-17 14:59:03 +0000 [error]: #0 Could not push log to Elasticsearch: {"took"=>0, "errors"=>true, "items"=>[{"index"=>{"_index"=>"chris.risley-20180117", "_type"=>"fluentd", "_id"=>"AWEEoW6ECtI_aQclhe0C", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse [log]", "caused_by"=>{"type"=>"illegal_state_exception", "reason"=>"Can't get text on a START_OBJECT at 1:439"}}}}]}
Есть идеи, что происходит? Я действительно хотел бы, чтобы было больше документации по проблемам.
И в случае, если это проблема, вот как я использую библиотеку журналирования fluentd в Python:
from fluent import handler
from src.directlog import DirectLog
import logging.config
import yaml
import datetime
import msgpack
from io import BytesIO
class ElasticLogger(object):
def __init__(self, tag, host, port, base_level, config=False, path='src/logging.yaml'):
"""
Constructs and initializes a logging facility that has a fluentd handler
:param tag: The logger's tag or name
:param host: The host url/ip of fluentd
:param port: The port of fluentd
:param base_level: The base level logging priority
:param config: Boolean to determine whether or not you configure with a dictionary
:param path: Path to dictionary
"""
self.tag = tag
self.host = host
self.port = port
self.base_level = base_level
self.logger = self.config(config, path)
# TODO: Abstract Out Authentication
self.back_up_logger = DirectLog(self.host, self.tag + str(datetime.datetime.now().strftime("%Y%M%d")),
'#####', '#####', 10000, self.base_level, False)
# Public
def build(self)->logging:
"""
Builds the logger and returns it for use
:return: The logger
"""
return self.logger
# Private
def config(self, dict_config=False, path='src/logging.yaml')->logging:
"""
Configures the fluentd logger with the fields initialized in the constructor
:param dict_config: Boolean to determine whether or not to configure it with a dictionary
:param path: The path to the dictionary
:return: The logger
"""
logger = logging.getLogger(self.tag)
if dict_config:
with open(path) as fd:
conf = yaml.load(fd)
logging.config.dictConfig(conf['logging'])
else:
logging.basicConfig(level=self.base_level)
h = handler.FluentHandler(self.tag, host=self.host, port=self.port, buffer_overflow_handler=self.overflow_handler)
logger.addHandler(h)
return logger
# Private
def overflow_handler(self, pendings):
"""
Used to save data that overflowed the buffer
:param pendings: The pending data?
:return: Nothing
"""
unpacker = msgpack.Unpacker(BytesIO(pendings))
for unpacked in unpacker:
print(unpacked)
# Uncomment to have data that overflowed the buffer be posted directly to elasticsearch
# self.back_up_logger.log(unpacked[2])
if __name__ == '__main__':
local_host = '####'
el = ElasticLogger('test-logger', local_host, 24224, logging.NOTSET, False).build()
el.error('{"json":"message", "log":"work dammit"}')