Apusic文档中心
首页
  • 应用服务器 AAS
  • 负载均衡器 ALB
  • 分布式消息队列 ADMQ
  • 分布式缓存 AMDC
  • 分布式配置中心 ADCC
  • Java开发工具包软件 AJDK
  • 搜索引擎 ASE
  • 中间件云平台 ACP
  • 统一管理平台 AUMP
  • 云原生中间件管理 ACMP
  • DevOps平台 ADOP
  • 许可授权中心 ACLS
  • Copilot智能问答系统 ACS
  • 监控平台 AMP
  • 智能日志 AILP
  • 应用性能管理 AAPM
  • 智能告警 AAlarm
  • 主数据管理 AMDM
  • 数据交换平台 ADXP
  • 企业服务总线 AESB
  • 数据智脑 ADPR
  • 服务治理 ASGP
  • 统一身份管理 AIDM
  • 标准模板
  • Markdown教程 (opens new window)
  • VuePress官方社区 (opens new window)
  • 帮助
贡献文档 (opens new window)
首页
  • 应用服务器 AAS
  • 负载均衡器 ALB
  • 分布式消息队列 ADMQ
  • 分布式缓存 AMDC
  • 分布式配置中心 ADCC
  • Java开发工具包软件 AJDK
  • 搜索引擎 ASE
  • 中间件云平台 ACP
  • 统一管理平台 AUMP
  • 云原生中间件管理 ACMP
  • DevOps平台 ADOP
  • 许可授权中心 ACLS
  • Copilot智能问答系统 ACS
  • 监控平台 AMP
  • 智能日志 AILP
  • 应用性能管理 AAPM
  • 智能告警 AAlarm
  • 主数据管理 AMDM
  • 数据交换平台 ADXP
  • 企业服务总线 AESB
  • 数据智脑 ADPR
  • 服务治理 ASGP
  • 统一身份管理 AIDM
  • 标准模板
  • Markdown教程 (opens new window)
  • VuePress官方社区 (opens new window)
  • 帮助
贡献文档 (opens new window)
文档中心
  • 金蝶Apusic应用服务器

  • 金蝶Apusic负载均衡器

  • 金蝶Apusic分布式消息队列

    • 产品白皮书
    • 产品更新说明
    • V2.0.6

    • V2.0.6_for_kafka

      • 用户手册
      • 开发手册
    • V2.0.5

    • V2.0.4

    • V2.0.3

  • 金蝶Apusic分布式缓存

  • 金蝶Apusic分布式配置中心

  • 金蝶Apusic Java开发工具包软件

  • 金蝶Apusic全文检索

用户手册

# 版本更新说明

本文档最新版本包含历史修改记录如下:

更新日期 手册版本 适用产品 更新说明
2026年02月 V1.0 金蝶Apusic分布式消息队列V2.0.5 for kafka 初版发布
2026年03月 V1.1 金蝶Apusic分布式消息队列V2.0.6 for kafka 增加了2.0.6版本的管控台说明

# 产品简介

# 产品定位

金蝶天燕分布式消息中间件(ADMQ for kafka)是专为Kafka用户打造的信创替代型云原生消息中间件,完全遵循 Kafka 核心架构设计与特性规范,实现协议、客户端、生态工具、业务场景的全兼容,无需改造即可替换原有 Kafka 集群,满足信创项目落地需求。

# 核心价值

遵循 Kafka 分布式、高吞吐、低延迟的核心特性,同时适配信创软硬件环境,提供更稳定的运行保障、更便捷的运维支持及更全面的安全能力,成为 Kafka 信创替代的优选方案。

# 核心能力

核心能力 解释
Kafka 协议完全兼容 整实现 Kafka 生产消费协议、分区分配协议、Offset 管理协议、集群协调协议等核心协议,行为与原生 Kafka 一致。
全版本客户端适配 支持 Java、Go、Python、C++ 等 Kafka 官方客户端(2.x-3.x 版本)及第三方开源客户端,无代码改造即可接入。
生态工具无缝复用 兼容 Kafka-console-producer/consumer、kafka-topics.sh、kafka-configs.sh 等原生工具,支持 Kafka Connect、MirrorMaker 等数据同步工具。
信创环境深度适配 适配主流信创操作系统(麒麟、统信等)、信创芯片(ARM、RISC-V 等),兼容信创数据库及中间件生态。
Kafka 核心特性继承 支持主题分区、副本机制、消费者组、消息压缩、Offset 提交(自动 / 手动)等 Kafka 原生核心特性。

# 产品架构

1.架构概述
ADMQ for kafka产品采用 Kafka 经典分布式架构,完全遵循 Kafka 通信协议与与存储模型。产品整体架构分为管控台、核心引擎和分布式协调三大模块,具备高可靠、高可用、可扩展的特性。

一、管控台(Console)

  • 管控台是面向运维人员和管理员的可视化操作界面,提供全生命周期管理能力:
  • 集群管理:对消息集群节点进行接入和状态监控。
  • 用户权限:基于角色的访问控制(RBAC),保障操作安全。
  • 主题管理:创建、删除、配置消息主题(Topic),管理分区和副本。
  • 订阅管理:管理消费者组(Consumer Group)、订阅关系和消费位点。
  • 消息检索:支持按时间、Key、内容等条件查询历史消息。

二、核心引擎(Core Engine) 核心引擎是消息处理的核心,兼容 Kafka 协议,保障消息的可靠传输与存储:

Kafka 协议接入 & 安全访问控制

  • 提供标准 Kafka 协议兼容层,支持现有 Kafka 客户端无缝迁移。

  • 集成认证(如 SASL)、授权和传输加密(SSL/TLS),保障访问安全。

消息处理服务

  • 消息生产接入:接收生产者(Producer)发送的消息,进行校验和路由。

  • 消息消费服务:向消费者(Consumer)推送或拉取消息,支持顺序消费和批量消费。

  • 消息持久化存储:将消息写入磁盘,支持高吞吐写入和高效检索,保证数据不丢失。

  • 元数据服务:管理主题、分区、副本、消费者组等元数据信息,确保集群状态一致。

高可用与负载管理

  • 多副本同步:通过多副本机制(如 ISR)保证数据冗余,防止单点故障。

  • 故障恢复与副本迁移:节点故障时自动切换,支持副本在集群内迁移以平衡负载。

  • 负载均衡:将生产和消费请求均匀分配到集群节点,提升整体吞吐量。

  • 状态监控:实时监控节点、分区和副本的健康状态,为故障恢复提供依据。

三、分布式协调(Distributed Coordination)
分布式协调模块(通常基于 ZooKeeper 或自研协调器)负责维护集群的一致性和状态:

  • 集群成员管理:维护集群中所有 Broker 节点的在线 / 离线状态。

  • 元数据持久存储:存储并同步主题、分区、副本分配等核心元数据。

  • Controller 选举:在集群中选举出 Controller 节点,负责分区管理和故障处理。

  • 分区 Leader 选举:当分区 Leader 节点故障时,自动从 Follower 中选举新的 Leader。

  • 集群状态监听通知:当集群状态(如节点上下线、Leader 变更)发生变化时,向相关组件发送通知,触发相应处理。

# 产品优势

  • 零改造信创替代:无需修改业务代码、无需调整客户端配置(仅需变更连接地址),即可直接替换原有 Kafka 集群,迁移成本趋近于零。
  • 生态完全兼容:Kafka 原生工具、监控系统(Prometheus+Grafana)、数据同步工具等均可直接复用,无需额外适配开发。
  • 信创环境适配:全面兼容信创软硬件生态,满足政企信创项目落地要求,提供端到端的信创解决方案。
  • 特性原汁原味:完全遵循 Kafka 核心特性规范,支持用户熟悉的所有业务场景(如高吞吐数据传输、流式数据处理等),业务无感知切换。
  • 安全合规增强:在保留 Kafka 原有安全能力基础上,新增安全特性,符合信创项目安全合规标准。

# 应用场景

  • 信创项目替代场景:政企信创改造中,直接替换原有 Kafka 集群,无需重构业务系统,快速满足信创落地要求。
  • 高吞吐数据传输场景:如日志采集、业务数据同步等,继承 Kafka 高吞吐特性,支撑海量数据实时传输。
  • 流式数据处理场景:兼容 Flink、Spark Streaming 等流式计算引擎,无缝对接原有流式数据处理链路。
  • 分布式系统解耦场景:基于 Kafka 原生的发布订阅模式,实现分布式系统间异步通信,降低系统耦合度。

# 产品安装

# 安装概述

ADMQ for kafka包含核心引擎、分布式协调、管控台三个部分,其中核心引擎受license控制,需要在安装时申请并导入license。管控台为可选部件,如需要可视化管控时,可单独安装管控台。

# 单机部署

本文档指导用户在 Linux 环境下完成 ADMQ for Kafka 的单机模式部署,包含环境准备、安装配置、授权申请、服务启动及状态验证。

# 1. 环境准备与软件安装

# 1.1 创建部署目录

首先创建应用根目录并进入:

mkdir -p /apusic
cd /apusic
1
2

# 1.2 下载并解压软件包

可通过产品光盘拷贝产品介质或向项目的销售或实施人员获取。

# 拷贝到部署服务器

产品安装前需要将产品包拷贝到对应目录下。产品包名称可能随版本和发布日期而变化,这里以20260319发布的产品包为例。

# 解压安装包  
tar zxvf ADMQ-V2.0.6.391-Kafka-20260319.tar.gz
1
2
3
4
5
6

# 1.3 目录结构说明

解压完成后,当前目录下将生成 admq-kafka。标准目录结构如下:

目录名 说明
bin 存放启动、停止及管理脚本
config 存放配置文件 (如 kafka-standalone.conf)
data 存放 Kafka 消息数据及 ZooKeeper 数据
jdk 内置的 Java 运行环境,无需额外配置系统 JDK
logs 存放运行日志 (Broker 日志、ZK 日志等)
service 系统服务注册相关文件 (可选)

# 2. 配置修改

# 2.1 编辑监听地址

修改单机模式配置文件,绑定指定的 IP 地址和端口。

vi config/kafka-standalone.conf
1

修改内容:
找到 listeners 配置项,将其修改为服务器的实际 IP 地址(示例中使用 192.168.1.10):

listeners=PLAINTEXT://192.168.1.10:9092
1

# 3. 授权许可

ADMQ for kafka 产品需要 License 文件才能正常运行。

  1. 申请授权:联系管理员或通过授权平台申请对应版本的 license.xml 文件。
  2. 放置文件:将获取到的 license.xml 文件上传至部署目录根路径。
# 确认文件位置
ls -l /apusic/admq-kafka/license.xml
1
2

# 4. 启动服务

ADMQ for Kafka 单机模式依赖 ZooKeeper,需按顺序启动。所有操作请在安装根目录下执行。

# 4.1 启动 ZooKeeper

首先启动内置的 ZooKeeper 服务:

bin/admq-daemon start kafka zk
1

# 4.2 启动 Kafka Broker

待 ZooKeeper 启动成功后(通常等待 3-5 秒),启动 Kafka 主服务:

bin/admq-daemon start kafka standalone
1

# 5. 服务验证

# 5.1 检查进程状态

使用 ps 命令查看进程,正常状态下应能看到 两个 主要 Java 进程(一个 ZK,一个 Broker):

ps -ef | grep admq-kafka
1

预期输出示例:

# 5.2 检查端口监听

使用 netstat 确认 9092 端口是否处于 LISTEN 状态:

netstat -nltup | grep 9092
1

预期输出示例:

如果未看到输出,请检查防火墙设置或查看日志排查启动失败原因。

# 5.3 查看运行日志

如果服务启动异常或需监控运行状态,请查看 logs 目录下的日志文件:

# 查看实时日志
tail -f logs/server.log
tail -f logs/zookeeper.log

# 或者查看目录结构
ls -lh logs/
1
2
3
4
5
6

重点关注日志中是否有 ERROR 级别的信息。

# 6.停止服务

操作 命令
停止 ZooKeeper bin/admq-daemon stop kafka zk
停止 Kafka bin/admq-daemon stop kafka standalone

# 集群部署

# 1. 部署规划

假设部署 3 节点 高可用集群(ZooKeeper 与 Kafka Broker 混合部署),规划如下:

节点 IP 地址 ZK MyID Broker ID ZK 端口 Kafka 端口
Node 1 192.168.1.10 1 0 2181 9092
Node 2 192.168.1.11 2 1 2181 9092
Node 3 192.168.1.12 3 2 2181 9092

前置要求:

  1. 确保三台机器时间同步(NTP)。
  2. 关闭防火墙或开放端口:2181 (ZK), 2888 (ZK 选举), 3888 (ZK 选举), 9092 (Kafka)。

# 2. 所有节点执行:基础安装

在 Node 1, Node 2, Node 3 上分别执行:

# 2.1 创建目录并解压

mkdir -p /apusic
cd /apusic
tar zxvf ADMQ-V2.0.6.391-Kafka-20260212.tar.gz
cd admq-kafka
1
2
3
4

# 2.2 放置授权文件

将 license.xml 上传至 /apusic/admq-kafka/ 目录。

# 3. 配置 ZooKeeper 集群

ZooKeeper 集群需要修改配置文件并在数据目录创建 myid 文件。

# 3.1 修改 ZK 配置文件

编辑 config/kafka-zk.conf。

在所有三个节点上,配置内容完全一致:

### --- 集群配置 --- ###
# 注意:这里的 1, 2, 3 对应后面要创建的 myid 文件内容
#server.1=192.168.1.10:2888:3888:participant;2181
#server.2=192.168.1.11:2888:3888:participant;2181
#server.3=192.168.1.12:2888:3888:participant;2181
1
2
3
4
5

# 3.2 创建 MyID 文件 (每台机器不同)

在 Node 1 (192.168.1.10) 上执行:

# 创建数据和日志目录
mkdir -p /apusic/admq-kafka/data/zk/data
mkdir -p /apusic/admq-kafka/data/zk/logs
# 写入 MyID (必须与配置文件中 server.1 的 1 对应)
echo "1" > /apusic/admq-kafka/data/zk/data/myid
1
2
3
4
5

在 Node 2 (192.168.1.11) 上执行:

mkdir -p /apusic/admq-kafka/data/zk/data
mkdir -p /apusic/admq-kafka/data/zk/logs
echo "2" > /apusic/admq-kafka/data/zk/data/myid
1
2
3

在 Node 3 (192.168.1.12) 上执行:

mkdir -p /apusic/admq-kafka/data/zk/data
mkdir -p /apusic/admq-kafka/data/zk/logs
echo "3" > /apusic/admq-kafka/data/zk/data/myid
1
2
3

# 4. 配置 Kafka Broker 集群

编辑 config/kafka-broker.conf。

# 4.1 Node 1 配置 (192.168.1.10)

# 唯一 Broker ID (集群内不可重复)
broker.id=0

# 监听地址 (绑定本机 IP)
listeners=PLAINTEXT://192.168.1.10:9092

# 连接 ZK 集群地址 (所有节点配置必须一致)
zookeeper.connect=192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181
zookeeper.connection.timeout.ms=18000
1
2
3
4
5
6
7
8
9

# 4.2 Node 2 配置 (192.168.1.11)

broker.id=1
listeners=PLAINTEXT://192.168.1.11:9092
zookeeper.connect=192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181
1
2
3

# 4.3 Node 3 配置 (192.168.1.12)

broker.id=2
listeners=PLAINTEXT://192.168.1.12:9092
zookeeper.connect=192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181
1
2
3

# 5. 启动集群

严格遵循启动顺序:先 ZK 集群,后 Kafka 集群。

# 5.1 第一步:启动所有节点的 ZooKeeper

依次登录 Node 1, 2, 3 执行:

cd /apusic/admq-kafka
bin/admq-daemon start kafka zk
1
2

# 5.2 第二步:启动所有节点的 Kafka Broker

依次登录 Node 1, 2, 3 执行:

cd /apusic/admq-kafka
bin/admq-daemon start kafka broker
1
2

# 6. 验证与测试

# 6.1 验证 ZooKeeper 集群状态

在任意节点执行四字母命令 ruok :

# 检查 ZK 角色 (Leader/Follower)
echo ruok | nc 192.168.1.10 2181
echo ruok | nc 192.168.1.11 2181
echo ruok | nc 192.168.1.12 2181
1
2
3
4

预期执行结果,返回imok。

echo ruok | nc 192.168.1.10 2181
imok

echo ruok | nc 192.168.1.11 2181
imok

echo ruok | nc 192.168.1.12 2181
imok
1
2
3
4
5
6
7
8

# 6.2 验证 Kafka 集群元数据

使用命令行工具查看集群 Broker 列表:

kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.1.10:9092
1

# 6.3 功能测试 (创建 Topic 并生产消费)

  1. 创建 Topic (3 副本,确保数据分布在所有节点):
kafka/bin/kafka-topics.sh --create --topic test-cluster --bootstrap-server 192.168.1.10:9092 --partitions 3 --replication-factor 3
1
  1. 查看 Topic 详情 (确认 ISR 和 Leader 分布):
kafka/bin/kafka-topics.sh --describe --topic test-cluster --bootstrap-server 192.168.1.10:9092
1

预期输出示例:

Topic: test-cluster  PartitionCount: 3  ReplicationFactor: 3
  Partition: 0  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
  Partition: 1  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
  Partition: 2  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
1
2
3
4

(Leader 均匀分布在不同节点,ISR 包含所有节点,说明集群健康)

  1. 生产与消费测试:
# 生产者
kafka/bin/kafka-console-producer.sh --topic test-cluster --bootstrap-server 192.168.1.10:9092

# 消费者 (新开终端)
kafka/bin/kafka-console-consumer.sh --topic test-cluster --from-beginning --bootstrap-server 192.168.1.10:9092
1
2
3
4
5

# 7. 停止服务

# 先停 Kafka
bin/admq-daemon stop kafka broker
# 后停 ZK
bin/admq-daemon stop kafka zk
1
2
3
4

# 管理控制台部署

# 安装前准备

  • 获取安装包:从 http://www.apusic.com/下载金蝶 Apusic分布式消息队列安装包,或从金蝶Apusic 分布式消息队列产品光盘中获得相应的安装包文件。
  • 系统要求:Java环境(管控台JDK17以上,节点JDK1.8及以上版本)、内存(16GB+)、硬盘空间(100GB+)、浏览器(IE8及以上,FireFox,Chrome)

# 开始安装

# 获取上传产品包

ADMQ for kafka管控台安装包为admq-manager开头的压缩包,例如admq-manager-V3.0.1.tar.gz

获取安装包后,可将其上传到服务器的任意目录下。

# 管理控制台安装

解压安装包

mv admq-manager-V3.0.1.tar.gz /opt/
cd /opt
tar -zxvf admq-manager-V3.0.1.tar.gz
1
2
3

修改数据库类型(可选)

默认使用H2数据库,如果正式环境部署需要换成其他数据库,例如MySQL。

/opt/admq-manager-V3.0.1/config/application.properties 文件中添加了多种数据库配置,可以把使用的数据库配置打开,注释掉其他不使用的数据库配置项。

  1. 注释掉H2数据库相关配置,打开mysql的相关配置

    image-20241023101409999

  2. 配置MYSQL的地址、端口、数据库名称、用户名、密码,路径如上图 config/other-config/db-mysql.properties image-20230109135743029

  3. 管控台启动的时候会自动加载数据库初始化脚本

其他数据库参照此步骤即可

# 启动管理控制台服务

cd /opt/admq-manager-V3.0.1/
bin/admq-manager start
1
2

# 停止管理控制台服务

cd /opt/admq-manager-V3.0.1/
bin/admq-manager stop
1
2

# 验证

1.在浏览器中键入URL:http://ip:12305 或者 https://ip:12306

2.在登录界面输入用户名(admq),密码(11111111)进行登录,首次登录需要修改密码。

image-20230109135743029

3.若登录成功,会跳转到ADMQ管控台首页,则表示安装成功。

image-20241108141002313

# 管控台使用

# 概述

ADMQ fro kafka管控台分为系统管理与运维管理,系统管理主要用于核心引擎的接入,接入后可登录运维管理侧查看引擎状态,并对引擎进行管理维护。

用户需先进入系统管理侧进行集群配置与对接,然后在运维管理侧进行集群管理。

# 系统管理

# 导入licnese

使用管理员登录管控台后,点击菜单会出现“License 已过期,请重新申请新的 License 或者减少节点数”提示。需在系统配置的license管理功能中导入产品的任意license文件。

  • 导入前

  • 导入后

# 上传产品包

为保障运行一致性,需在软件包管理功能中,上传对应产品的产品包。若未上传产品包,在后续接入时,将无法选择到对应的版本进行接入。

  • 上传前

  • 上传后

# 对接集群

如产品引擎已经安装完毕,并确认管控台所在服务器可以访问产品引擎所在的服务器和端口。则可通过集群列表-》管控接入功能对产品引擎进行可视化管理。

  • 填写集群信息
    • 填写集群相关信息,如集群类型、部署模式以及版本

注意:若上传软件包步骤未执行,则无法选择到版本。

  • 添加集群配置

    • 添加待连接集群的访问地址和端口。可添加一个集群的多个节点,以便在部分节点失效时,依旧可以保持访问。多个地址之间以逗号分隔。
  • 查看集群列表与修改配置

    • 添加完成后,回到集群列表,可看到添加的集群信息。并可通过配置功能修改集群接入的配置信息。

# 开通运维用户

  • 添加运维用户

    进入用户管理,新增用户,这里以kafauser为例

  • 分配用户角色

点击‘分配角色’按钮,打开分配角色界面,为用户分配角色。

# 运维管理

# 使用运维用户登录

以kafauser为例,输入用户名密码进行登录。用户首次登录时,需要修改密码。用户在修改密码后重新登录,即可进入系统。

# 概览与切换

运维用户登录后,可同时管理多套系统,通过左上角下拉框进行切换。用户可查看当前实例的连接情况和基本信息。

# 主题管理

用户可使用管控台进行主题的创建、管理、维护与删除。主题具有缓存功能当服务器离线或者服务器上的主题被删除时,依旧可以通过本地缓存获取主题配置信息,此时字段服务端状态为不可用。对主题的操作也会失败。

# 主题创建

用户进入主题列表,点击新增按钮,进行主题新增.可根据需要设置分区数量、分区副本数、消息保留时间等参数设置。用户可根据需要进行自定义参数的配置,可参考右上角的帮助,查看可选参数。

# 主题维护

主题创建后可在主题列表中看到,用户可使用修改功能,对主题配置进行修改。主题创建后主题名称与分区副本数不允许修改。

# 主题详情

点击列表中的主题名称,可进入主题详情。主题包含分区、消息、订阅、设置四个部分的功能。用户也可以在主题详情中,进行消息发送和主题清空。

发送消息

用户点击发送新消息后,可填写需发送的消息内容,并点击发送。发送时,需指定发送分区。

分区:

通过分区功能可以查看主题下的所有分区,以及分区的副本数、偏移量信息,也可以清空特定的分区。

消息:
通过消息功能可检索该主题下的消息,支持两种查询方式:

1.按偏移量点位查询:通过偏移量点位查询,可指定起始偏移量。

2.按时间查询:通过时间查询,可指定开始时间。

订阅:

用户通过订阅功能,可查看该主题下的所有订阅关系,并查看订阅关系详情。

设置:

用户通过设置功能,可查看该主题的配置信息,并修改该主题的配置信息。

# 订阅组管理

用户可使用管控台进行订阅组的创建、管理、维护与删除。订阅组具有缓存功能,当服务器离线或者服务器上的订阅组被删除时,依旧可以通过本地缓存获取订阅组列表。

注意:为了避免影响数据消费,本地订阅组创建时,不会在服务器上实际创建订阅组。但是当本地订阅组被删除时,服务器上的订阅组也会被同步删除。

# 订阅组与主题

点击订阅组进入订阅组详情,可看到该订阅组的订阅关系,每个订阅主题会有单独的记录,并可查看对应主题的消费延迟情况。

重置偏移量
当某个主题消息存在问题,需要重新消费时,可通过重置偏移量功能进行偏移量重置。

重置包含四种方式:

1.最早位置:直接将偏移量设置为可达到的最早偏移量,实现现存所有消息的重消费。

2.最新位置:将偏移量设置为最新位置,实现略过所有未消息消息,从最新消息开始消费。

3.按时间戳:将偏移量设置为指定时间,从该时间后重新消费消息。

4.按具体偏移量:通过为每个分区或所有分区设置偏移量,实现对分区消费点的全面调整和控制。

# 主题消费者

通过展开订阅者详情,可查看订阅组下所有消费者信息。

# 消息查询

用户可通过消息查询对当前实例下的所有主题进行检索,支持两种查询方式:

1.按偏移量点位查询:通过偏移量点位查询,可指定起始偏移量。

2.按时间查询:通过时间查询,可指定开始时间。

为了避免对引擎产生较大的查询压力,故一次返回的数据量有限制,默认为100条。完成查询后,对应的查询结果会显示在列表中,可点击查看消息详情,查看消息的详细内容。

# 配置与维护

# 核心引擎目录说明

目录名 说明
bin 存放启动、停止及管理脚本
config 存放配置文件 (如 kafka-standalone.conf)
data 存放 Kafka 消息数据及 ZooKeeper 数据
jdk 内置的 Java 运行环境,无需额外配置系统 JDK
logs 存放运行日志 (Broker 日志、ZK 日志等)
service 系统服务注册相关文件 (可选)

# Broker 配置说明

# 基础配置 (Basic Configuration)

核心作用:定义 Broker 的身份、网络暴露方式及存储路径。

参数 说明 生产环境建议
broker.id 集群唯一标识。Kafka 通过此 ID 在 ZooKeeper 中注册自己。 必须唯一。若重启后 ID 改变,Broker 会被视为新节点,导致旧数据不可见或集群元数据混乱。通常写死在配置文件中,不要动态生成。
listeners 监听地址。定义 Broker 绑定哪些 IP 和端口,以及使用什么协议。格式:协议://IP:端口
。
- 内网部署:PLAINTEXT://0.0.0.0:9092
(绑定所有网卡) 或指定内网 IP。 - 多网卡/混合云:可配置多个,如 INTERNAL://192.168.1.10:9092,EXTERNAL://10.0.0.5:9093
。
advertised.listeners 广播地址。客户端连接时,Broker 返回给客户端的地址。至关重要。 - 如果客户端和 Broker 不在同一网络(如 Docker、云服务器),必须配置此项为客户端可访问的公网 IP 或域名。 - 若不配,默认取 listeners
的值,可能导致客户端连接内网 IP 失败。
log.dirs 数据存储目录。消息日志文件存放路径。 - 强烈建议使用高性能 SSD。 - 可配置多个目录(逗号分隔)以利用多块磁盘 IO,如 /data1/kafka,/data2/kafka
。
num.partitions 默认分区数。创建 Topic 时未指定分区数时的默认值。 建议设置为 3~6(与集群 Broker 数量匹配或略多),以提高并行消费能力。
auto.create.topics.enable 自动创建 Topic。当生产者向不存在的 Topic 发消息时,是否自动创建。 **生产环境强烈建议设为 ****false**
。 原因:防止因代码拼写错误创建大量无用 Topic,且自动创建的 Topic 参数(分区/副本)往往不符合生产标准。

# 副本与可靠性 (Replication & Reliability)

核心作用:保证数据不丢失、服务高可用。

参数 说明 生产环境建议
default.replication.factor 默认副本数。新建 Topic 时的默认副本数量。 建议 ≥ 3。允许同时挂掉 2 台机器而不丢失数据。需确保集群 Broker 数量 ≥ 该值。
min.insync.replicas 最小同步副本数。配合 acks=all
使用,表示写入成功至少需要多少个副本确认。
- 若设为 1:只要 Leader 写入成功即返回,Leader 挂掉可能丢数据。 - 推荐设为 2 (配合 3 副本):保证即使挂掉 1 台,仍有 1 个完整副本存活,且写入时需 2 个节点成功,兼顾安全与可用。
offsets.topic.replication.factor __consumer_offsets 内部 Topic 的副本数。 必须 ≥ 2 (最好 3)。若该 Topic 挂了,所有消费者无法提交位移,导致重复消费或消费停滞。
replica.lag.time.max.ms ISR 踢出阈值。Follower 落后 Leader 超过此时间,将被移出 ISR 列表。 默认 30s。网络抖动大时可适当调大,但过大会导致故障切换变慢。
unclean.leader.election.enable 非 ISR 选举。允许不在 ISR 中的副本(数据落后的)成为 Leader。 必须设为 **false**
(默认)。 若设为 true
:当所有 ISR 挂掉,强制让数据落后的副本上位,会导致
数据丢失
。生产环境严禁开启。
controller.quorum.voters 控制器投票节点。仅用于 KRaft 模式 (无 ZK)。 ZK 模式下留空。若迁移到 KRaft 模式,需配置 Controller 节点的 ID 和地址。

# 日志保留策略 (Log Retention)

核心作用:控制磁盘空间使用,清理旧数据。

参数 说明 生产环境建议
log.retention.hours 按时间保留。日志文件保留的小时数。 默认 168h (7天)。根据业务需求调整,如合规要求保留 30 天则设为 720。
log.retention.bytes 按大小保留。每个 Partition 最大保留的字节数。 注释该配置则默认为 -1 (不限制)。建议设置上限(如 50GB),防止单 Topic 写爆磁盘。
log.segment.bytes 段文件大小。日志切分的大小阈值。 默认 1GB。太大导致清理粒度粗,太小导致文件句柄过多。一般保持默认即可。
log.retention.check.interval.ms 检查间隔。后台线程检查是否要删除旧日志的频率。 默认 5 分钟。无需频繁修改。

# 性能调优 (Performance Tuning)

核心作用:平衡吞吐量与延迟,适配硬件资源。

参数 说明 生产环境建议
num.network.threads 网络线程数。处理 Socket 读写、编解码的线程数。 默认 8。高并发场景(如万级 TPS)可调至更大或与 CPU 核数匹配。
num.io.threads IO 线程数。处理磁盘读写请求的线程数。 默认 16。建议设置为 磁盘数量的 2 倍 或 CPU 核数,如 16。
socket.send/receive.buffer.bytes Socket 缓冲区。发送/接收缓冲区大小。 默认 1024KB。高吞吐场景可适当调大(如 2MB),依赖 OS 内核参数配合。
log.flush.interval.messages/ms 刷盘策略。多少条消息或多少毫秒强制刷盘到磁盘。 默认值极大 (即不主动强制刷盘)。 最佳实践:依赖 OS 的 Page Cache 异步刷盘,性能最高。仅在极度追求数据零丢失且能接受性能大幅下降时才调小此值。

# ZooKeeper 连接 (ZooKeeper Connection)

核心作用:连接元数据管理中心(仅限 ZK 模式)。

参数 说明 生产环境建议
zookeeper.connect ZK 地址列表。格式 host:port
,多节点用逗号分隔。
填写所有 ZK 节点地址。支持 chroot 路径。
zookeeper.session.timeout.ms 会话超时。Broker 与 ZK 心跳丢失多久判定为下线。 默认 18s-30s。调小可加快故障发现,但网络抖动易导致误判重平衡;调大则故障恢复慢。
zookeeper.chroot.path 根路径。将 Kafka 元数据存储在 ZK 的某个子路径下。 多集群共用 ZK 时必配。例如 /kafka-cluster-01
,避免不同 Kafka 集群元数据冲突。

# 安全 (Security)

核心作用:认证与加密(注释部分为示例)。

参数 说明 生产环境建议
security.inter.broker.protocol Broker 间通信协议。 生产环境建议启用 SASL_SSL
或至少 SASL_PLAINTEXT
,防止内部窃听或伪造节点。
sasl.enabled.mechanisms 认证机制。 常用 PLAIN
(简单账号密码), SCRAM-SHA-256/512
(更安全), GSSAPI
(Kerberos)。
listener.name...jaas.config JAAS 配置。 定义用户名密码。建议将敏感信息移至独立的 JAAS 文件,而非直接写在 properties 中。

# 其他 (Others)

参数 说明 生产环境建议
delete.topic.enable 删除 Topic 开关。 **建议 ****true**
。方便运维清理测试数据或废弃业务线数据。若为 false
,删除命令仅标记无效,不释放磁盘。
JMX_PORT 监控端口。 必须配置。用于 Prometheus/JConsole 采集指标。需在启动脚本中通过 -D
参数设置,并注意防火墙放行。
replica.fetch.wait.max.ms Follower 拉取等待时间。 默认 500ms。适当调大可减少 Follower 对 Leader 的空轮询压力,但会增加同步延迟。一般保持默认。

# ZK 配置说明

# 基础配置 (Basic Timing)

核心作用:定义心跳节奏和集群容错时间窗口,直接影响故障检测速度。

参数 说明 生产环境建议
tickTime 心跳基准时间 (ms)。ZK 内部所有时间计算的基础单位。 默认 **2000ms **(2秒)。 - 设太小:网络抖动易导致误判节点下线。 - 设太大:故障检测慢。 建议保持 2000,除非网络极差可调至 3000-4000。
initLimit 初始化同步时限。Follower 启动时从 Leader 同步数据的最大 tickTime 倍数。 默认 10 (即 20秒)。 若数据量大或磁盘慢,需调大 (如 20),否则 Follower 启动会超时失败。
syncLimit 心跳同步时限。Follower 与 Leader 心跳响应的最大 tickTime 倍数。 默认 5 (即 10秒)。 超过此时间未响应,Leader 认为该 Follower 已死,将其踢出集群。网络不稳定时可适当调大至 8-10。

# 数据存储 (Data Storage)

核心作用:控制数据落盘策略,直接影响 IO 性能和磁盘寿命。

参数 说明 生产环境建议
dataDir 快照数据目录。存储内存数据库的快照 (snapshot) 和 myid
文件。
必须配置在高性能 SSD 上。不要与事务日志 (dataLogDir
) 混用,避免 IO 争抢。
dataLogDir 事务日志目录。存储写入操作的事务日志 (transaction log)。 关键性能点: 1. 务必独立挂载磁盘 (与 dataDir
物理隔离)。 2. ZK 对顺序写要求高,独立的 SSD 能极大提升吞吐量。
preAllocSize 预分配文件大小 (KB)。事务日志文件预分配的大小。 默认 65536 (64MB)。 配置中设为 131072 (128MB)。 作用:减少文件系统碎片和频繁的文件扩展系统调用。大文件有利于顺序写性能。
snapCount 快照触发频率。每处理多少事务生成一次快照。 默认 100,000。 配置中设为 200,000。 作用:减少快照生成频率,降低磁盘 IO 压力。但会导致重启恢复时间变长(因为要重放更多日志)。生产环境可适当调大。
fsync.warningthresholdms 刷盘警告阈值 (ms)。当 fsync
耗时超过此值时记录警告日志。
用于监控磁盘健康状况。若频繁出现警告,说明磁盘 IO 瓶颈严重,需更换硬件或优化负载。

# 连接信息 (Connection Info)

核心作用:定义客户端接入点和并发控制。

参数 说明 生产环境建议
clientPort 客户端监听端口。Kafka Broker 和 业务客户端 连接 ZK 的端口。 默认 2181。防火墙需开放此端口。
maxClientCnxns 单 IP 最大连接数。限制单个 IP 地址能建立的最大连接数。 默认 60。 配置中设为 0 (表示无限制)。 风险:若某台机器异常发起大量连接,可能耗尽 ZK 资源。生产环境建议设置合理上限 (如 200-500),防止 DDoS 或代码死循环。

# 集群配置 (Cluster Config)

核心作用:定义集群拓扑和节点角色。

参数 说明 生产环境建议
server.x=A:B:C:D;E 集群节点定义。 x
: MyID (对应 myid
文件)。 A
: IP。 B
: 2888 (Follower 与 Leader 通信)。 C
: 3888 (选举通信)。 D
: 角色 (participant/observer)。 E
: 客户端端口 (可选,新版支持)。
- 奇数节点:集群节点数必须为奇数 (3, 5, 7),以容忍 (N-1)/2
台故障。 - Observer:若需增加读性能而不增加投票负担,可将部分节点设为 observer
。
electionPortBindRetry 选举端口绑定重试次数。 默认 3。若端口被占用,重试几次后退出。一般无需修改。

# 自动清理 (Auto Purge)

核心作用:防止磁盘被历史快照和日志填满。

参数 说明 生产环境建议
autopurge.snapRetainCount 保留快照数量。保留最近的 N 个快照文件。 配置中设为 3。 建议:至少保留 3 个,以防最新快照损坏时有旧版本可回滚。
autopurge.purgeInterval 清理间隔 (小时)。每隔多久执行一次清理任务。 配置中设为 6 (小时)。 建议:根据数据量调整。高写入场景可设为 1-2 小时,低负载可设为 12-24 小时。

# TCP 网络优化 (Network Tuning)

核心作用:应对高并发连接和会话管理。

参数 说明 生产环境建议
globalOutstandingLimit 全局待处理请求上限。防止请求堆积导致 OOM。 默认 1000。 配置中设为 5000。 注意:调大可提高吞吐,但若后端处理慢,可能导致内存暴涨。需配合监控观察内存使用率。
minSessionTimeout 最小会话超时 (ms)。客户端请求的 timeout 低于此值将被强制设为此值。 默认 2 * tickTime。 设太小会导致网络微抖动就断连。建议 ≥ 4000ms。
maxSessionTimeout 最大会话超时 (ms)。 默认 20 * tickTime。 配置中设为 60000 (60秒)。 允许长连接客户端设置更长的超时时间,减少重连频率。

# 监控和管理 (Admin & 4lw)

核心作用:运维监控与安全控制。

参数 说明 生产环境建议
admin.enableServer 是否开启 AdminServer (HTTP 接口)。 默认 true (新版)。 配置中设为 false
。 安全建议:若不需要 HTTP 监控接口,建议关闭以减少攻击面。可通过 JMX 或 4lw 命令监控。
4lw.commands.whitelist 四字命令白名单。ZK 通过 echo cmd | nc ip 2181 进行监控。

# 其他配置 (Others)

参数 说明 生产环境建议
skipACL 跳过 ACL 检查。 默认 no。 若设为 yes
,将忽略所有权限检查,大幅提升性能,但失去安全性。仅在完全可信的内网且追求极致性能时使用。
leaderServers Leader 是否处理读请求。 配置中 no
(可能是旧版本参数或特定发行版)。 标准 ZK 中 Leader 默认处理所有写请求和部分读请求。若设为不参与服务,可能影响读吞吐。通常保持默认。
forceSync 是否强制每次事务都 fsync。 默认 no
(使用 OS 缓冲)。 若设为 yes
,数据最安全但性能急剧下降。**生产环境务必保持 ****no**
。
sslQuorum
/ portUnification
SSL 加密与端口统一。 若需启用 SSL 加密集群通信,需设为 true
并配置证书。明文内网通信可保持 false
。

# 常见命令

# Topic 管理

# 创建 Topic

创建一个名为 test-topic 的 Topic,包含 3 个分区,3 个副本。

kafka/bin/kafka-topics.sh --create \
  --bootstrap-server 192.168.1.10:9092 \
  --topic test-topic \
  --partitions 3 \
  --replication-factor 3
1
2
3
4
5
  • 注意:replication-factor 不能超过集群中 Broker 的数量。

# 查看 Topic 列表

# 列出所有 Topic
kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.1.10:9092

# 过滤特定 Topic (支持正则)
kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.1.10:9092 | grep "test"
1
2
3
4
5

# 查看 Topic 详情 (分区、副本分布)

这是排查数据倾斜和副本状态最重要的命令。

kafka/bin/kafka-topics.sh --describe \
  --bootstrap-server 192.168.1.10:9092 \
  --topic test-topic
1
2
3

输出解读:

  • Leader: 当前负责读写的副本 ID。
  • Replicas: 该分区所有副本所在的 Broker ID 列表。
  • Isr (In-Sync Replicas): 当前与 Leader 保持同步的副本列表。如果 Isr 数量少于 Replicas,说明有副本落后或宕机。

# 增加分区数 (只能增加,不能减少)

将 test-topic 的分区数从 3 增加到 6。

kafka/bin/kafka-topics.sh --alter \
  --bootstrap-server 192.168.1.10:9092 \
  --topic test-topic \
  --partitions 6
1
2
3
4

# 删除 Topic

kafka/bin/kafka-topics.sh --delete \
  --bootstrap-server 192.168.1.10:9092 \
  --topic test-topic
1
2
3
  • 前提:配置文件 server.properties 中 delete.topic.enable=true。

# 生产与消费 (数据测试)

# 启动控制台生产者 (Producer)

向 test-topic 发送消息。输入一行按回车即发送一条消息。

kafka/bin/kafka-console-producer.sh \
  --bootstrap-server 192.168.1.10:9092 \
  --topic test-topic
1
2
3
  • 高级用法:指定 Key 发送(用于测试分区策略)
kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.1.10:9092 --topic test-topic --property "parse.key=true" --property "key.separator=:"
# 输入格式:key:value  (例如 user1:login)
1
2

# 启动控制台消费者 (Consumer)

从 test-topic 消费消息。

  • 消费最新数据:
kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.10:9092 \
  --topic test-topic
1
2
3
  • 从头开始消费 (历史数据):
kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.10:9092 \
  --topic test-topic \
  --from-beginning
1
2
3
4
  • 显示 Key 和 分区信息:
kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.10:9092 \
  --topic test-topic \
  --from-beginning \
  --property print.key=true \
  --property partition=true \
  --property print.timestamp=true
1
2
3
4
5
6
7
  • 限制消费数量 (测试用):
kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.10:9092 --topic test-topic --max-messages 10
1

# 消费者组管理 (Consumer Groups)

# 查看消费者组列表

kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server 192.168.1.10:9092 \
  --list
1
2
3

# 查看组内详情 (延迟、Offset)

运维最常用命令,用于查看消费积压 (Lag)。

kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server 192.168.1.10:9092 \
  --group my-consumer-group \
  --describe
1
2
3
4

输出关键列:

  • CURRENT-OFFSET: 当前已提交的位置。
  • LOG-END-OFFSET: 分区最新的消息位置。
  • LAG: 积压量 = LOG-END-OFFSET - CURRENT-OFFSET。如果 LAG 持续增大,说明消费速度跟不上生产速度。

# 重置 Offset (慎用)

将消费者组的 Offset 重置到最早、最新或指定时间。

  • 重置到最早:
kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.10:9092 \
  --group my-consumer-group \
  --topic test-topic \
  --reset-offsets --to-earliest \
  --execute
1
2
3
4
5
  • 重置到最新 (相当于忽略历史积压):
kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.10:9092 \
  --group my-consumer-group \
  --topic test-topic \
  --reset-offsets --to-latest \
  --execute
1
2
3
4
5
  • 注意:执行重置时,消费者组必须处于非活跃状态(即没有正在运行的消费者实例)。

# 集群与元数据管理

# 查看集群 Broker 信息

kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.1.10:9092
1

# 首选副本选举 (Preferred Leader Election)

如果因为故障导致 Leader 分布不均匀(例如所有 Leader 都在 Node 1),可以触发重新选举,让 Leader 回到 Replicas 列表中的第一个节点(通常是理想分布)。

kafka/bin/kafka-leader-election.sh --bootstrap-server 192.168.1.10:9092 \
  --election-type preferred \
  --all-topic-partitions
1
2
3

# 查看集群配置

kafka/bin/kafka-configs.sh --bootstrap-server 192.168.1.10:9092 \
  --entity-type brokers \
  --entity-name 0 \
  --describe
1
2
3
4

# 常见问题

# 常见错误排查

  • ZK 启动失败,日志报 Connection refused 或 **Session expired

    • 检查 myid 文件是否存在且内容正确。
    • 检查 zookeeper.properties 中 server.x 的 IP 是否可达,端口 2888/3888 是否被防火墙拦截。
  • Kafka 启动失败,日志报 **Broker ID 0 is already registered

    • 检查是否有残留的旧 ZK 数据。如果是新部署,清空 ZK 的 dataDir 和 Kafka 的 log.dirs 后重试。
    • 确认各节点 broker.id 不重复。
  • ISR 集合不全 (Isr: 0,1 而不是 0,1,2)**

    • 说明某个 Follower 同步太慢或网络不通。检查该节点的日志和网络带宽。
  • Kafka 启动失败,日志报 Failed to bind to port 9092

    • 检查 9092 端口是否被其他进程占用(使用 netstat -tulpn | grep 9092 或 lsof -i:9092 命令)。
    • 检查 server.properties 中 listeners 配置的端口是否与实际要使用的端口一致,避免配置冲突。
    • 确认防火墙未拦截 9092 端口,或已在防火墙规则中放行该端口。
  • Kafka 生产者发送消息失败,报 Leader not available

    • 检查对应的 Topic 是否存在,若不存在需先创建(kafka-topics.sh --create 命令)。
    • 检查 Kafka 集群是否正常启动,Broker 节点是否全部在线。
    • 检查 Topic 的副本配置是否合理,若副本数大于可用 Broker 数,会导致 Leader 无法选举。
  • Kafka 消费者消费失败,报 No offset found for partition

    • 若为新消费者组,首次消费无偏移量属于正常情况,可配置 auto.offset.reset 为 earliest 或 latest。
    • 若为旧消费者组,检查是否手动删除过 ZK/ Kafka 中的偏移量数据,或消费者组长时间未消费导致偏移量过期(需调整 offsets.retention.minutes 配置)。
  • ZK 集群启动后,部分节点无法加入集群,日志报 Cannot open channel to x at election address

    • 检查所有 ZK 节点的 zoo.cfg 中 server.x 配置的 IP / 端口是否一致,无拼写错误。
    • 检查各节点之间的时间是否同步(时间差超过 2000ms 会导致选举失败),可通过 ntpdate 同步时间。
    • 确认 dataDir 目录权限为 ZK 运行用户可读写,避免权限不足导致 myid 文件无法读取。
  • Kafka 日志报 Not enough replicas,消息无法持久化

    • 检查 Topic 的 min.insync.replicas 配置值是否大于当前可用的同步副本数(ISR)。
    • 若为单 Broker 部署,需将 min.insync.replicas 改为 1(默认 1,若手动修改过需调整)。
    • 检查 Follower 节点是否正常同步,排查网络延迟、磁盘 IO 过高导致同步慢的问题。
  • Kafka 启动后,JVM 内存溢出(OOM),进程直接崩溃

    • 检查 kafka-server-start.sh 中的 JVM 配置(KAFKA_HEAP_OPTS),默认堆内存可能过大 / 过小。
    • 单机测试环境可调整为 -Xmx512M -Xms512M,生产环境根据服务器内存配置(建议不超过物理内存的 50%)。
    • 检查是否开启了过多的 Topic / 分区,导致内存占用过高,需合理规划分区数。
  • 连接 Kafka 时报 SSL handshake failed

    • 若开启了 SSL 认证,检查客户端 / 服务端的 SSL 证书路径、密码配置是否正确。
    • 确认证书未过期,且服务端 listeners 配置为 SSL:// 而非 PLAINTEXT://。
    • 检查客户端是否正确配置了 SSL 相关参数(如 security.protocol=SSL)
编辑页面 (opens new window)
#用户手册

← 兼容版本说明文档 开发手册→

  • 浅色模式