Apusic文档中心
首页
  • 应用服务器 AAS
  • 负载均衡器 ALB
  • 分布式消息队列 ADMQ
  • 分布式缓存 AMDC
  • 分布式配置中心 ADCC
  • Java开发工具包软件 AJDK
  • 搜索引擎 ASE
  • 中间件云平台 ACP
  • 统一管理平台 AUMP
  • 云原生中间件管理 ACMP
  • DevOps平台 ADOP
  • 许可授权中心 ACLS
  • Copilot智能问答系统 ACS
  • 监控平台 AMP
  • 智能日志 AILP
  • 应用性能管理 AAPM
  • 智能告警 AAlarm
  • 主数据管理 AMDM
  • 数据交换平台 ADXP
  • 企业服务总线 AESB
  • 数据智脑 ADPR
  • 服务治理 ASGP
  • 统一身份管理 AIDM
  • 标准模板
  • Markdown教程 (opens new window)
  • VuePress官方社区 (opens new window)
  • 帮助
贡献文档 (opens new window)
首页
  • 应用服务器 AAS
  • 负载均衡器 ALB
  • 分布式消息队列 ADMQ
  • 分布式缓存 AMDC
  • 分布式配置中心 ADCC
  • Java开发工具包软件 AJDK
  • 搜索引擎 ASE
  • 中间件云平台 ACP
  • 统一管理平台 AUMP
  • 云原生中间件管理 ACMP
  • DevOps平台 ADOP
  • 许可授权中心 ACLS
  • Copilot智能问答系统 ACS
  • 监控平台 AMP
  • 智能日志 AILP
  • 应用性能管理 AAPM
  • 智能告警 AAlarm
  • 主数据管理 AMDM
  • 数据交换平台 ADXP
  • 企业服务总线 AESB
  • 数据智脑 ADPR
  • 服务治理 ASGP
  • 统一身份管理 AIDM
  • 标准模板
  • Markdown教程 (opens new window)
  • VuePress官方社区 (opens new window)
  • 帮助
贡献文档 (opens new window)
文档中心
  • 金蝶Apusic应用服务器

  • 金蝶Apusic负载均衡器

  • 金蝶Apusic分布式消息队列

    • 产品白皮书
    • 产品更新说明
    • V2.0.6

    • V2.0.6_for_kafka

      • 用户手册
      • 开发手册
    • V2.0.5

    • V2.0.4

    • V2.0.3

  • 金蝶Apusic分布式缓存

  • 金蝶Apusic分布式配置中心

  • 金蝶Apusic Java开发工具包软件

  • 金蝶Apusic全文检索

开发手册

# ADMQ for Kafka开发者手册

# 核心架构

# 主题与分区

  • Topic(主题): 消息的逻辑分类
  • Partition(分区): Topic 的物理分片,实现并行读写与水平扩展
    • 单个分区内的消息严格有序
    • 不同分区之间无全局顺序保证
    • 分区数应根据吞吐量需求设置,典型值为 10-50

# 副本机制

  • Leader 副本: 处理所有读写请求
  • Follower 副本: 仅从 Leader 同步数据,不处理客户端请求
  • ISR(In-Sync Replicas): 与 Leader 保持同步的副本集合
    • 只有 ISR 中的副本才有资格成为新 Leader
    • 通过 replica.lag.time.max.ms 控制同步超时(默认 30 秒)

# 高可用与故障转移

  • Controller 监控 Broker 状态

  • Leader 故障时,从 ISR 中选举新 Leader(优先选择偏移量最高的副本)

  • 更新集群元数据,客户端自动重连新 Leader

  • 默认禁止非 ISR 副本成为 Leader(unclean.leader.election.enable=false)

# 安装部署

# 单机部署

解压安装包

mkdir /apusic
tar -zxf ADMQ-V2.0.391-Kafka-20260128.tar.gz -C /apusic
1
2

修改配置

cd /apusic/admq-kafka
vi config/kafka-standalone.conf
...
# 下面地址改成服务器实际 IP 
listeners=PLAINTEXT://192.168.1.10:9092 
...
1
2
3
4
5
6

启动

bin/admq-daemon start kafka zk 
bin/admq-daemon start kafka standalone
1
2

# 客户端开发

# Maven 依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.9.1</version>
</dependency>
1
2
3
4
5

# 生产者(Producer)核心配置

配置项 推荐值 说明
bootstrap.servers host1:9092,host2:9092 集群地址列表
acks all 等待所有 ISR 副本确认
enable.idempotence true 启用幂等性,防止重复
compression.type lz4 或 zstd 压缩算法
linger.ms 5-10 批量发送延迟
batch.size 16384 批次大小(字节)
retries Integer.MAX_VALUE 重试次数
max.in.flight.requests.per.connection 5 单连接未确认请求数(幂等性启用时 ≤ 5)

# 生产者示例代码

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key1", "value1");
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("Sent to partition " + metadata.partition() + 
                          ", offset " + metadata.offset());
    }
});

producer.close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 消费者(Consumer)核心配置

配置项 推荐值 说明
bootstrap.servers host1:9092,host2:9092 集群地址
group.id 自定义 消费者组 ID
enable.auto.commit false 禁用自动提交,手动控制
auto.offset.reset earliest 或 latest 无位移时策略
max.poll.records 500 单次 poll 最大记录数
session.timeout.ms 30000 会话超时
heartbeat.interval.ms 10000 心跳间隔(应小于会话超时的 1/3)

# 消费者示例代码

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received: " + record.value());
        }
        consumer.commitSync();
    }
} finally {
    consumer.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 生产环境最佳实践

# Topic 设计

  • 分区数计算:单分区吞吐 × 分区数 ≥ 目标吞吐
  • 副本数:生产环境推荐 3 副本
  • 清理策略:
    • cleanup.policy=delete:按时间/大小清理
    • cleanup.policy=compact:日志压缩,保留最新 Key

# 性能调优

  • 生产者侧:
    • 启用压缩(lz4 或 zstd)
    • 合理设置 linger.ms 和 batch.size
    • 启用幂等性和事务(Exactly-Once 语义)
  • 消费者侧:
    • 调整 max.poll.records 避免单次处理过慢
    • 合理设置 fetch.min.bytes 和 fetch.max.wait.ms

# 监控指标

指标 说明 告警阈值
UnderReplicatedPartitions 未充分同步的分区数 > 0
OfflinePartitionsCount 离线分区数 > 0
RequestLatencyAvg 平均请求延迟 > 100ms
MessagesInPerSec 每秒消息数 根据业务设定
BytesInPerSec 每秒写入字节数 根据业务设定

# 安全配置

# SASL/PLAIN 认证配置

# broker 配置
listeners=SASL_PLAINTEXT://0.0.0.0:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false

# JAAS 配置文件
KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="admin-secret"
   user_alice="alice-secret";
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# ACL 示例

# 授予 alice 对 topic my-topic 的读写权限
kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:alice \
  --operation Read --operation Write \
  --topic my-topic

# 授予 alice 对消费者组 my-group 的操作权限
kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:alice \
  --operation Read --group my-group
1
2
3
4
5
6
7
8
9
10

# 常用命令

ADMQ for kafka提供原生命令,位于admq-kafka/kafka/bin/目录下

# Topic 管理

cd /apusic/admq-kafka/kafka/bin/

# 创建 topic
kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic my-topic --partitions 3 --replication-factor 3

# 列出所有 topic
kafka-topics.sh --bootstrap-server localhost:9092 --list

# 查看 topic 详情
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic

# 修改分区数(只能增加)
kafka-topics.sh --bootstrap-server localhost:9092 --alter \
  --topic my-topic --partitions 6

# 删除 topic
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 消费者组管理

# 列出消费者组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 查看消费者组详情
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-group

# 重置位移
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --reset-offsets --to-earliest \
  --topic my-topic --execute
1
2
3
4
5
6
7
8
9
10
11

# KRaft 模式相关

# 格式化存储目录(仅首次)
kafka-storage.sh format -t <cluster-id> -c config/kraft/server.properties

# 查看仲裁状态
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status

# 动态添加控制器节点
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 add \
  --controller-quorum-voter <node-id>@<host>:<port>
1
2
3
4
5
6
7
8
9

# 故障排查

# 常见问题

问题 可能原因 解决方案
消费者频繁重平衡 session.timeout.ms 过小或处理时间过长 增大会话超时,优化消费逻辑
消息丢失 acks=1 或未启用幂等性 设置 acks=all,启用 enable.idempotence
重复消费 位移提交失败 使用手动提交,处理重复逻辑
生产延迟高 网络或磁盘瓶颈 检查网络带宽,优化磁盘 I/O
分区不可用 Broker 故障或副本不足 检查 Broker 状态,确保 ISR 足够

# 日志位置

  • Broker 日志:logs/server.log
  • Controller 日志:logs/controller.log
  • 状态变更日志:logs/state-change.log
编辑页面 (opens new window)
#开发手册

← 用户手册 发版说明→

  • 浅色模式