文章目录
  1. 1. 1、log-pilot不支持CKafka?
  2. 2. 2、log-pilot中对fluentd版本的疑惑
    1. 2.1. 2.1 问题一
    2. 2.2. 2.2 问题二
    3. 2.3. 2.3 问题三
  3. 3. 3 如何利用日志信息中的时间戳
  4. 4. 4、如何实现日志字段的类型转换
  5. 5. 5、调整汇总

在设计线上K8S环境的日志收集方案时,因为选择了阿里开源的log-pilot,因此也踩了不少坑,现在此做个记录。

在本方案采用的log-pilot版本中,使用的fluentd的版本为1.2.6

1、log-pilot不支持CKafka

我们的需求是利用log-pilot的fluentd的模式将解析后的日志信息,汇聚同步到Kafka的Topic中。

生产环境的Kafka使用的是腾讯云的CKafka服务,其官方的描述如下:

消息队列 CKafka(Cloud Kafka)是基于开源 Apache Kafka 消息队列引擎,提供高吞吐性能、高可扩展性的消息队列服务。消息队列 CKafka 完美兼容 Apache kafka 0.9、0.10版本接口,在性能、扩展性、业务安全保障、运维等方面具有超强优势,让您在享受低成本、超强功能的同时,免除繁琐运维工作。

可以理解为CKafka服务基于的是0.9版本的Kafka。

在使用的过程中,log-pilot一直报错。经过一番折腾后,发现是fluentd的1.2.6版本的依赖库有问题,其依赖关系如下:

Log-pilot的默认依赖.png

最后经过测试发现,如下的依赖才能保证正常工作:

fluent-plugin-kafka: 0.6.6

ruby-kafka: 0.4.4

2、log-pilot中对fluentd版本的疑惑

2.1 问题一

在项目的Dockerfile.fluentd文件中,明确指出使用的fluentd的版本为v1.2.6。

当部署log-pilot后,进入容器,发现在/etc/fluentd/fluentd.conf文件中有如下的代码:

1
2
3
4
<match docker.**>
@type kafka_buffered
****
</match>

可以看到输出到kafka时,使用的类型为kafka_buffered,通过查看fluent-plugin-kafka项目,可以在README.md文件中看到如下一段话:

image

我的理解是如果你使用的是fluentdv1.x,那么就应该对应使用Kafka 2.0。

2.2 问题二

在log-pilot的config.fluentd的脚本中,发现可以有如下的代码:

image

在查看fluentd v0.12v1系列的文档后,我发现有些配置在v1的文档中有,有些配置在v0.12系列的文档中有。

对此表示疑惑。我整理了一个表格进行对比:

环境变量 参数 fluentdv1.x系列文档中是否支持 fluentdv0.12系列文档中是否支持
FLUENTD_BUFFER_TYPE buffer_type 不支持 支持
FLUENTD_BUFFER_CHUNK_LIMIT buffer_chunk_limit 不支持 支持
FLUENTD_BUFFER_QUEUE_LIMIT buffer_queue_limit 不支持 支持
FLUENTD_BUFFER_CHUNK_LIMIT_SIZE chunk_limit_size 支持 不支持
FLUENTD_BUFFER_TOTAL_LIMIT_SIZE total_limit_size 支持 不支持
FLUENTD_BUFFER_CHUNK_FULL_THRESHOLD chunk_full_threshold 支持 不支持
FLUENTD_BUFFER_COMPRESS compress 支持 不支持
FLUENTD_FLUSH_INTERVAL flush_interval 支持 支持
FLUENTD_FLUSH_MODE flush_mode 支持 不支持
FLUENTD_FLUSH_THREAD_COUNT flush_thread_count 支持 不支持
FLUENTD_FLUSH_AT_SHUTDOWN flush_at_shutdown 支持 支持
FLUENTD_DISABLE_RETRY_LIMIT disable_retry_limit 不支持 支持
FLUENTD_RETRY_LIMIT retry_limit 不支持 支持
FLUENTD_RETRY_WAIT retry_wait 支持 支持
FLUENTD_MAX_RETRY_WAIT max_retry_wait 不支持 支持
FLUENTD_NUM_THREADS num_threads 不支持 不支持

2.3 问题三

fluentd的官方文档,有两份文档:

fluentd版本.png

v1.0和v0.12系列的文档,但是通过查看fluentd项目的release版本,可以看到,版本号小于v1.0的,并不是0.12.x,而是0.14.x的版本。如下图:

fluentd-relase-0.14.x.png fluentd-release-0.12.x.png

但是,v0.12.43的发部时间在0.14.25的后面。

但是通过尝试,发现v0.14.25能够在不修改log-pilot的源码下正常工作,而v0.12.43则会报错,提示如下:

fluentd-0.12.x异常.png

提示需要format配置,我可以在flunetd的配置文件加上format也可以正常工作,但是通过log-pilot生成的配置文件中没有提供这些选项,也就是说需要修改源码才能提供支持。

经过以上分析,决定使用fluentd1.7.4,并且使用fluent-plugin-kafka中的kafka2的模式。

3 如何利用日志信息中的时间戳

需要将日志中的时间提取出来作为elasticsearch中的@timestamp字段。通过查阅fluentd的v1文档,发现有如下两处需要配置:

  • parse
    在解析模块中,需要将日志中的时间按特定的格式解析出来
  • inject
    在输出到kafka时,通过inject命令注入解析出来的时间字段,解析出来的格式为: float,例如: 1510544836.1547

其核心配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<source>
@type tail

...

<parse>
@type /^(?<timego>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}[+|-]\d{4}*): (?<jvm_time_offset>[^ ]+): \[(?<gc_type>.*) \((?<gc_reason>.*)\) (?<data>.*), (?<cost>.*) secs] \[Times: user=(?<cpu_user_cost_time>.*) sys=(?<cpu_sys_cost_time>.*), real=(?<cpu_real_cost_time>.*) (?<cost_time_unit>.*)]/
time_format %iso8601
time_key timego
time_type string
timezone +08:00
keep_time_key true
types cost:float,jvm_time_offset:float
</parse>
</source>

<match docker.**>
@type kafka2

...

<format>
@type json
</format>
<inject>
time_key timstamp
time_type float
</inject>
</match>

通过以上的配置,fluentd会在写往kafka的日志中,增加一个timestamp字段,此timestamp为一个时间戳,此时间戳为日志中解析出来的时间。然后在logstash中进行如下的配置,就可以将日志中的时间字段解析到elastcisearch中的@timestamp中:

1
2
3
4
5
filter {
date {
match => ["timestamp", "UNIX"]
}
}

4、如何实现日志字段的类型转换

fluentd在官方文档中的parse模块中,其明确指出了可以通过types指定字段类型,从而达到类型转换的目的。

经过测试fluentd v1.2.6会报如下错:

1
error="unknown value conversion for key:'{"cost"', type:"'float'}"

发现其解析有问题,将fluentd的版本升级到1.7.4问题解决。

5、调整汇总

综上所述,最后log-pilot的调整如下:

  1. 修改log-pilot的源码

    • 使用fluentdv1.7.4
    • 使用fluent-plugin-kafka: 0.6.6
    • 使用ruby-kafka: 0.4.4
    • 使用kafka2
    • 修改log-pilot的format类,增加可以输入的参数: time_key、time_type、timezone、time_format、types

    修改后的源码已上传到公司git仓库中。

  2. log-pilot的最后传入的参数如下:

    环境变量 参数 类别 描述
    LOGGING_OUTPUT 输出类型 配置为kafka,输出到kafka
    FLUENTD_BUFFER_CHUNK_LIMIT_SIZE chunk_limit_size 缓存 配置为1M,用于配置每一个缓存块的大小,当缓存块满后,会将数据放入发送队列
    FLUENTD_BUFFER_TOTAL_LIMIT_SIZE total_limit_size 缓存 配置为300M,配置当前log-pilot总的缓存大小
    FLUENTD_BUFFER_CHUNK_FULL_THRESHOLD chunk_full_threshold 缓存 配置为0.95,当缓存块使用率达到多少时,将数据放入发送队列
    FLUENTD_FLUSH_INTERVAL flush_interval 缓存 配置为3秒,每隔多少秒,刷新缓存块
    FLUENTD_FLUSH_MODE flush_mode 缓存 配置为interval,即固定时间时间刷新缓存块
    FLUENTD_FLUSH_THREAD_COUNT flush_thread_count 缓存 配置为1秒,即刷新线程为1个
    FLUENTD_FLUSH_AT_SHUTDOWN flush_at_shutdown 缓存 配置为true,当程序关闭前,刷新一下缓存块
    FLUENTD_RETRY_WAIT retry_wait 缓存 配置为1,首次重试的等待时间。重试为指数型重试
    KAFKA_BROKERS kafka配置 线上或者测试环境的ckafka地址
    KAFKA_OUTPUT_DATA_TYPE kafka配置 配置为json,用于配置当数据经过fluentd后,发往kafka的数据类型
    KAFKA_MAX_SEND_RETRIES kafka配置 配置为3,当发送失败后,最大重试的次数
    KAFKA_REQUIRED_ACKS kafka配置 配置为1,用于配置当收到多少个kafka的ACK后,确认消息发送成功
    KAFKA_ACK_TIMEOUT kafka配置 配置为1,用于设置ACK的超时时间
    KAFKA_KAFKA_AGG_MAX_BYTES kafka配置 配置为4096,当kafka的发送缓存的大小达到4096B(4KB)时,会刷新缓存,并将数据发送到kafka
    KAFKA_KAFKA_AGG_MAX_MESSAGES kafka配置 配置为100,当kafka中的缓存中存在的数据条数达到n条后,刷新缓存,并将数据发送到kafka
    KAFKA_DISCARD_KAFKA_DELIVERY_FAILED kafka配置 配置为false,当向kafka发送失败后,是否丢弃消息

接下是一幅描述fluentd的缓存机制的图:

fluentd-buffer.png

文章目录
  1. 1. 1、log-pilot不支持CKafka?
  2. 2. 2、log-pilot中对fluentd版本的疑惑
    1. 2.1. 2.1 问题一
    2. 2.2. 2.2 问题二
    3. 2.3. 2.3 问题三
  3. 3. 3 如何利用日志信息中的时间戳
  4. 4. 4、如何实现日志字段的类型转换
  5. 5. 5、调整汇总