开发手册
# ADMQ for Kafka开发者手册
# 核心架构
# 主题与分区
- Topic(主题): 消息的逻辑分类
- Partition(分区): Topic 的物理分片,实现并行读写与水平扩展
- 单个分区内的消息严格有序
- 不同分区之间无全局顺序保证
- 分区数应根据吞吐量需求设置,典型值为 10-50
# 副本机制
- Leader 副本: 处理所有读写请求
- Follower 副本: 仅从 Leader 同步数据,不处理客户端请求
- ISR(In-Sync Replicas): 与 Leader 保持同步的副本集合
- 只有 ISR 中的副本才有资格成为新 Leader
- 通过
replica.lag.time.max.ms控制同步超时(默认 30 秒)
# 高可用与故障转移
Controller 监控 Broker 状态
Leader 故障时,从 ISR 中选举新 Leader(优先选择偏移量最高的副本)
更新集群元数据,客户端自动重连新 Leader
默认禁止非 ISR 副本成为 Leader(
unclean.leader.election.enable=false)
# 安装部署
# 单机部署
解压安装包
mkdir /apusic
tar -zxf ADMQ-V2.0.391-Kafka-20260128.tar.gz -C /apusic
1
2
2
修改配置
cd /apusic/admq-kafka
vi config/kafka-standalone.conf
...
# 下面地址改成服务器实际 IP
listeners=PLAINTEXT://192.168.1.10:9092
...
1
2
3
4
5
6
2
3
4
5
6
启动
bin/admq-daemon start kafka zk
bin/admq-daemon start kafka standalone
1
2
2
# 客户端开发
# Maven 依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.1</version>
</dependency>
1
2
3
4
5
2
3
4
5
# 生产者(Producer)核心配置
| 配置项 | 推荐值 | 说明 |
|---|---|---|
bootstrap.servers | host1:9092,host2:9092 | 集群地址列表 |
acks | all | 等待所有 ISR 副本确认 |
enable.idempotence | true | 启用幂等性,防止重复 |
compression.type | lz4 或 zstd | 压缩算法 |
linger.ms | 5-10 | 批量发送延迟 |
batch.size | 16384 | 批次大小(字节) |
retries | Integer.MAX_VALUE | 重试次数 |
max.in.flight.requests.per.connection | 5 | 单连接未确认请求数(幂等性启用时 ≤ 5) |
# 生产者示例代码
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key1", "value1");
producer.send(record, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Sent to partition " + metadata.partition() +
", offset " + metadata.offset());
}
});
producer.close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 消费者(Consumer)核心配置
| 配置项 | 推荐值 | 说明 |
|---|---|---|
bootstrap.servers | host1:9092,host2:9092 | 集群地址 |
group.id | 自定义 | 消费者组 ID |
enable.auto.commit | false | 禁用自动提交,手动控制 |
auto.offset.reset | earliest 或 latest | 无位移时策略 |
max.poll.records | 500 | 单次 poll 最大记录数 |
session.timeout.ms | 30000 | 会话超时 |
heartbeat.interval.ms | 10000 | 心跳间隔(应小于会话超时的 1/3) |
# 消费者示例代码
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received: " + record.value());
}
consumer.commitSync();
}
} finally {
consumer.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 生产环境最佳实践
# Topic 设计
- 分区数计算:单分区吞吐 × 分区数 ≥ 目标吞吐
- 副本数:生产环境推荐 3 副本
- 清理策略:
cleanup.policy=delete:按时间/大小清理cleanup.policy=compact:日志压缩,保留最新 Key
# 性能调优
- 生产者侧:
- 启用压缩(
lz4或zstd) - 合理设置
linger.ms和batch.size - 启用幂等性和事务(Exactly-Once 语义)
- 启用压缩(
- 消费者侧:
- 调整
max.poll.records避免单次处理过慢 - 合理设置
fetch.min.bytes和fetch.max.wait.ms
- 调整
# 监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
UnderReplicatedPartitions | 未充分同步的分区数 | > 0 |
OfflinePartitionsCount | 离线分区数 | > 0 |
RequestLatencyAvg | 平均请求延迟 | > 100ms |
MessagesInPerSec | 每秒消息数 | 根据业务设定 |
BytesInPerSec | 每秒写入字节数 | 根据业务设定 |
# 安全配置
# SASL/PLAIN 认证配置
# broker 配置
listeners=SASL_PLAINTEXT://0.0.0.0:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
# JAAS 配置文件
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# ACL 示例
# 授予 alice 对 topic my-topic 的读写权限
kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:alice \
--operation Read --operation Write \
--topic my-topic
# 授予 alice 对消费者组 my-group 的操作权限
kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:alice \
--operation Read --group my-group
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 常用命令
ADMQ for kafka提供原生命令,位于admq-kafka/kafka/bin/目录下
# Topic 管理
cd /apusic/admq-kafka/kafka/bin/
# 创建 topic
kafka-topics.sh --bootstrap-server localhost:9092 --create \
--topic my-topic --partitions 3 --replication-factor 3
# 列出所有 topic
kafka-topics.sh --bootstrap-server localhost:9092 --list
# 查看 topic 详情
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
# 修改分区数(只能增加)
kafka-topics.sh --bootstrap-server localhost:9092 --alter \
--topic my-topic --partitions 6
# 删除 topic
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 消费者组管理
# 列出消费者组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 查看消费者组详情
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group my-group
# 重置位移
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --reset-offsets --to-earliest \
--topic my-topic --execute
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# KRaft 模式相关
# 格式化存储目录(仅首次)
kafka-storage.sh format -t <cluster-id> -c config/kraft/server.properties
# 查看仲裁状态
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
# 动态添加控制器节点
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 add \
--controller-quorum-voter <node-id>@<host>:<port>
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 故障排查
# 常见问题
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 消费者频繁重平衡 | session.timeout.ms 过小或处理时间过长 | 增大会话超时,优化消费逻辑 |
| 消息丢失 | acks=1 或未启用幂等性 | 设置 acks=all,启用 enable.idempotence |
| 重复消费 | 位移提交失败 | 使用手动提交,处理重复逻辑 |
| 生产延迟高 | 网络或磁盘瓶颈 | 检查网络带宽,优化磁盘 I/O |
| 分区不可用 | Broker 故障或副本不足 | 检查 Broker 状态,确保 ISR 足够 |
# 日志位置
- Broker 日志:
logs/server.log - Controller 日志:
logs/controller.log - 状态变更日志:
logs/state-change.log
编辑页面 (opens new window)