一个小应用程序来监视kafka消费者的进度和它们的延迟的队列。
KafkaOffsetMonitor
是用来实时监控Kafka集群中的consumer以及在队列中的位置(偏移量)。
你可以查看当前的消费者组,每个topic队列的所有partition的消费情况。可以很快地知道每个partition中的消息是否很快被消费以及相应的队列消息增长速度等信息。这些可以debug kafka的producer和consumer,你完全知道你的系统将会发生什么。
这个web管理平台保留的partition offset和consumer滞后的历史数据(具体数据保存多少天我们可以在启动的时候配置),所以你可以很轻易了解这几天consumer消费情况。
KafkaOffsetMonitor这款软件是用Scala代码编写的,消息等历史数据是保存在名为offsetapp.db数据库文件中,该数据库是SQLLite文件,非常的轻量级。虽然我们可以在启动KafkaOffsetMonitor程序的时候指定数据更新的频率和数据保存的时间,但是不建议更新很频繁,或者保存大量的数据,因为在KafkaOffsetMonitor图形展示的时候会出现图像展示过慢,或者是直接导致内存溢出了。
所有的关于消息的偏移量、kafka集群的数量等信息都是从Zookeeper中获取到的,日志大小是通过计算得到的。
消费者组列表
消费组的topic列表
图中参数含义解释如下:
topic:创建时topic名称
partition:分区编号
offset:表示该parition已经消费了多少条message
logSize:表示该partition已经写了多少条message
Lag:表示有多少条message没有被消费。
Owner:表示消费者
Created:该partition创建时间
Last Seen:消费状态刷新最新时间。
topic的历史位置
Offset存储位置
kafka能灵活地管理offset,可以选择任意存储和格式来保存offset。KafkaOffsetMonitor目前支持以下流行的存储格式。
- kafka0.8版本以前,offset默认存储在zookeeper中(基于Zookeeper)
- kafka0.9版本以后,offset默认存储在内部的topic中(基于Kafka内部的topic)
- Storm Kafka Spout(默认情况下基于Zookeeper)
KafkaOffsetMonitor每个运行的实例只能支持单一类型的存储格式。
下载
可以到github下载KafkaOffsetMonitor
源码。
https://github.com/quantifind/KafkaOffsetMonitor
编译KafkaOffsetMonitor命令:
sbt/sbt assembly
不过不建议你自己去下载,因为编译的jar包里引入的都是外部的css和js,所以打开必须联网,都是国外的地址,你编译的时候还要修改js路径,我已经搞定了,你直接下载就好了。
百度云盘:https://pan.baidu.com/s/1kUZJrCV
启动
编译完之后,将会在KafkaOffsetMonitor根目录下生成一个类似KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar
的jar文件。这个文件包含了所有的依赖,我们可以直接启动它:
java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--zk zk-server1,zk-server2 \
--port 8080 \
--refresh 10.seconds \
--retain 2.days
启动方式2,创建脚本,因为您可能不是一个kafka集群。用脚本可以启动多个。
vim mobile_start_en.sh
nohup java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb
--offsetStorage kafka
--zk 127.0.0.1:2181
--port 8080
--refresh 10.seconds
--retain 2.days 1>mobile-logs/stdout.log 2>mobile-logs/stderr.log &
各个参数的含义:
- offsetStorage:有效的选项是"zookeeper","kafka","storm"。0.9版本以后,offset存储的位置在kafka。
- zk: zookeeper的地址
- prot 端口号
- refresh 刷新频率,更新到DB。
- retain 保留DB的时间
- dbName 在哪里存储记录(默认'offsetapp')