开发手册
# 概述
本手册用于指导用户编写客户端接入 ADMQ 收发消息,客户端语言支持:Java、C#、Python 和 Go,其中 Java 为主要支持语言。下面针对每种语言的使用进行说明。
ADMQ 是基于 topic 的发布订阅模型,生产者客户端发送消息到某一个 topic 上,则所有订阅该 topic 的消费者组都能收到消息。生产者客户端和 ADMQ 通信过程主要包含以下流程:
- 客户端连接到任意一个 ADMQ 计算节点服务
- 发送请求获取 topic 所在的服务地址,并连接到该服务地址
- 客户端发送消息
- ADMQ 回复消息存储确认的通知
- 客户端继续发送消息
消费者客户端和 ADMQ 通信过程主要包含:
- 客户端连接到任意一个 ADMQ 计算节点服务
- 发送请求获取 topic 所在的服务地址,并连接到该服务地址
- 客户端发送订阅请求
- ADMQ 发送消息
- 客户端接收消息并返回消费确认
- ADMQ 继续发送消息
手册包含三部分:安装部署、管控台使用以及客户端使用说明。
# 安装部署
参考文档
# 管控台使用说明
参考文档
# MQ 客户端使用说明
# 准备工作
ADMQ 部署时默认开启了权限认证,客户端接入 ADMQ 发送和接收消息之前首先需要在管控台上创建用户并配置资源权限。具体请参考:第四章 用户 和 第五章 资源管理
# Java 客户端
# 快速入门
创建maven项目,并引入依赖
<pulsar.version>2.9.1</pulsar.version>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>${pulsar.version}</version>
</dependency>
1
2
3
4
5
6
2
3
4
5
6
创建客户端
// 在管控台用户管理中可获取token内容
String token = "exaJe...";
// 集群的连接地址,可在集群信息中查看。通常为计算节点ip:6650,如果是多个节点则用逗号隔开
String service = "192.168.1.1:6650,192.168.1.2:6650";
ClientBuilder builder = PulsarClient.builder()
.connectionTimeout(5, TimeUnit.MINUTES);
builder.authentication(AuthenticationFactory.token(token));
builder.serviceUrl("pulsar://" + service);
PulsarClient client = builder.build();
System.out.println("客户端创建成功");
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
创建消费者
Consumer<String> consumer = client.newConsumer(Schema.STRING)
// 主题名称,格式为:persistent://租户名称/命名空间名称/主题名称
.topic("persistent://apusic/ns01/topic01")
// 订阅名称
.subscriptionName("sub-01")
// 从最早的位置开始消费
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
// 订阅模式,包含共享、独占、灾备和按Key共享四种模式
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
System.out.println("消费者创建成功");
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
创建生产者
Producer<String> producer = client.newProducer(Schema.STRING)
// 主题名称,格式为:persistent://租户名称/命名空间名称/主题名称
.topic("persistent://apusic/ns01/topic01")
.create();
System.out.println("生产者创建成功");
1
2
3
4
5
2
3
4
5
生产消息
for (int i = 0; i < 10; i++) {
String data = "admq-test-message " + i;
MessageId id = producer.newMessage().value(data).send();
System.out.println("send message: " + data + ", response id: " + id);
}
producer.close();
1
2
3
4
5
6
2
3
4
5
6
消费消息
for (int i = 0; i < 10; i++) {
Message<String> msg = consumer.receive();
System.out.println("receive message: " + msg.getValue() + ", msg id: " + msg.getMessageId());
consumer.acknowledge(msg);
}
consumer.close();
client.close();
1
2
3
4
5
6
7
2
3
4
5
6
7
# Spring Boot Starter 接入
添加依赖
<dependency>
<groupId>io.github.majusko</groupId>
<artifactId>pulsar-java-spring-boot-starter</artifactId>
<version>1.1.2</version>
</dependency>
1
2
3
4
5
2
3
4
5
添加配置
# MQ 服务接入地址
pulsar.service-url=pulsar://localhost:6650
# 命名空间名称
pulsar.namespace=default
# 租户名称
pulsar.tenant=public
# token
pulsar.token-auth-value=43th4398gh340gf34gj349gh304ghryj34fh
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
生产消息
生产者配置
@Configuration
public class ProducerConfiguration {
@Bean
public ProducerFactory producerFactory() {
return new ProducerFactory()
.addProducer("topic01", String.class);
}
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
创建生产者
@Service
class MyProducer {
@Autowired
private PulsarTemplate<String> producer;
void sendHelloWorld() throws PulsarClientException {
// 此处的主题必须是上边已经注册过的
producer.send("topic01", "Hello world!");
}
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
消费消息
@Service
class MyConsumer {
@PulsarConsumer(topic="topic01", clazz=String.class)
void consume(String msg) {
System.out.println(msg);
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 监听器方式消费消息
// 在管控台用户管理中可获取token内容
String token = "exaJe...";
// 集群的连接地址,可在集群信息中查看。通常为计算节点ip:6650,如果是多个节点则用逗号隔开
String service = "192.168.1.1:6650,192.168.1.2:6650";
ClientBuilder builder = PulsarClient.builder()
.connectionTimeout(5, TimeUnit.MINUTES);
builder.authentication(AuthenticationFactory.token(token));
builder.serviceUrl("pulsar://" + service);
PulsarClient client = builder.build();
System.out.println("客户端创建成功");
client.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(subName)
.messageListener((c, msg) -> {
try {
System.out.println("receive message: " + msg.getValue() + ", msg id: " + msg.getMessageId());
c.acknowledge(msg);
} catch (Exception e) {
log.error("", e);
}
})
.subscribe();
Producer<String> producer = client.newProducer(Schema.STRING)
// 主题名称,格式为:persistent://租户名称/命名空间名称/主题名称
.topic("persistent://apusic/ns01/topic01")
.create();
System.out.println("生产者创建成功");
for (int i = 0; i < 10; i++) {
String data = "admq-test-message " + i;
MessageId id = producer.newMessage().value(data).send();
System.out.println("send message: " + data + ", response id: " + id);
}
producer.close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 异步发送和接收
// 创建客户端、生产者和消费者
for (int i = 0; i < 10; i++) {
String data = "admq-test-message " + i;
producer.newMessage().value(data).sendAsync().whenCompleteAsync((msgId, ex) -> {
if (ex != null) {
// 发送失败,需要业务处理
} else {
System.out.println("send message: " + data + ", response id: " + msgId);
}
});
}
for (int i = 0; i < 10; i++) {
consumer.receiveAsync().whenCompleteAsync((msg, ex) -> {
if (ex != null) {
} else {
System.out.println("receive message: " + msg.getValue() + ", msg id: " + msg.getMessageId());
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 延迟消息
支持指定消息在哪个时刻或者延迟多长时间后被消费者消费。
Producer<String> producer = client.newProducer(Schema.STRING)
// 主题名称,格式为:persistent://租户名称/命名空间名称/主题名称
.topic("persistent://apusic/ns01/topic01")
.create();
System.out.println("生产者创建成功");
// 10秒后消费者才能收到消息
producer.newMessage().value("delay message").deliverAfter(10, TimeUnit.SECONDS).send();
// 消费者在指定时刻收到消息
long time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2022-06-11 00:00:00").getTime();
producer.newMessage().value("delay message").deliverAt(time).send();
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 消息重试和死信队列
当消息没有被消费者成功消费时,会把消息保存到重试主题中,等待一段时间后会重新发送给消费者,当重试达到一定次数后把消息保存到死信主题中。
client.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub01")
.subscriptionType(SubscriptionType.Shared)
// 支持重试
.enableRetry(true)
.receiverQueueSize(200)
.deadLetterPolicy(DeadLetterPolicy.builder()
// 最大重试次数,超过后进入死信队列
.maxRedeliverCount(5)
// 指定消费失败的消息保存的主题名称。不指定的话使用默认的
.retryLetterTopic("persistent://apsuic/ns01/topic-retry")
// 指定死信队列的主题名称。不指定的话使用默认的
.deadLetterTopic("persistent://apusic/ns01/topic-dlq")
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# C# 客户端
添加依赖
引入NuGet程序包:DotPulsar
创建客户端连接
var token = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1xIn0.ybJge7zTfy_RDdAtB3w6nIPDHPT6-kbB6sNzgPt8sKQ";
var client = PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar://172.20.140.23:6650"))
.Authentication(AuthenticationFactory.Token(token))
.RetryInterval(TimeSpan.FromSeconds(3))
.Build();
1
2
3
4
5
6
2
3
4
5
6
启动消费者并消费消息
var consumer = client.NewConsumer()
.Topic("persistent://public/default/topic-02")
.SubscriptionName("sub01")
.Create();
Console.WriteLine("start consumer ...");
await foreach (var message in consumer.Messages())
{
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
// 确认消息
await consumer.AcknowledgeCumulative(message);
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
启动生产者并发送消息
var producer = client.NewProducer()
.Topic("persistent://public/default/topic-02")
.Create();
for (int i = 0; i < 10; i++)
{
var dataStr = "c# message test " + i;
producer.Send(Encoding.UTF8.GetBytes(dataStr));
Console.WriteLine("send message: " + dataStr);
Thread.Sleep(1000);
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# Go 客户端
添加依赖
github.com/apache/pulsar-client-go/pulsar
创建客户端连接
service := flag.String("service", "172.20.140.23:6650", "admq service address")
token := flag.String("token", "-", "token")
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://" + *service,
OperationTimeout: 60 * time.Second,
ConnectionTimeout: 60 * time.Second,
Authentication: pulsar.NewAuthenticationToken(*token),
})
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
启动消费者并消费消息
func consume(client pulsar.Client, topic *string, subName *string) {
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: *topic,
SubscriptionName: *subName,
SubscriptionInitialPosition: pulsar.SubscriptionPositionLatest,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
ctx := context.Background()
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(ctx)
fmt.Println("Receive mesagge: ", msg.ID(), string(msg.Payload()))
if err != nil {
log.Fatal(err)
}
consumer.Ack(msg)
}
consumer.Close()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
启动生产者并发送消息
func produce(client pulsar.Client, topic *string, message *string) {
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: *topic,
})
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
for i := 0; i < 10; i++ {
sendData := fmt.Sprintf("%s-%d", *message, i)
if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(sendData),
}); err != nil {
log.Fatal(err)
} else {
log.Println("Send message: ", msgId, sendData)
}
}
if err != nil {
log.Fatal(err)
}
producer.Close()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Python 客户端
引入依赖
pip3 install pulsar-client==2.9.1
创建客户端
token = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1xIn0.ybJge7zTfy_RDdAtB3w6nIPDHPT6-kbB6sNzgPt8sKQ";
client = pulsar.Client("pulsar://172.20.140.23:6650", authentication=pulsar.AuthenticationToken(token))
1
2
2
创建消费者
def consumer():
consumer = client.subscribe('persistent://public/default/topic04', 'sub01')
while True:
msg = consumer.receive()
try:
print("receive message '{}' id='{}'".format(msg.data(), msg.message_id()))
consumer.acknowledge(msg)
except:
# 会重新收到消息
consumer.negative_acknowledge(msg)
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
创建生产者
def producer():
producer = client.create_producer('persistent://public/default/topic04')
for i in range(10):
data = 'Hello-%d' % i
producer.send(data.encode('utf-8'))
print("send message: '{}'".format(data))
1
2
3
4
5
6
7
2
3
4
5
6
7
编辑页面 (opens new window)