Apusic文档中心
首页
  • 应用服务器 AAS
  • 负载均衡器 ALB
  • 分布式消息队列 ADMQ
  • 分布式缓存 AMDC
  • 分布式配置中心 ADCC
  • Java开发工具包软件 AJDK
  • 搜索引擎 ASE
  • 中间件云平台 ACP
  • 统一管理平台 AUMP
  • 云原生中间件管理 ACMP
  • DevOps平台 ADOP
  • 许可授权中心 ACLS
  • Copilot智能问答系统 ACS
  • 监控平台 AMP
  • 智能日志 AILP
  • 应用性能管理 AAPM
  • 智能告警 AAlarm
  • 主数据管理 AMDM
  • 数据交换平台 ADXP
  • 企业服务总线 AESB
  • 数据智脑 ADPR
  • 服务治理 ASGP
  • 统一身份管理 AIDM
  • 标准模板
  • Markdown教程 (opens new window)
  • VuePress官方社区 (opens new window)
  • 帮助
贡献文档 (opens new window)
首页
  • 应用服务器 AAS
  • 负载均衡器 ALB
  • 分布式消息队列 ADMQ
  • 分布式缓存 AMDC
  • 分布式配置中心 ADCC
  • Java开发工具包软件 AJDK
  • 搜索引擎 ASE
  • 中间件云平台 ACP
  • 统一管理平台 AUMP
  • 云原生中间件管理 ACMP
  • DevOps平台 ADOP
  • 许可授权中心 ACLS
  • Copilot智能问答系统 ACS
  • 监控平台 AMP
  • 智能日志 AILP
  • 应用性能管理 AAPM
  • 智能告警 AAlarm
  • 主数据管理 AMDM
  • 数据交换平台 ADXP
  • 企业服务总线 AESB
  • 数据智脑 ADPR
  • 服务治理 ASGP
  • 统一身份管理 AIDM
  • 标准模板
  • Markdown教程 (opens new window)
  • VuePress官方社区 (opens new window)
  • 帮助
贡献文档 (opens new window)
文档中心
  • 金蝶Apusic应用服务器

  • 金蝶Apusic负载均衡器

  • 金蝶Apusic分布式消息队列

    • 产品白皮书
    • 产品更新说明
    • V2.0.6

    • V2.0.6_for_kafka

    • V2.0.5

    • V2.0.4

    • V2.0.3

      • 发版说明
      • 产品简介
      • 功能清单
      • 快速入门
      • 安装部署
      • 开发手册
      • 用户手册
      • 国产化适配
      • 插件说明
      • 性能参数
      • 常见问题
  • 金蝶Apusic分布式缓存

  • 金蝶Apusic分布式配置中心

  • 金蝶Apusic Java开发工具包软件

  • 金蝶Apusic全文检索

开发手册

# 概述

本手册用于指导用户编写客户端接入 ADMQ 收发消息,客户端语言支持:Java、C#、Python 和 Go,其中 Java 为主要支持语言。下面针对每种语言的使用进行说明。

ADMQ 是基于 topic 的发布订阅模型,生产者客户端发送消息到某一个 topic 上,则所有订阅该 topic 的消费者组都能收到消息。生产者客户端和 ADMQ 通信过程主要包含以下流程:

  • 客户端连接到任意一个 ADMQ 计算节点服务
  • 发送请求获取 topic 所在的服务地址,并连接到该服务地址
  • 客户端发送消息
  • ADMQ 回复消息存储确认的通知
  • 客户端继续发送消息

消费者客户端和 ADMQ 通信过程主要包含:

  • 客户端连接到任意一个 ADMQ 计算节点服务
  • 发送请求获取 topic 所在的服务地址,并连接到该服务地址
  • 客户端发送订阅请求
  • ADMQ 发送消息
  • 客户端接收消息并返回消费确认
  • ADMQ 继续发送消息

手册包含三部分:安装部署、管控台使用以及客户端使用说明。

# 安装部署

参考文档

# 管控台使用说明

参考文档

# MQ 客户端使用说明

# 准备工作

ADMQ 部署时默认开启了权限认证,客户端接入 ADMQ 发送和接收消息之前首先需要在管控台上创建用户并配置资源权限。具体请参考:第四章 用户 和 第五章 资源管理

# Java 客户端

# 快速入门

创建maven项目,并引入依赖

<pulsar.version>2.9.1</pulsar.version>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-admin</artifactId>
    <version>${pulsar.version}</version>
</dependency>
1
2
3
4
5
6

创建客户端

// 在管控台用户管理中可获取token内容
String token = "exaJe...";

// 集群的连接地址,可在集群信息中查看。通常为计算节点ip:6650,如果是多个节点则用逗号隔开
String service = "192.168.1.1:6650,192.168.1.2:6650";

ClientBuilder builder = PulsarClient.builder()
    .connectionTimeout(5, TimeUnit.MINUTES);
builder.authentication(AuthenticationFactory.token(token));
builder.serviceUrl("pulsar://" + service);
PulsarClient client = builder.build();
System.out.println("客户端创建成功");
1
2
3
4
5
6
7
8
9
10
11
12

创建消费者

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    // 主题名称,格式为:persistent://租户名称/命名空间名称/主题名称
    .topic("persistent://apusic/ns01/topic01")
    // 订阅名称
    .subscriptionName("sub-01")
    // 从最早的位置开始消费
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
    // 订阅模式,包含共享、独占、灾备和按Key共享四种模式
    .subscriptionType(SubscriptionType.Exclusive)
    .subscribe();
System.out.println("消费者创建成功");
1
2
3
4
5
6
7
8
9
10
11

创建生产者

Producer<String> producer = client.newProducer(Schema.STRING)
        // 主题名称,格式为:persistent://租户名称/命名空间名称/主题名称
        .topic("persistent://apusic/ns01/topic01")
        .create();
System.out.println("生产者创建成功");
1
2
3
4
5

生产消息

for (int i = 0; i < 10; i++) {
    String data = "admq-test-message " + i;
    MessageId id = producer.newMessage().value(data).send();
    System.out.println("send message: " + data + ", response id: " + id);
}
producer.close();
1
2
3
4
5
6

消费消息

for (int i = 0; i < 10; i++) {
    Message<String> msg = consumer.receive();
    System.out.println("receive message: " + msg.getValue() + ", msg id: " + msg.getMessageId());
    consumer.acknowledge(msg);
}
consumer.close();
client.close();
1
2
3
4
5
6
7

# Spring Boot Starter 接入

添加依赖

<dependency>
  <groupId>io.github.majusko</groupId>
  <artifactId>pulsar-java-spring-boot-starter</artifactId>
  <version>1.1.2</version>
</dependency>
1
2
3
4
5

添加配置

# MQ 服务接入地址
pulsar.service-url=pulsar://localhost:6650
# 命名空间名称
pulsar.namespace=default
# 租户名称
pulsar.tenant=public
# token
pulsar.token-auth-value=43th4398gh340gf34gj349gh304ghryj34fh
1
2
3
4
5
6
7
8

生产消息

生产者配置

@Configuration
public class ProducerConfiguration {

    @Bean
    public ProducerFactory producerFactory() {
        return new ProducerFactory()
            .addProducer("topic01", String.class);
    }
}
1
2
3
4
5
6
7
8
9

创建生产者

@Service
class MyProducer {

	@Autowired
	private PulsarTemplate<String> producer;

	void sendHelloWorld() throws PulsarClientException {
        // 此处的主题必须是上边已经注册过的
		producer.send("topic01", "Hello world!");
	}
}
1
2
3
4
5
6
7
8
9
10
11

消费消息

@Service
class MyConsumer {
    
    @PulsarConsumer(topic="topic01", clazz=String.class)
    void consume(String msg) {
    	System.out.println(msg);
    }
}
1
2
3
4
5
6
7
8

# 监听器方式消费消息

// 在管控台用户管理中可获取token内容
String token = "exaJe...";

// 集群的连接地址,可在集群信息中查看。通常为计算节点ip:6650,如果是多个节点则用逗号隔开
String service = "192.168.1.1:6650,192.168.1.2:6650";

ClientBuilder builder = PulsarClient.builder()
    .connectionTimeout(5, TimeUnit.MINUTES);
builder.authentication(AuthenticationFactory.token(token));
builder.serviceUrl("pulsar://" + service);
PulsarClient client = builder.build();
System.out.println("客户端创建成功");

client.newConsumer(Schema.STRING)
    .topic(topic)
    .subscriptionType(SubscriptionType.Shared)
    .subscriptionName(subName)
    .messageListener((c, msg) -> {
        try {
            System.out.println("receive message: " + msg.getValue() + ", msg id: " + msg.getMessageId());
            c.acknowledge(msg);
        } catch (Exception e) {
            log.error("", e);
        }
    })
    .subscribe();

Producer<String> producer = client.newProducer(Schema.STRING)
        // 主题名称,格式为:persistent://租户名称/命名空间名称/主题名称
        .topic("persistent://apusic/ns01/topic01")
        .create();
System.out.println("生产者创建成功");

for (int i = 0; i < 10; i++) {
    String data = "admq-test-message " + i;
    MessageId id = producer.newMessage().value(data).send();
    System.out.println("send message: " + data + ", response id: " + id);
}
producer.close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 异步发送和接收

// 创建客户端、生产者和消费者

for (int i = 0; i < 10; i++) {
    String data = "admq-test-message " + i;
    producer.newMessage().value(data).sendAsync().whenCompleteAsync((msgId, ex) -> {
        if (ex != null) {
            // 发送失败,需要业务处理
        } else {
            System.out.println("send message: " + data + ", response id: " + msgId);
        }
    });
}

for (int i = 0; i < 10; i++) {
    consumer.receiveAsync().whenCompleteAsync((msg, ex) -> {
        if (ex != null) {
        } else {
            System.out.println("receive message: " + msg.getValue() + ", msg id: " + msg.getMessageId());
            try {
                consumer.acknowledge(msg);
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }
        }
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 延迟消息

支持指定消息在哪个时刻或者延迟多长时间后被消费者消费。

Producer<String> producer = client.newProducer(Schema.STRING)
                // 主题名称,格式为:persistent://租户名称/命名空间名称/主题名称
                .topic("persistent://apusic/ns01/topic01")
                .create();
System.out.println("生产者创建成功");

// 10秒后消费者才能收到消息
producer.newMessage().value("delay message").deliverAfter(10, TimeUnit.SECONDS).send();

// 消费者在指定时刻收到消息
long time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2022-06-11 00:00:00").getTime();
producer.newMessage().value("delay message").deliverAt(time).send();
1
2
3
4
5
6
7
8
9
10
11
12

# 消息重试和死信队列

当消息没有被消费者成功消费时,会把消息保存到重试主题中,等待一段时间后会重新发送给消费者,当重试达到一定次数后把消息保存到死信主题中。

client.newConsumer(Schema.STRING)
    .topic(topic)
    .subscriptionName("sub01")
    .subscriptionType(SubscriptionType.Shared)
    // 支持重试
    .enableRetry(true)
    .receiverQueueSize(200)
    .deadLetterPolicy(DeadLetterPolicy.builder()
                      // 最大重试次数,超过后进入死信队列
                      .maxRedeliverCount(5)
                      // 指定消费失败的消息保存的主题名称。不指定的话使用默认的
                      .retryLetterTopic("persistent://apsuic/ns01/topic-retry")
                      // 指定死信队列的主题名称。不指定的话使用默认的
                      .deadLetterTopic("persistent://apusic/ns01/topic-dlq")
                      .build())
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
    .subscribe();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# C# 客户端

添加依赖

引入NuGet程序包:DotPulsar

创建客户端连接

var token = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1xIn0.ybJge7zTfy_RDdAtB3w6nIPDHPT6-kbB6sNzgPt8sKQ";
var client = PulsarClient.Builder()
    .ServiceUrl(new Uri("pulsar://172.20.140.23:6650"))
    .Authentication(AuthenticationFactory.Token(token))
    .RetryInterval(TimeSpan.FromSeconds(3))
    .Build();
1
2
3
4
5
6

启动消费者并消费消息

var consumer = client.NewConsumer()
    .Topic("persistent://public/default/topic-02")
    .SubscriptionName("sub01")
    .Create();

Console.WriteLine("start consumer ...");
await foreach (var message in consumer.Messages())
{
    Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
    // 确认消息
    await consumer.AcknowledgeCumulative(message);
}
1
2
3
4
5
6
7
8
9
10
11
12

启动生产者并发送消息

var producer = client.NewProducer()
    .Topic("persistent://public/default/topic-02")
    .Create();
for (int i = 0; i < 10; i++)
{
    var dataStr = "c# message test " + i;
    producer.Send(Encoding.UTF8.GetBytes(dataStr));
    Console.WriteLine("send message: " + dataStr);
    Thread.Sleep(1000);
}
1
2
3
4
5
6
7
8
9
10

# Go 客户端

添加依赖

github.com/apache/pulsar-client-go/pulsar

创建客户端连接

service := flag.String("service", "172.20.140.23:6650", "admq service address")
token := flag.String("token", "-", "token")
        
client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL:               "pulsar://" + *service,
    OperationTimeout:  60 * time.Second,
    ConnectionTimeout: 60 * time.Second,
    Authentication:    pulsar.NewAuthenticationToken(*token),
})
1
2
3
4
5
6
7
8
9

启动消费者并消费消息

func consume(client pulsar.Client, topic *string, subName *string) {
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                       *topic,
		SubscriptionName:            *subName,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionLatest,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	ctx := context.Background()

	for i := 0; i < 10; i++ {
		msg, err := consumer.Receive(ctx)

		fmt.Println("Receive mesagge: ", msg.ID(), string(msg.Payload()))
		if err != nil {
			log.Fatal(err)
		}
		consumer.Ack(msg)
	}
	consumer.Close()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

启动生产者并发送消息

func produce(client pulsar.Client, topic *string, message *string) {
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: *topic,
	})
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	for i := 0; i < 10; i++ {
		sendData := fmt.Sprintf("%s-%d", *message, i)
		if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
			Payload: []byte(sendData),
		}); err != nil {
			log.Fatal(err)
		} else {
			log.Println("Send message: ", msgId, sendData)
		}
	}

	if err != nil {
		log.Fatal(err)
	}
	producer.Close()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# Python 客户端

引入依赖

pip3 install pulsar-client==2.9.1

创建客户端

token = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1xIn0.ybJge7zTfy_RDdAtB3w6nIPDHPT6-kbB6sNzgPt8sKQ";
client = pulsar.Client("pulsar://172.20.140.23:6650", authentication=pulsar.AuthenticationToken(token))
1
2

创建消费者

def consumer():
    consumer = client.subscribe('persistent://public/default/topic04', 'sub01')

    while True:
        msg = consumer.receive()
        try:
            print("receive message '{}' id='{}'".format(msg.data(), msg.message_id()))
            consumer.acknowledge(msg)
        except:
            # 会重新收到消息
            consumer.negative_acknowledge(msg)
1
2
3
4
5
6
7
8
9
10
11

创建生产者

def producer():
    producer = client.create_producer('persistent://public/default/topic04')

    for i in range(10):
        data = 'Hello-%d' % i
        producer.send(data.encode('utf-8'))
        print("send message: '{}'".format(data))
1
2
3
4
5
6
7
编辑页面 (opens new window)
#开发手册

← 安装部署 用户手册→

  • 浅色模式