log-pilot框架的排坑过程
在设计线上K8S环境的日志收集方案时,因为选择了阿里开源的log-pilot,因此也踩了不少坑,现在此做个记录。
在本方案采用的log-pilot版本中,使用的fluentd的版本为1.2.6
1、log-pilot不支持CKafka?
我们的需求是利用log-pilot的fluentd的模式将解析后的日志信息,汇聚同步到Kafka的Topic中。
生产环境的Kafka使用的是腾讯云的CKafka服务,其官方的描述如下:
消息队列 CKafka(Cloud Kafka)是基于开源 Apache Kafka 消息队列引擎,提供高吞吐性能、高可扩展性的消息队列服务。消息队列 CKafka 完美兼容 Apache kafka 0.9、0.10版本接口,在性能、扩展性、业务安全保障、运维等方面具有超强优势,让您在享受低成本、超强功能的同时,免除繁琐运维工作。
可以理解为CKafka服务基于的是0.9版本的Kafka。
在使用的过程中,log-pilot一直报错。经过一番折腾后,发现是fluentd的1.2.6版本的依赖库有问题,其依赖关系如下:

最后经过测试发现,如下的依赖才能保证正常工作:
fluent-plugin-kafka: 0.6.6
ruby-kafka: 0.4.4
2、log-pilot中对fluentd版本的疑惑
2.1 问题一
在项目的Dockerfile.fluentd文件中,明确指出使用的fluentd的版本为v1.2.6。
当部署log-pilot后,进入容器,发现在/etc/fluentd/fluentd.conf文件中有如下的代码:
1 | <match docker.**> |
可以看到输出到kafka时,使用的类型为kafka_buffered,通过查看fluent-plugin-kafka项目,可以在README.md文件中看到如下一段话:

我的理解是如果你使用的是fluentdv1.x,那么就应该对应使用Kafka 2.0。
2.2 问题二
在log-pilot的config.fluentd的脚本中,发现可以有如下的代码:

在查看fluentd v0.12和v1系列的文档后,我发现有些配置在v1的文档中有,有些配置在v0.12系列的文档中有。
对此表示疑惑。我整理了一个表格进行对比:
| 环境变量 | 参数 | fluentdv1.x系列文档中是否支持 | fluentdv0.12系列文档中是否支持 |
|---|---|---|---|
| FLUENTD_BUFFER_TYPE | buffer_type | 不支持 | 支持 |
| FLUENTD_BUFFER_CHUNK_LIMIT | buffer_chunk_limit | 不支持 | 支持 |
| FLUENTD_BUFFER_QUEUE_LIMIT | buffer_queue_limit | 不支持 | 支持 |
| FLUENTD_BUFFER_CHUNK_LIMIT_SIZE | chunk_limit_size | 支持 | 不支持 |
| FLUENTD_BUFFER_TOTAL_LIMIT_SIZE | total_limit_size | 支持 | 不支持 |
| FLUENTD_BUFFER_CHUNK_FULL_THRESHOLD | chunk_full_threshold | 支持 | 不支持 |
| FLUENTD_BUFFER_COMPRESS | compress | 支持 | 不支持 |
| FLUENTD_FLUSH_INTERVAL | flush_interval | 支持 | 支持 |
| FLUENTD_FLUSH_MODE | flush_mode | 支持 | 不支持 |
| FLUENTD_FLUSH_THREAD_COUNT | flush_thread_count | 支持 | 不支持 |
| FLUENTD_FLUSH_AT_SHUTDOWN | flush_at_shutdown | 支持 | 支持 |
| FLUENTD_DISABLE_RETRY_LIMIT | disable_retry_limit | 不支持 | 支持 |
| FLUENTD_RETRY_LIMIT | retry_limit | 不支持 | 支持 |
| FLUENTD_RETRY_WAIT | retry_wait | 支持 | 支持 |
| FLUENTD_MAX_RETRY_WAIT | max_retry_wait | 不支持 | 支持 |
| FLUENTD_NUM_THREADS | num_threads | 不支持 | 不支持 |
2.3 问题三
fluentd的官方文档,有两份文档:

v1.0和v0.12系列的文档,但是通过查看fluentd项目的release版本,可以看到,版本号小于v1.0的,并不是0.12.x,而是0.14.x的版本。如下图:

但是,v0.12.43的发部时间在0.14.25的后面。
但是通过尝试,发现v0.14.25能够在不修改log-pilot的源码下正常工作,而v0.12.43则会报错,提示如下:

提示需要format配置,我可以在flunetd的配置文件加上format也可以正常工作,但是通过log-pilot生成的配置文件中没有提供这些选项,也就是说需要修改源码才能提供支持。
经过以上分析,决定使用fluentd1.7.4,并且使用fluent-plugin-kafka中的kafka2的模式。
3 如何利用日志信息中的时间戳
需要将日志中的时间提取出来作为elasticsearch中的@timestamp字段。通过查阅fluentd的v1文档,发现有如下两处需要配置:
- parse
在解析模块中,需要将日志中的时间按特定的格式解析出来 - inject
在输出到kafka时,通过inject命令注入解析出来的时间字段,解析出来的格式为: float,例如:1510544836.1547
其核心配置如下:
1 | <source> |
通过以上的配置,fluentd会在写往kafka的日志中,增加一个timestamp字段,此timestamp为一个时间戳,此时间戳为日志中解析出来的时间。然后在logstash中进行如下的配置,就可以将日志中的时间字段解析到elastcisearch中的@timestamp中:
1 | filter { |
4、如何实现日志字段的类型转换
fluentd在官方文档中的parse模块中,其明确指出了可以通过types指定字段类型,从而达到类型转换的目的。
经过测试fluentd v1.2.6会报如下错:
1 | error="unknown value conversion for key:'{"cost"', type:"'float'}" |
发现其解析有问题,将fluentd的版本升级到1.7.4问题解决。
5、调整汇总
综上所述,最后log-pilot的调整如下:
修改log-pilot的源码
- 使用fluentdv1.7.4
- 使用fluent-plugin-kafka: 0.6.6
- 使用ruby-kafka: 0.4.4
- 使用kafka2
- 修改log-pilot的format类,增加可以输入的参数: time_key、time_type、timezone、time_format、types
修改后的源码已上传到公司git仓库中。
log-pilot的最后传入的参数如下:
环境变量 参数 类别 描述 LOGGING_OUTPUT 输出类型 配置为kafka,输出到kafka FLUENTD_BUFFER_CHUNK_LIMIT_SIZE chunk_limit_size 缓存 配置为1M,用于配置每一个缓存块的大小,当缓存块满后,会将数据放入发送队列 FLUENTD_BUFFER_TOTAL_LIMIT_SIZE total_limit_size 缓存 配置为300M,配置当前log-pilot总的缓存大小 FLUENTD_BUFFER_CHUNK_FULL_THRESHOLD chunk_full_threshold 缓存 配置为0.95,当缓存块使用率达到多少时,将数据放入发送队列 FLUENTD_FLUSH_INTERVAL flush_interval 缓存 配置为3秒,每隔多少秒,刷新缓存块 FLUENTD_FLUSH_MODE flush_mode 缓存 配置为interval,即固定时间时间刷新缓存块 FLUENTD_FLUSH_THREAD_COUNT flush_thread_count 缓存 配置为1秒,即刷新线程为1个 FLUENTD_FLUSH_AT_SHUTDOWN flush_at_shutdown 缓存 配置为true,当程序关闭前,刷新一下缓存块 FLUENTD_RETRY_WAIT retry_wait 缓存 配置为1,首次重试的等待时间。重试为指数型重试 KAFKA_BROKERS kafka配置 线上或者测试环境的ckafka地址 KAFKA_OUTPUT_DATA_TYPE kafka配置 配置为json,用于配置当数据经过fluentd后,发往kafka的数据类型 KAFKA_MAX_SEND_RETRIES kafka配置 配置为3,当发送失败后,最大重试的次数 KAFKA_REQUIRED_ACKS kafka配置 配置为1,用于配置当收到多少个kafka的ACK后,确认消息发送成功 KAFKA_ACK_TIMEOUT kafka配置 配置为1,用于设置ACK的超时时间 KAFKA_KAFKA_AGG_MAX_BYTES kafka配置 配置为4096,当kafka的发送缓存的大小达到4096B(4KB)时,会刷新缓存,并将数据发送到kafka KAFKA_KAFKA_AGG_MAX_MESSAGES kafka配置 配置为100,当kafka中的缓存中存在的数据条数达到n条后,刷新缓存,并将数据发送到kafka KAFKA_DISCARD_KAFKA_DELIVERY_FAILED kafka配置 配置为false,当向kafka发送失败后,是否丢弃消息
接下是一幅描述fluentd的缓存机制的图:

