Zabbix 监控 Kafka Topic 积压数据是一个经典且关键的监控场景。积压量直接反映了消费者的处理能力与生产速度是否匹配,是衡量系统健康度的重要指标。以下是完整的解决方案,涵盖原理、步骤和最佳实践。
通过 Kafka 自带的 kafka-consumer-groups.sh 脚本(或 Kafka API)获取指定 Consumer Group 对特定 Topic 的 Lag(滞后量),即未消费的消息数。Zabbix 通过定期抓取此数值进行监控、告警和可视化。
这是最灵活、可控性最高的方案。
步骤 1:在 Kafka 节点或监控节点上创建监控脚本创建一个 Shell 或 Python 脚本,用于获取指定 Consumer Group 和 Topic 的 Lag。
示例 Shell 脚本 (/usr/local/bin/kafka_lag_check.sh):
#!/bin/bash
# 参数:Consumer Group, Topic Name
CONSUMER_GROUP=$1
TOPIC_NAME=$2
KAFKA_HOME=/opt/kafka
KAFKA_BROKERS="kafka-broker1:9092,kafka-broker2:9092"
# 使用 kafka-consumer-groups 命令获取 lag,并过滤出指定 Topic 的行
LAG=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group ${CONSUMER_GROUP} 2>/dev/null | grep "^${TOPIC_NAME}" | awk '{sum += $6} END {print sum}')
# 如果命令执行失败或没有匹配项,输出 -1 表示异常
if [ -z "$LAG" ]; then
echo -1
else
echo $LAG
fi
给脚本执行权限:
chmod +x /usr/local/bin/kafka_lag_check.sh
测试脚本:
./kafka_lag_check.sh your-consumer-group your-topic-name
# 输出应为数字,如 1200
更健壮的 Python 脚本示例(使用 kafka-python 库):
#!/usr/bin/env python3
import sys
from kafka import KafkaAdminClient
from kafka.admin import ConsumerGroupDescription
from kafka.errors import KafkaError
def get_topic_lag(brokers, group_id, topic):
try:
client = KafkaAdminClient(bootstrap_servers=brokers)
# 获取消费者组详细信息(此方法可能需要根据Kafka版本调整)
# 这里是一个简化的示例,实际生产环境建议使用 confluent_kafka 库的 list_consumer_group_offsets 和 list_offsets
# 或者直接调用 kafka-consumer-groups.sh 并解析输出
cmd = f"/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server {brokers} --describe --group {group_id} --topic {topic}"
import subprocess
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
for line in result.stdout.strip().split('\n'):
if line.startswith(topic):
parts = line.split()
if len(parts) >= 6:
return int(parts[5])
return -1
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return -1
if __name__ == "__main__":
if len(sys.argv) != 4:
print("Usage: {} <bootstrap_servers> <consumer_group> <topic>".format(sys.argv[0]))
sys.exit(1)
brokers, group, topic = sys.argv[1], sys.argv[2], sys.argv[3]
lag = get_topic_lag(brokers, group, topic)
print(lag)
步骤 2:配置 Zabbix Agent 自定义监控项
在运行脚本的机器(可以是 Kafka Broker 或一个集中监控主机)上修改 Zabbix Agent 配置。
创建 UserParameter 配置文件(如 /etc/zabbix/zabbix_agentd.d/userparameter_kafka.conf):
# 语法:UserParameter=<key>[*],<command>
# $1: Consumer Group, $2: Topic Name
UserParameter=kafka.topic.lag[*],/usr/local/bin/kafka_lag_check.sh "$1" "$2"
[*] 表示接受参数,参数会传递给脚本。
重启 Zabbix Agent:
systemctl restart zabbix-agent
在 Zabbix Server 上测试是否能获取数据:
zabbix_get -s <agent_host_ip> -k "kafka.topic.lag[your-consumer-group,your-topic-name]"
# 应返回 lag 数值
步骤 3:在 Zabbix Web 界面配置
创建主机:如果还没为 Kafka 集群创建主机,建议创建一个,例如 “Kafka-Monitor”。
创建监控项(Item):Kafka Lag for [your-topic]:[your-consumer-group]kafka.topic.lag[your-consumer-group,your-topic-name]1m 或 5m(根据业务敏感度){Kafka-Monitor:kafka.topic.lag[your-consumer-group,your-topic-name].last()}>1000Topic {#TOPIC} 在 Consumer Group {#GROUP} 上的积压超过 1000 条消息。> 1000:警告> 10000:严重> 50000:灾难Kafka 本身通过 JMX 暴露了大量指标,包括 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,topic=* 中的 records-lag-max 等。但直接使用 Zabbix 的 JMX 监控存在以下挑战:
适用场景:如果 Kafka 集群规模小,且已启用 JMX,可以作为补充方案监控 Broker 和 Producer 端指标。
简化步骤:
在 Kafka 启动脚本中开启 JMX 端口(如JMX_PORT=9999)。
在 Zabbix Server 上配置 Zabbix Java Gateway。
在 Zabbix 中创建一个 JMX 接口的主机,连接到 Kafka Broker。
通过 Zabbix 的 “JMX 发现” 功能自动发现 MBean 并创建监控项(可能需要自定义 MBean 模式)。
kafka-python 或 py-zabbix),通常基于方案一,但提供了开箱即用的模板和发现规则。
使用 Telegraf + Zabbix:exec 或 kafka_consumer 插件收集 Lag 数据。低层次自动发现(LLD):
{GROUP, TOPIC} 对的 JSON(使用 kafka-consumer-groups.sh --list 和 --describe 解析)。UserParameter 调用此脚本作为 发现规则。标签(Tags):
service:kafka, metric:lag, topic:xxx, consumer_group:yyy。便于在监控仪表盘中筛选和聚合。依赖项(Dependencies):
趋势预测与基线告警:
forecast)来识别 Lag 的非正常增长趋势,即使绝对值未达到阈值也发出预警。{host:kafka.topic.lag[group,topic].forecast(1h)} > {host:kafka.topic.lag[group,topic].avg(1w)} * 3告警消息优化:
{ITEM.LASTVALUE} 和 {ITEM.HOST} 等,使告警信息包含具体的 Lag 数值、Topic 和 Consumer Group 名称。| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 方案一:自定义脚本 | 灵活、准确、性能好、主流方案 | 需要自行编写和维护脚本 | 绝大多数生产环境,尤其是需要监控特定业务 Topic 的场景 |
| 方案二:JMX | 无需额外脚本,可同时获取Broker指标 | 配置复杂、性能开销大、Lag监控不便 | 小型集群,且已启用JMX进行Broker监控 |
| 方案三:社区模板 | 开箱即用,节省开发时间 | 可能不符合特定需求,需调试 | 想快速上手,且模板功能匹配 |
生产环境推荐组合:
按照上述步骤,您可以构建一个稳定、可扩展的 Kafka Topic 积压监控体系,为系统的稳定运行提供有力保障。