用户手册
# Java客户端使用
你可以使用Java客户端来创建消息的Java生产者、消费者和读者,并执行管理任务。Java客户端的生产者、消费者和reader中的所有方法都是线程安全的。
客户端的Javadoc按包分为以下两个类型:
| Package | 描述 |
|---|---|
| org.apache.pulsar.client.api | 生产者和消费者API |
| org.apache.pulsar.client.admin | 管理API |
# 引入依赖
最新版本的Java客户端库可以通过Maven中心获得。要使用最新版本,请在构建配置中添加pulsar-client库。在pom.xml文件中添加以下信息。
<pulsar.version>2.9.3</pulsar.version>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
2
3
4
5
6
7
# 连接到ADMQ
要使用客户端库连接到服务器,你需要指定一个ADMQ协议URL。
你可以将ADMQ协议URL分配给特定的集群,并使用pulsar方案。默认的端口是6650,下面是一个localhost的例子。
pulsar://localhost:6650
如果你有多个broker,URL如下:
pulsar://localhost:6550,localhost:6651,localhost:6652
生产环境集群的URL如下:
pulsar://pulsar.cn-west.example.com:6650
如果你使用TLS认证,URL如下:
pulsar+ssl://pulsar.cn-west.example.com:6651
# 获取验证的Token
通过管理控制台创建用户,并获取Token。
# 创建客户端
你可以像这样只用一个集群的URL来实例化一个PulsarClient对象:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.authentication(AuthenticationFactory.token(".eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY"))
.build();
2
3
4
如果你有多个broker,你可以像这样启动一个PulsarClient:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")
.authentication(AuthenticationFactory.token(".eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY"))
.build();
2
3
4
如果你创建一个客户端,可以使用loadConf配置。在loadConf中可以使用以下参数。
| 类型 | 名称 | 描述 | 缺省值 |
|---|---|---|---|
| String | serviceUrl | 服务的URL提供者 | |
| String | authPluginClassName | 认证插件的名称 | |
| String | authParams | String代表认证插件的参数 例子 key1:val1,key2:val2 | |
| long | operationTimeoutMs | 操作超时时间 | 30000 |
| Long | statsIntervalSeconds | 每条统计信息之间的间隔 统计信息以正数statsInterval激活 将 statsIntervalSeconds 设置为至少 1 秒 | 60 |
| Int | numIoThreads | 用于处理与broker连接的线程数量 | 1 |
| int | numListenerThreads | 用于处理消息监听器的线程数 | 1 |
| boolean | useTcpNoDelay | 是否在连接上使用TCP no-delay标志以禁用Nagle算法 | True |
| Boolean | useTls | 是否在连接上使用TLS加密 | False |
| String | tlsTrustCertsFilePath | 信任的TLS证书文件的路径 | |
| Boolean | tlsAllowInsecureConnection | 客户端是否接受来自broker的不受信任的TLS证书 | False |
| Boolean | tlsHostnameVerificationEnable | 是否启用TLS主机名验证 | False |
| Int | concurrentLookupRequest | 允许在每个broker连接上发送的并发查询请求的数量,以防止broker的过载 | 5000 |
| int | maxLookupRequest | 每个broker连接允许的最大查询请求数,以防止broker的过载 | 50000 |
| int | maxNumberOfRejectedRequestPerConnection | 在当前连接关闭后,客户端创建一个新的连接以连接到不同的broker后,在一定的时间范围内(30秒),一个broker的最大拒绝请求数 | 50 |
| int | keepAliveIntervalSeconds | 每个客户端连接的存活间隔秒数 | 30 |
| Int | connectionTimeoutMs | 等待与broker建立连接的持续时间 如果时间过了,没有来自broker的响应,连接尝试将被放弃 | 10000 |
| int | requestTimeoutMs | 完成一项请求的最长时间 | 60000 |
| int | defaultBackoffIntervalNanos | 默认的backoff间隔时间 | TimeUnit.MILLISECONDS.toNanos(100); |
| long | maxBackoffIntervalNanos | 一个backoff间隔的最大持续时间 | TimeUnit.SECONDS.toNanos(30) |
# 生产者
生产者向主题写入消息。一旦你实例化了一个PulsarClient对象,你就可以为一个特定的主题创建一个生产者。
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.create();
producer.send("My message".getBytes());
2
3
4
5
默认情况下,生产者产生的消息是由字节数组组成的。你可以通过指定一个消息schema来产生不同的类型。
Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("my-topic")
.create();
stringProducer.send("My message");
确保你在不需要时关闭你的生产者、消费者和客户端对象:
producer.close();
consumer.close();
client.close();
关闭操作也可以是异步的:
producer.closeAsync()
.thenRun(() -> System.out.println("Producer closed"))
.exceptionally((ex) -> {
System.err.println("Failed to close producer: " + ex);
return null;
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 一些配置参数
如果你像上面的例子那样只指定一个主题名称来实例化一个生产者对象,则使用生产者的默认配置。
如果你创建一个生产者,可以使用loadConf配置。在loadConf中可以使用以下参数。
| 类型 | 名称 | 描述 | 缺省值 |
|---|---|---|---|
| String | topicName | 主题名称 | |
| String | producerName | 生产者名称 | |
| Long | sendTimeoutMs | 消息发送超时,单位是ms 如果在sendTimeout过期前,消息没有被服务器确认,就会发生一个错误。 | 30000 |
| Boolean | blockIfQueueFull | 如果它被设置为true,当传出的消息队列已满时,生产者的Send和SendAsync方法会被阻塞,而不是失败和抛出错误。 如果它被设置为false,当传出的消息队列已满时,生产者的Send和SendAsync方法会失败,并出现ProducerQueueIsFullError异常。 MaxPendingMessages参数决定了外发消息队列的大小。 | False |
| Int | maxPendingMessages | 持有未决消息的队列的最大大小。 例如,一个等待从broker收到确认的消息。 默认情况下,当队列已满时,所有对Send和SendAsync方法的调用都会失败,除非你把BlockIfQueueFull设置为true。 | 1000 |
| Int | maxPendingMessagesAcrossPartitions | 各个分区的最大待处理消息数。 如果总数超过了配置的值,使用设置来降低每个分区的最大待处理信息 | 50000 |
| MessageRoutingMode | messageRoutingMode | 分区主题上的生产者的消息路由逻辑。 只在对消息设置无键时应用该逻辑。 可用的选项如下: - pulsar.RoundRobinDistribution:轮询 - pulsar.UseSinglePartition:将所有消息发布到一个分区上 - pulsar.CustomPartition:一个自定义的分区方案 | RoundRobinDistribution |
| HashingScheme | hashingScheme | 确定你发布特定信息的分区的哈希函数(仅适用于分区的主题)。 可用的选项如下: - pulsar.JavaStringHash:相当于Java中的String.hashCode() - pulsar.Murmur3_32Hash:应用Murmur3的散列函数。 - pulsar.BoostHash:应用C++的Boost库中的散列函数。 | HashingScheme.JavaStringHash |
| ProducerCryptoFailureAction | cryptoFailureAction | 加密失败时,生产者应采取行动。 - FAIL:如果加密失败,未加密的信息发送失败。 - SEND:如果加密失败,未加密的信息被发送。 | ProducerCryptoFailureAction.FAIL |
| long | batchingMaxPublishDelayMicros | 分批发送信息的时间段 | TimeUnit.MILLISECONDS.toMicros(1) |
| int | batchingMaxMessages | 一个批次中允许的最大信息数量 | 1000 |
| Boolean | batchingEnabled | 启用信息批处理 | True |
| CompressionType | compressionType | 生产者使用的消息数据压缩类型。 可用的选项: LZ4 ZLIB ZSTD SNAPPY |
如果你不想使用默认配置,你可以配置参数。下面是一个例子:
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.sendTimeout(10, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.create();
2
3
4
5
6
# 消息路由
当使用分区主题时,你可以在使用生产者发布消息时指定路由模式。缺省的路由策略为轮询,下面是一个例子:
String pulsarBrokerRootUrl = "pulsar://localhost:6650";
String topic = "persistent://my-tenant/my-namespace/my-topic";
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.messageRoutingMode(MessageRoutingMode. RoundRobinPartition)
.create();
producer.send("Partitioned topic message".getBytes());
2
3
4
5
6
7
8
9
# 异步发送
你可以使用Java客户端异步地发布消息。通过异步发送,生产者将消息放在一个阻塞队列中并立即返回。然后客户端库在后台将消息发送到代理。如果队列已满(最大尺寸可配置),生产者在调用API时就会被阻塞或立即失败,这取决于传递给生产者的参数。
下面是一个例子:
producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
System.out.printf("Message with ID %s successfully sent", msgId);
});
2
3
4
正如你在上面的例子中看到的,异步发送操作返回一个包裹在CompletableFuture中的MessageId。
# 配置消息
除了值之外,你还可以在一个给定的信息上设置其他项目:
producer.newMessage()
.key("my-message-key")
.value("my-async-message".getBytes())
.property("my-key", "my-value")
.property("my-other-key", "my-other-value")
.send();
2
3
4
5
6
你可以用sendAsync()终止构建器链,并获得一个future。
# 消费者
在ADMQ中,消费者订阅主题并处理生产者向这些主题发布的消息。你可以通过首先实例化一个PulsarClient对象并将broker的URL传递给它来实例化一个新的消费者。
一旦你实例化了一个PulsarClient对象,你就可以通过指定主题和订阅来创建一个消费者。
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
2
3
4
subscribe方法自动将消费者订阅到指定的主题和订阅。让消费者监听主题的一种方法是设置一个while循环。在这个例子的循环中,消费者监听消息,打印任何收到的消息的内容,然后确认消息已被处理。如果处理逻辑失败了,你可以使用否定的确认来在以后重新传递消息。
while (true) {
// Wait for a message
Message msg = consumer.receive();
try {
// Do something with the message
System.out.printf("Message received: %s", new String(msg.getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 一些配置参数
如果你像上面的例子那样只指定主题和订阅名称来实例化一个消费者对象,那么消费者会使用默认配置。
当你创建一个消费者时,你可以使用loadConf配置。在loadConf中可以使用以下参数。
| 类型 | 名称 | 描述 | 缺省值 |
|---|---|---|---|
| Set<String> | topicNames | 主题名称 | Sets.newTreeSet() |
| Pattern | topicsPattern | 主题模式 | |
| String | subscriptionName | 订阅名称 | |
| SubscriptionType | subscriptionType | 订阅类型 有四种订阅类型可供选择: Exclusive Failover Shared Key_Shared | SubscriptionType.Exclusive |
| int | receiverQueueSize | 一个消费者的接收队列的大小。 例如,在应用程序调用Receive之前,消费者积累的消息数量。 一个高于默认值的值会增加消费者的吞吐量,尽管代价是更多的内存利用率。 | 1000 |
| long | acknowledgementsGroupTimeMicros | 对消费者的确认进行分组,并指定时间。 默认情况下,消费者使用100ms的分组时间来向经纪人发送确认。 设置一个0的分组时间可以立即发送确认信息。 较长的确认分组时间是更有效的,代价是失败后消息的重新传送会略有增加。 | TimeUnit.MILLISECONDS.toMicros(100) |
| Long | negativeAckRedeliveryDelayMicros | 在重新发送处理失败的消息之前所要等待的时间。 当应用程序使用Consumer#negativeAcknowledge(Message)时,失败的消息会在一个固定的超时后重新交付。 | TimeUnit.MINUTES.toMicros(1) |
| int | maxTotalReceiverQueueSizeAcrossPartitions | 各个分区的最大总接收队列大小。 如果总的接收队列大小超过这个值,这个设置会减少单个分区的接收队列大小。 | 50000 |
| String | consumerName | 消费者名称 | |
| long | ackTimeoutMillis | 未确认信息的超时 | 0 |
| Long | tickDurationMillis | ack-timeout重新交付的粒度。 使用较高的tickDurationMillis可以减少在设置ack-timeout为较大值(例如1小时)时跟踪消息的内存开销。 | 1000 |
| int | priorityLevel | 消费者的优先级,broker在共享订阅模式下调度消息时给予其更多的优先权。 broker遵循递减的优先级。例如,0=最大优先级,1,2,...。 在共享订阅模式下,如果最大优先级的消费者有许可证,broker首先将消息投递给他们。否则,broker会考虑下一个优先级的消费者。 | 0 |
| ConsumerCryptoFailureAction | cryptoFailureAction | 当消费者收到无法解密的消息时,应该采取行动。 - FAIL:这是默认的选项,在加密成功之前,消息都是失败的。 - DISCARD:默默地确认,不向应用程序传递消息。 - CONSUME:将加密的消息传递给应用程序。解密消息是应用程序的责任。 消息的解压缩失败。 如果消息包含批处理的消息,客户端就不能在批处理中检索单个消息。 交付的加密消息包含EncryptionContext,其中包含加密和压缩信息,应用程序可以用它来解密消耗的消息有效载荷。 | ConsumerCryptoFailureAction.FAIL |
| SortedMap<String, String> | properties | 这个消费者的一个名称或属性。 属性是附加到消费者身上的应用程序定义的元数据。 当获得一个主题的统计信息时,将这个元数据与消费者的统计信息联系起来,以便于识别。 | new TreeMap<>() |
| Boolean | readCompacted | 如果启用readCompacted,消费者会从一个压缩的主题中读取消息,而不是读取一个主题的全部消息积压。 消费者只看到压缩后的主题中每个键的最新值,直到到达主题消息中压缩积压的点。超过这一点,就像平常一样发送消息。 只在持久性话题的订阅上启用readCompacted,这些话题有一个活跃的消费者(比如失败或排他性订阅)。 试图在非持久性主题的订阅或共享订阅上启用它,会导致订阅调用抛出一个PulsarClientException。 | False |
| SubscriptionInitialPosition | subscriptionInitialPosition | 第一次订阅一个主题时,设置订阅的初始位置。 | SubscriptionInitialPosition.Latest |
| int | patternAutoDiscoveryPeriod | 当使用主题的消费者模式时,主题的自动发现周期。 默认值和最小值是1分钟。 | 1 |
| RegexSubscriptionMode | regexSubscriptionMode | 当使用正则表达式订阅一个主题时,你可以选择某种类型的主题。 - PersistentOnly:只订阅持久性话题。 - NonPersistentOnly:只订阅非持久性话题。 - AllTopics:同时订阅持久性和非持久性主题。 | RegexSubscriptionMode.PersistentOnly |
| DeadLetterPolicy | deadLetterPolicy | 消费者的死信策略。 在默认情况下,一些消息可能会被多次重发,甚至到了从未停止的程度。 通过使用死信机制,消息有最大重传次数。当超过最大重传次数时,消息被发送到死信主题并自动确认。 你可以通过设置deadLetterPolicy来启用死信机制 例如: client.newConsumer() .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build()) .subscribe(); 缺省的死信队列的名称为{TopicName}-{Subscription}-DLQ 可以设置一个自定义的死信主题名称 client.newConsumer() .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10) .deadLetterTopic("your-topic-name").build()) .subscribe(); 当指定死信策略而不指定ackTimeoutMillis时,你可以将ack超时设置为30000毫秒。 | |
| boolean | autoUpdatePartitions | 如果启用了autoUpdatePartitions,消费者会自动订阅分区增量。 注意:这只适用于分区的消费者。 | True |
| boolean | replicateSubscriptionState | 如果replicateSubscriptionState被启用,一个订阅状态会被复制到地理复制的集群中。 | false |
如果你不想使用默认的配置,你可以配置参数。下面是一个例子。
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
2
3
4
5
6
7
# 异步接收消息
接收方法同步地接收消息(消费者进程被阻塞,直到有消息可用)。你也可以使用异步接收,一旦有新的消息可用,它就立即返回一个CompletableFuture对象。
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
异步接收操作返回一个包裹在CompletableFuture中的消息。
# 批量接收消息
使用batchReceive来接收每次调用的多个信息。下面是一个例子。
Messages messages = consumer.batchReceive();
for (Object message : messages) {
// do something
}
consumer.acknowledge(messages)
2
3
4
5
6
批量接收策略限制了单个批次中信息的数量和字节数。你可以指定一个超时来等待足够的消息。如果满足以下任何一个条件,批处理接收就完成了:足够的消息数量,消息的字节数,等待超时。
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.batchReceivePolicy(BatchReceivePolicy.builder()
.maxNumMessages(100)
.maxNumBytes(1024 * 1024)
.timeout(200, TimeUnit.MILLISECONDS)
.build())
.subscribe();
2
3
4
5
6
7
8
9
10
默认的批量接收策略是:
BatchReceivePolicy.builder()
.maxNumMessage(-1)
.maxNumBytes(10 * 1024 * 1024)
.timeout(100, TimeUnit.MILLISECONDS)
.build();
2
3
4
5
# 多主题订阅
除了将消费者订阅到单一的主题外,你还可以使用多主题订阅同时订阅多个主题。要使用多主题订阅,你可以提供一个正则表达式(regex)或一个主题列表。如果你通过正则表达式选择主题,所有的主题必须是在同一个命名空间内。
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
.subscriptionName(subscription);
// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("public/default/.*");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribe();
// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("public/default/foo.*");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(someTopicsInNamespace)
.subscribe();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
在上面的例子中,消费者订阅的是符合主题名称模式的持久性主题。如果你想让消费者订阅所有符合主题名称模式的持久性和非持久性主题,请将 subscriptionTopicsMode 设置为
RegexSubscriptionMode.AllTopics。
Pattern pattern = Pattern.compile("public/default/.*");
pulsarClient.newConsumer()
.subscriptionName("my-sub")
.topicsPattern(pattern)
.subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
.subscribe();
2
3
4
5
6
7
8
默认情况下,消费者的 subscriptionTopicsMode 是 PersistentOnly。subscriptionTopicsMode的可用选项是PersistentOnly、NonPersistentOnly和AllTopics。
你也可以订阅一个明确的主题列表(如果你愿意,可以跨命名空间)。
List<String> topics = Arrays.asList(
"topic-1",
"topic-2",
"topic-3"
);
Consumer multiTopicConsumer = consumerBuilder
.topics(topics)
.subscribe();
// Alternatively:
Consumer multiTopicConsumer = consumerBuilder
.topic(
"topic-1",
"topic-2",
"topic-3"
)
.subscribe();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
你也可以使用subscribeAsync方法异步订阅多个主题,而不是使用同步订阅方法。下面是一个例子。
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default.*");
consumerBuilder
.topics(topics)
.subscribeAsync()
.thenAccept(this::receiveMessageFromConsumer);
private void receiveMessageFromConsumer(Object consumer) {
((Consumer)consumer).receiveAsync().thenAccept(message -> {
// Do something with the received message
receiveMessageFromConsumer(consumer);
});
}
2
3
4
5
6
7
8
9
10
11
12
13
# 订阅模式
ADMQ有各种订阅模式,以配合不同的场景。一个主题可以有多个具有不同订阅模式的订阅。然而,一个订阅一次只能有一个订阅模式。
一个订阅与订阅名称相同,一次只能指定一个订阅模式。你不能改变订阅模式,除非这个订阅的所有现有消费者都是离线的。
不同的订阅模式有不同的消息分发模式。本节描述了订阅模式的区别以及如何使用它们。
为了更好地描述它们的区别,假设你有一个名为 "my-topic "的主题,生产者已经发布了10条消息。
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.enableBatching(false)
.create();
// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
producer.newMessage().key("key-1").value("message-1-1").send();
producer.newMessage().key("key-1").value("message-1-2").send();
producer.newMessage().key("key-1").value("message-1-3").send();
producer.newMessage().key("key-2").value("message-2-1").send();
producer.newMessage().key("key-2").value("message-2-2").send();
producer.newMessage().key("key-2").value("message-2-3").send();
producer.newMessage().key("key-3").value("message-3-1").send();
producer.newMessage().key("key-3").value("message-3-2").send();
producer.newMessage().key("key-4").value("message-4-1").send();
producer.newMessage().key("key-4").value("message-4-2").send();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 独占模式
创建一个新的消费者,用Exclusive订阅模式订阅。
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe()
2
3
4
5
只有第一个消费者被允许参与订阅,其他消费者会收到一个错误。第一个消费者收到所有的10条消息,消费的顺序与生产的顺序相同。
如果主题是一个分区的主题,第一个消费者会订阅所有分区的主题,其他消费者没有被分配到分区,并收到一个错误。
备注:独占模式下创建Topic的时候如果设置分区数为0,就可以实现队列的点对点模式功能。
# 灾备模式
创建新的消费者,用Failover订阅模式订阅。
Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.subscribe()
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.subscribe()
2
3
4
5
6
7
8
9
10
多个消费者可以附加到同一个订阅上,但只有第一个消费者是活动的,其他的是备用的。当活跃的消费者断开连接时,消息将被派发到一个备用的消费者身上,然后这个备用的消费者就成为活跃的消费者。
# 共享模式
创建新的消费者,用共享订阅模式订阅。
Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe()
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe()
2
3
4
5
6
7
8
9
10
11
在共享订阅模式下,多个消费者可以附加到同一个订阅上,信息在消费者之间以轮流分配的方式传递。
# 按Key分发模式
创建新的消费者并以Key_Shared订阅模式订阅。
Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe()
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe()
2
3
4
5
6
7
8
9
10
11
12
Key_Shared订阅和Shared订阅一样,所有消费者都可以附加到同一个订阅。但它与Key_Shared订阅不同,具有相同key的消息只依次传递给一个消费者。默认情况下,我们不会事先知道哪些key将被分配给一个消费者,但一个key只会在同一时间被分配给一个消费者。
consumer1收到以下信息
("key-1", "message-1-1")
("key-1", "message-1-2")
("key-1", "message-1-3")
("key-3", "message-3-1")
("key-3", "message-3-2")
Consumer2收到以下信息
("key-2", "message-2-1")
("key-2", "message-2-2")
("key-2", "message-2-3")
("key-4", "message-4-1")
("key-4", "message-4-2")
如果在生产者端启用了批处理,不同key的消息默认会被添加到一个批次中。broker将把批处理派发给消费者,所以默认的批处理机制可能会破坏Key_Shared订阅保证的消息分发语义。生产者需要使用KeyBasedBatcher。
Producer producer = client.newProducer()
.topic("my-topic")
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();
2
3
4
或者生产者可以禁用批处理。
Producer producer = client.newProducer()
.topic("my-topic")
.enableBatching(false)
.create();
2
3
4
# Reader接口
通过Reader接口,客户端可以在一个主题中 "手动定位",从指定消息开始处理所有消息。Java的 API使你能够通过指定一个主题和一个MessageId来创建Reader对象,下面是一个例子。
byte[] msgIdBytes = // Some message ID byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(id)
.create();
while (true) {
Message message = reader.readNext();
// Process message
}
2
3
4
5
6
7
8
9
10
11
在上面的例子中,为一个特定的主题和消息(通过ID)实例化了一个Reader对象;在消息被msgIdBytes识别后,Reader对主题中的每条消息进行迭代。上面的示例代码展示了将Reader对象指向一个特定的消息(通过ID),但你也可以使用MessageId.earliest指向主题上最早的可用消息,MessageId.latest指向最近的可用消息。
当你创建一个Reader时,你可以使用loadConf配置。在loadConf中可以使用以下参数。
| 类型 | 名称 | 描述 | 缺省值 |
|---|---|---|---|
| String | topicName | 主题名称 | |
| Int | receiverQueueSize | 一个消费者的接收队列的大小。 例如,在应用程序调用Receive之前,消费者可以积累的消息数量。 一个高于默认值的值会增加消费者的吞吐量,尽管以更多的内存利用率为代价。 | 1000 |
| ReaderListener<T> | readerListener | 一个监听器,在收到消息时被调用。 | |
| String | readerName | Reader 名称 | |
| String | subscriptionRolePrefix | 订阅角色前缀 | |
| CryptoKeyReader | CryptoKeyReader | 抽象出对密钥存储的访问的接口。 | |
| ConsumerCryptoFailureAction | cryptoFailureAction | 当消费者收到无法解密的消息时,应该采取行动。 - FAIL:这是默认选项,使消息失败,直到加密成功。 - DISCARD:默默地确认,不向应用程序传递消息。 - CONSUME:将加密的消息传递给应用程序。解密消息是应用程序的责任。 消息解压失败。 如果消息包含批处理的消息,客户端就不能在批处理中检索单个消息。 交付的加密消息包含{@link EncryptionContext},其中包含加密和压缩信息,应用程序可以用它来解密消耗的消息有效载荷。 | ConsumerCryptoFailureAction.FAIL |
| boolean | readCompacted | 如果启用readCompacted,消费者会从一个压缩的主题中读取消息,而不是一个主题的全部消息积压。 消费者只看到压缩后的主题中每个键的最新值,直到到达主题消息中压缩积压的点。超过这个点,就像正常一样发送消息。 readCompacted只能在持久性话题的订阅上启用,这些订阅有一个活跃的消费者(例如,失败或排他性订阅)。 试图在非持久性主题的订阅或共享订阅上启用它,会导致订阅调用抛出一个PulsarClientException。 | False |
| boolean | resetIncludeHead | 如果设置为 "true",将返回的第一条信息是由messageId指定的那条。 如果设置为false,要返回的第一条信息是messageId指定的信息旁边的那条。 | false |
# Sticky key 范围 Reader
在Sticky key范围Reader中,broker将只发送消息key的哈希值包含在指定的key哈希范围内的消息。一个Reader上可以指定多个key哈希值范围。下面是一个创建sticky key范围Reader的例子。
pulsarClient.newReader()
.topic(topic)
.startMessageId(MessageId.earliest)
.keyHashRange(Range.of(0, 10000), Range.of(20001, 30000))
.create();
2
3
4
5
散列范围总大小为65536,所以范围的最大端应该小于或等于65535。
# Schema
在ADMQ中,所有的消息数据都是由字节数组组成的。消息Schema使你在构建和处理消息时能够使用其他类型的数据(从简单的类型如字符串到更复杂的特定应用类型)。如果你构建了一个生产者,如果没有指定schema,那么这个生产者只能产生byte[]类型的消息。下面是一个例子。
Producer <byte[]> producer = client.newProducer()
.topic(topic)
.create();
2
3
如果你想对不同类型的数据使用生产者,你需要指定一个schema,告知ADMQ哪种数据类型将在主题中传输。
# Schema示例
假设你有一个SensorReading类,你想通过一个主题来传输:
public class SensorReading {
public float temperature;
public SensorReading(float temperature) {
this.temperature = temperature;
}
// A no-arg constructor is required
public SensorReading() {
}
public float getTemperature() {
return temperature;
}
public void setTemperature(float temperature) {
this.temperature = temperature;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
然后你可以创建一个Producer<SensorReading>(或Consumer<SensorReading>):
Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
.topic("sensor-readings")
.create();
2
3
以下是目前可用于Java的schema:
无schema或字节数组schema
Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES) .topic("some-raw-bytes-topic") .create(); // 或者等效于: Producer<byte[]> bytesProducer = client.newProducer() .topic("some-raw-bytes-topic") .create();1
2
3
4
5
6
7对于UTF-8编码的字符串数据,使用Schema.STRING
Producer<String> stringProducer = client.newProducer(Schema.STRING) .topic("some-string-topic") .create();1
2
3使用Schema.JSON为POJO创建JSON schema
Producer<MyPojo> pojoProducer = client.newProducer(Schema.JSON(MyPojo.class)) .topic("some-pojo-topic") .create();1
2
3使用Schema.PROTOBUF生成Protobuf schema。下面的例子显示了如何创建Protobuf schema并使用它来实例化一个新的生产者
Producer<MyProtobuf> protobufProducer = client.newProducer(Schema.PROTOBUF(MyProtobuf.class)) .topic("some-protobuf-topic") .create();1
2
3用Schema.AVRO定义Avro schema。下面的代码片断演示了如何创建和使用Avro schema
Producer<MyAvro> avroProducer = client.newProducer(Schema.AVRO(MyAvro.class)) .topic("some-avro-topic") .create();1
2
3
# 管控台使用
# 第1章 登录管理控制台
金蝶Apusic分布式消息队列管理控制台管理控制台是一个基于Web的GUI管理和监控工具,用于管理用户、权限、租户、命名空间、主题、订阅、broker、部署集群。
1、在浏览器中键入URL:
https://ip:12306或者http://ip:12305
2、在登录界面输入用户名(admq),密码(11111111)进行登录。

3、首次登录需要修改密码。

4、点击【点击前往】按钮,进入首次登录修改密码页面。

当前密码:输入当前密码
新密码:输入需要设置的新密码
确认密码:再次输入需要设置的新密码(两次密码需统一)
备注:ADMQ管理控制台集成了AUC和AMP,若想对集群节点进行监控,需通过AUC登录后注册服务跳转到ADMQ管控台界面后进行集群部署和监控运维。具体详见第10章监控运维。
# 第2章 系统配置
# 2.1 服务器管理
服务器管理是用于新增、编辑、删除、查询服务器信息以及部署采集器,作为创建集群信息时需被引用的基础信息。

# 2.1.1 新增服务器信息
进入【系统配置】>【服务器管理】,在服务器管理页面点击左上角【新增】按钮,新增服务器信息。

机房:服务器所在的机房名称,自定义
管理IP:服务器IP地址
ssh端口:服务器的端口,如22
账号:服务器的用户命,如root
密码:服务器的密码(如果不填则需要用户设置免密登录)
程序目录:服务器的集群节点信息存储目录
数据目录:服务器的集群节点信息数据存储目录
描述:服务器的详细信息
# 2.1.2 编辑服务器信息
进入【系统配置】>【服务器管理】,在服务器管理页面选择需要编辑的服务器,点击【编辑】按钮,编辑服务器信息

机房:服务器所在的机房名称,不可修改
管理IP:服务器IP地址,不可修改
ssh端口:服务器的端口,如22,可修改
账号:服务器的用户命,如root,可修改
密码:服务器的密码,可修改
程序目录:服务器的集群节点信息存储目录,不可修改
数据目录:服务器的集群节点信息数据存储目录,不可修改
描述:服务器的详细信息,可修改
# 2.1.3 删除服务器信息
进入【系统配置】>【服务器管理】,进入服务器管理页面,选择需要删除的服务器信息,点击【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,删除服务器信息。注意:只有未被集群引用的服务器信息可以被删除,已被集群引用的服务器信息不可以被删除。

# 2.1.4 查询服务器信息
进入【系统配置】>【服务器管理】,进入服务器管理页面,模糊或精确输入管理IP,可查询服务器信息。

# 2.1.5 部署采集器
进入【系统配置】>【服务器管理】,进入服务器管理页面,选择需要部署采集器的服务器信息,点击【部署采集器】按钮,会弹出部署采集器弹框

机房:所选服务器所在的机房,不可变
管理IP:所选服务器的管理IP,不可变
芯片架构:下拉框,四种选项:arm、x86、risc-v、mips
采集器:下拉框,上传的软件包
备注:部署采集器前需要先上传采集器软件包,上传过程详见2.3.1。
# 2.2 License管理
License管理是用于导入、导出、查看、删除License信息,作为创建集群添加节点时需被引用的基础信息。

# 2.2.1 导入License信息
进入【系统配置】>【License管理】,点击左上角的【导入】按钮,点击【文件上传】按钮或拖动license文件进来,可导入License信息。注意:只能导入xml文件类型的license,导入license信息,最大计算节点数和最大存储节点数将会进行累加。

# 2.2.2 导出License信息
进入【系统配置】>【License管理】,选择需要导出的license点击【导出】按钮,可导出License信息。

备注:导出的License名称为导入时的license名称。
# 2.2.3 删除License信息
进入【系统配置】>【License管理】,选择需要删除的license,点击【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除License信息。注意:license至少要保留一条,不可全部删除,当只剩1个license时,删除按钮将会置灰显示,删除license后,对应可启动的broker数和可启动的bookkeeper数会对应减少数量。

# 2.2.4 查看License信息
进入【系统配置】>【License管理】,进入License管理页面,选择需要查看的License信息,点击【查看】按钮,查看License信息

# 2.3 软件包管理
软件包管理是用于上传、删除软件包信息,作为新建集群、部署采集器时需被引用的基础信息。

# 2.3.1 上传软件包信息
进入【系统配置】>【软件包管理】,点击左上角的【上传】按钮,点击【上传】按钮或拖动软件包文件进来,可上传软件包信息。

类型:ADMQ、RocketMQ、主机采集器,单选
备注:可上传.tar.gz或者.zip的文件,软件包名称格式:xxxx-V版本号(数字和小数点)-芯片架构.tar.gz(例如:admq-V1.0.0-x86.tar.gz )。
# 2.3.2 删除软件包信息
进入【系统配置】>【软件包管理】,选择需要删除的的软件包,点击【删除】按钮,在弹出的确认删除弹框中点击【确认】按钮,可删除软件包信息。注意:当有集群信息引用此软件包或者服务器信息引用此软件包时,软件包不可进行删除,若要进行删除,需要先删除引用此软件包的集群信息或者服务器信息。

# 2.4 默认配置
默认配置信息是为了更方便的管理集群的默认配置文件,可新增默认配置信息、编辑默认配置信息、删除默认配置信息、更新默认配置信息、启用默认配置信息、停用默认配置信息、下载默认配置文件、查询配置信息。
# 2.4.1 新增配置信息
进入【系统配置】->【默认配置】,在默认配置页面,点击左上角【新增】按钮,可新增默认配置信息。注意:新增默认配置信息需要更新配置文件才会更新到物理配置文件,部署集群的时候节点配置才能复制参数。

软件包:软件包的名称,根据选择显示,不可修改,
配置文件:配置项所在配置文件名称,根据选择显示,不可修改
配置名称:配置项的名称,自定义,唯一标识
配置内容:配置项的值,自定义
描述:配置项的描述信息,自定义
# 2.4.2 修改配置信息
进入【系统配置】->【默认配置】,在默认配置页面,选择需要修改的配置,点击【编辑】按钮,可修改默认配置信息。注意:修改默认配置信息需要更新配置文件才会更新到物理配置文件,部署集群的时候节点配置才能复制参数。

软件包:软件包的名称,不可修改
配置文件:配置项所在配置文件名称,不可修改
配置名称:配置项的名称,可修改
配置内容:配置项的值,可修改
描述:配置项的描述信息,可修改
# 2.4.3 删除配置信息
进入【系统配置】->【默认配置】,在默认配置页面,选择需要删除的配置,点击【删除】按钮,在确认删除弹框中,点击【确定】按钮,可删除默认配置信息。注意:删除默认配置信息需要更新配置文件才会更新到物理配置文件,部署集群的时候节点配置才能生效;删除配置项需谨慎,避免因删除了重要的配置项而出现集群服务起不来或服务不可用。

# 2.4.4 启用配置参数
进入【系统配置】->【默认配置】,在默认配置页面,选择需要启用的配置(配置已停用),点击【启用】按钮,可启用默认配置信息。注意:启用默认配置信息需要更新配置文件才会更新到物理配置文件,部署集群的时候节点配置才能生效。

# 2.4.5 停用配置信息
进入【系统配置】->【默认配置】,在默认配置页面,选择需要停用的配置(配置已启用),点击【停用】按钮,可停用默认配置信息。注意:停用默认配置信息需要更新配置文件才会更新到物理配置文件,部署集群的时候节点配置才能生效(停用某些默认配置并且已更新配置文件,就表示配置文件中已删除这些配置);停用配置项需谨慎,避免因停用了重要的配置项而出现集群服务起不来或服务不可用。

# 2.4.6 下载配置文件
进入【系统配置】->【默认配置】,在默认配置页面,右上角选择需要下载的配置文件名称,点击【下载】按钮,可下载默认配置文件。

# 2.4.7 查询配置项信息
进入【系统配置】->【默认配置】,在默认配置页面,右上角选择软件包名称、配置文件名称,精确或模糊输入配置名称、配置内容,可查询默认配置信息。备注:配置名称、配置内容两者选其一也可进行查询。

# 2.5 操作日志
操作日志用于查询用户的操作纪录信息。

# 2.5.1 操作日志查询
进入【系统配置】>【操作日志】,进入操作日志页面,下拉选择模块、输入用户名称,选择查询开始日期和结束日期,可查询操作日志信息。

备注:超级管理员用户可查询到所有用户的操作日志信息,普通用户只能查询自己的操作日志信息。
# 2.6 事件管理
管理控制台支持对事件进行管理,可跟踪资源等系统事件情况。事件管理可查看事件详情信息、可查询事件信息。

# 2.6.1 查看事件详细信息
进入【系统配置】>【事件管理】,进入事件管理页面,选择要查看详情的事件,点击【查看】按钮,可查看事件详情信息。


# 2.6.2 查询事件信息
进入【系统配置】>【事件管理】,进入事件管理页面,下拉选择事件、输入用户名称,选择查询开始日期和结束日期,可查询事件信息。

备注:超级管理员用户可查询到所有用户的操作日志信息,普通用户只能查询自己的操作日志信息。
# 第3章 集群
# 3.1 集群管理
集群管理用于部署管理集群,有新增集群信息、配置集群信息、删除集群信息、查询集群详细信息、查看令牌、证书管理、依赖管理、插件管理;部署节点、启动节点、重启节点、停止节点、配置节点参数、删除节点、编辑节点、节点资源监控;一键部署、一键启动、一键停止。

# 3.1.1 新增集群信息
进入【集群管理】,在集群管理页面左上角点击【新增】按钮,新增集群信息。


集群名称:自定义,唯一标识
集群类型:单项选项,ADMQ、RocketMQ,可部署ADMQ集群或者RocketMQ集群
部署模式:单项选项,集群部署、单机部署
软件包:集群使用的软件包,下拉选择已上传的软件包(详见2.3软件包管理)
本地协调器服务器:集群的本地协调器地址,下拉选择已经新增的服务器(详见2.1服务器管理)
全局协调器服务器:集群的全局协调器地址,下拉选择已经新增的服务器(详见2.1服务器管理),非必填
存储程序服务器:集群的存储程序地址,下拉选择已经新增的服务器(详见2.1服务器管理);若选择部署模式为单机,则默认为本地协调器选择的地址,不可更改
计算程序服务器:集群的计算程序地址,下拉选择已经新增的服务器(详见2.1服务器管理);若选择部署模式为单机,则默认为本地协调器选择的地址,不可更改
密钥文件:生成用户token的文件,与集群进行绑定。有默认密钥和动态生成密钥,默认密钥则是软件包默认带的密钥文件,动态生成密钥则是随机生成token.key,每个集群的token.key都不一样。默认动态生成秘钥文件。
备注:新增集群信息前,需要先创建一些基础信息,如上传软件包、新增服务器信息等。
# 3.1.2 配置集群信息
进入【集群管理】,在集群管理页面选择需要配置的集群信息,点击【配置】按钮,配置集群信息。

# 3.1.2.1 ADMQ集群配置
ADMQ集群的配置页面,包括:基础配置、资源配置、安全配置、其它配置

# 3.1.2.1.1 基础配置
在集群配置页面的基础配置部分,点击右边的编辑按钮,可配置基础信息


集群名称:ADMQ集群的唯一标识,不可变
数据接入地址:集群计算程序服务地址,系统将会根据选择的计算程序地址默认填写,如pulsar://172.24.4.104:6650,172.24.4.105:6650,172.24.4.106:6650,也可手动更改
管理服务地址:admq集群的web地址,系统将会根据选择的计算程序地址默认填写,如http://172.24.1.116:8080,172.24.1.118:8080,172.24.4.130:8080,也可手动更改(但若填写错误,则无法跟broker进行交互)
KAFKA服务地址:kafka插件的服务地址,不显示,若加载插件后该部分会自动去除
MQTT服务地址:mqtt插件的服务地址,不显示,若加载插件后该部分会自动去除
RABBITMQ服务地址:rabbitmq插件的服务地址,不显示,若加载插件后该部分会自动去除
ROCKETMQ服务地址:rocketmq插件的服务地址,不显示,若加载插件后该部分会自动去除
描述:集群的详细信息介绍
# 3.1.2.1.2 资源配置
在集群配置页面的资源配置部分,点击右边的编辑按钮,可对集群的资源进行配置


最大租户数量:集群下可创建的最大租户数量
最大命名空间数量:集群下可创建的最大命名空间数量
最大主题数量:集群下可创建的最大主题数量
自动创建主题:单选框,默认允许
自动删除无效主题:单选框,默认不允许
# 3.1.2.1.3 安全配置
通过添加客户端黑、白名单地址,来限制客户端使用情况,该功能可选择开启或不开启。
在集群配置页面的安全配置部分,点击右边的编辑按钮,可对集群进行安全配置。


限制客户端地址:单选框,不开启、开启,默认不开启,开启后只有客户端白名单才可成功访问ADMQ集群服务,黑名单不可访问。
客户端白名单:可访问集群服务的客户端IP地址,IP格式:单个IP(如:192.168.1.1),多个IP之间以逗号隔开(如:192,168.1.1,,192.168.1.2或者192.168.1.[1-255],192.168.1.1)
客户端黑名单:不可访问集群服务的客户端IP地址,IP格式如客户端白名单的IP格式
# 3.1.2.1.4 其他配置
在集群配置页面的其他配置部分,点击右边的编辑按钮,可对集群的其他信息进行配置


事务功能:单选框,开启、不开启,默认不开启
Journal日志存储目录:日志的存储目录,可修改
数据存储目录:数据的存储目录,可修改
# 3.1.2.2 RocketMQ的配置页面
RocketMQ集群的配置页面,包括:基础配置、安全配置、存储设置

# 3.1.2.2.1 基础配置
在集群配置页面的基础配置部分,点击右边的编辑按钮,可配置基础信息


集群名称:ROcketMQ集群的唯一标识,不可变
数据接入地址:集群计算程序服务地址,可编辑
描述:集群详细信息介绍,可编辑
# 3.1.2.2.2 安全配置
在集群配置页面的安全配置部分,点击右边的编辑按钮,可配置安全信息


自动创建主题:单选框,允许、不允许,默认不允许
开启认证:单选框,开启、不开启,默认开启
自动创建订阅组:单选框,允许、不允许,默认不允许
客户端白名单:客户端白名单IP,可编辑
# 3.1.2.2.3 存储设置
在集群配置页面的存储设置部分,点击右边的编辑按钮,可配置存储信息


文件删除时间:每天固定整点删除文件,默认是1,可修改
磁盘使用上限:文件存储占总磁盘的百分比,默认85,可修改
文件保留时间(小时):文件保留在磁盘的时间,默认72小时,可修改
# 3.1.3 删除集群信息
进入【集群管理】,在集群管理页面选择需要删除的集群信息,点击【更多】-【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,删除集群信息。注意:集群有节点启动不可删除,需停止所有节点,才可删除;集群删除后,集群下所有的资源信息(租户、命名空间、主题)也被删除。


# 3.1.4 查询集群详细信息
集群详情用户监控集群的一些信息,如基本信息、配置信息、统计信息、节点信息等,进入【集群管理】,在集群管理页面,选择需要查看集群详细信息的集群,双击集群名称,进入集群管理详情页面。

# 3.1.5 查看令牌
进入【集群管理】,在集群管理页面,选择需要查看令牌的集群,点击【令牌】按钮,可查看令牌。

# 3.1.6 证书管理
管理控制台支持对证书进行管理,默认证书为admq软件包下的证书,证书管理可替换证书、下发证书、编辑证书、下载证书。

# 3.1.6.1 替换证书
进入集群管理,选择需要管理证书的集群,点击【更多】按钮->【证书】按钮,在证书页面,选择需替换的证书,点击【替换】按钮后上传或拖曳证书,可以替换证书。注意:替换的证书需要下发才能下发到集群各个节点下,且下发后应重启集群节点。


证书类型:只能上传.pem和p12文件的证书
备注:替换的证书,只是内容被替换,证书名称不会被修改。
# 3.1.6.2 下发证书
进入集群管理,选择需要管理证书的集群,点击【更多】按钮->【证书】按钮,在证书页面,选择需下发的证书,点击【下发】按钮,可下发证书到已部署节点服务器上。注意:下发证书应成功部署集群后才能下发,下发后需生效要重启集群节点。


# 3.1.6.3 下载证书
进入集群管理,选择需要管理证书的集群,点击【更多】按钮->【证书】按钮,在证书页面,选择需下载的证书,点击【下发】按钮,可下发证书到已部署节点服务器上。注意:下载证书是下载管理控制台上的证书(非节点服务器上的证书)

# 3.1.6.4 编辑证书
进入集群管理,选择需要管理证书的集群,点击【更多】按钮->【证书】按钮,在证书页面,选择需编辑的证书,点击【编辑】按钮,可编辑证书信息。


描述:证书描述信息
# 3.1.7 依赖管理
为方便对已部署的集群进行升级,管理控制台支持对依赖进行管理,应用场景主要用于核心引擎服务(软件包)升级。依赖管理可新增依赖、下发依赖、替换依赖、编辑依赖、下载依赖、删除依赖。

# 3.1.7.1 新增依赖
进入集群管理,选择需要管理依赖的集群,点击【更多】按钮->【依赖】按钮,在依赖页面,点击左上角【新增】按钮,可新增依赖信息。注意:新增的依赖需要下发才能更新到集群各个节点下,且下发后应重启集群节点,才会生效。


节点类型:节点的类型,下拉选项。若是单机,有单机程序;若是集群,有协调器、全局协调器、存储程序、计算程序
文件目录:依赖的上级目录,默认为软件包下的目录,有bin、protocols、config、libs、licenses
选择文件:要上传的依赖,可拖拽
描述:依赖文件的描述信息
# 3.1.7.2 替换依赖
进入集群管理,选择需要管理依赖的集群,点击【更多】按钮->【依赖】按钮,在依赖页面,选择需替换的依赖,点击【替换】按钮后上传或拖拽依赖,可以替换依赖。注意:替换的依赖需要下发才能更新到集群各个节点下,且下发后应重启集群节点,才会生效。


# 3.1.7.3 下发依赖
进入集群管理,选择需要管理依赖的集群,点击【更多】按钮->【依赖】按钮,在依赖页面,选择需下发的依赖,点击【下发】按钮,可下发依赖到已部署节点服务器上,下发成功后,状态会从未生效变成已生效。注意:成功部署集群后才能下发依赖,下发后应重启集群节点,才会生效。


# 3.1.7.4 下载依赖
进入集群管理,选择需要管理依赖的集群,点击【更多】按钮->【依赖】按钮,在依赖页面,选择需下载的依赖,点击【更多】按钮->【下载】按钮,可下载依赖。注意:下载依赖是下载管理控制台上的依赖(非节点服务器上的依赖)


# 3.1.7.5 编辑依赖
进入集群管理,选择需要管理依赖的集群,点击【更多】按钮->【依赖】按钮,在依赖页面,选择需编辑的依赖,点击【编辑】按钮,可编辑依赖信息。


描述:依赖文件的描述信息
# 3.1.7.6 删除依赖
进入集群管理,选择需要管理依赖的集群,点击【更多】按钮->【依赖】按钮,在依赖页面,选择需编辑的依赖,点击【更多】按钮->【删除】按钮,可删除依赖信息。注意:删除依赖,只是删除管理控制台的依赖文件,不能删除节点服务器上的文件。



# 3.1.8 插件管理
为方便对已部署的集群进行插件配置,管理控制台支持对ADMQ内置的插件进行管理,应用场景主要用于启动ROCKETMQ服务、KAFKA服务、MQTT服务、RABBITMQ服务。插件管理包括:加载插件、替换插件、下发插件、编辑插件。

# 3.1.8.1 加载插件
进入集群管理,选择需要加载插件服务的集群,点击【更多】按钮->【插件】按钮,在插件页面,选择需加载的插件,点击【加载】按钮后即可加载插件。注意:加载插件后应重启集群节点后才会生效。


备注:插件只有被加载后才可进行替换和下发,四种插件可同时被加载。
# 3.1.8.2 替换插件
进入集群管理,选择需要替换插件的集群,点击【更多】按钮->【插件】按钮,在插件页面,选择需替换的插件,点击【更多】按钮->【替换】按钮后上传或拖拽插件,可以替换插件。注意:替换的插件需要下发才能更新到集群各个节点下,且下发后应重启集群节点,才会生效。


# 3.1.8.3 下发插件
进入集群管理,选择需要下发插件的集群,点击【更多】按钮->【插件】按钮,在插件页面,选择需下发的插件,点击【下发】按钮,可下发插件到已部署节点服务器上。注意:成功部署集群后才能下发插件,下发后应重启集群节点,才会生效。


# 3.1.8.4 编辑插件
进入集群管理,选择需要编辑插件的集群,点击【更多】按钮->【插件】按钮,在插件页面,选择需编辑的插件,点击【编辑】按钮,可编辑插件信息。


描述:插件描述信息
# 3.1.9 部署节点
进入【集群管理】,在集群管理页面,选择需要部署节点的集群信息,点击【节点】按钮,进入部署节点页面;选择需要部署的节点,点击【部署】按钮,可部署节点。


备注:若是集群部署,则集群的每个节点都需要点击【部署】按钮,部署节点前,需先新建集群信息。
# 3.1.10 重新部署节点
当节点部署时,可能由于服务器网络等原因,部署状态显示异常,需要重新进行部署;进入【集群管理】,在集群管理页面,选择需要重新部署节点的集群信息,点击【节点】按钮,进入部署节点页面;选择部署状态异常的节点,点击【重新部署】按钮,可重新部署节点。

备注:重新部署节点,需先部署状态为异常的节点才可重新部署。
# 3.1.11 启动节点
进入【集群管理】,在集群管理页面,选择需要启动节点的集群信息,点击【节点】按钮,进入部署节点页面,点击【更多】按钮—>【启动】按钮,可启动节点。


备注:未部署的节点不可启动,已启动的节点也不可再进行启动,若是单机,则只需要启动一个单机程序,若是集群,可选中左上角”集群名称“左边的方框,实现一键启动多个节点的效果。
# 3.1.12 停止节点
进入【集群管理】,在集群管理页面,选择需要启动节点的集群信息,点击【节点】按钮,进入部署节点页面,点击【更多】按钮—>【停止】按钮,可停止节点。


备注:只有已启动的节点可以停止,未启动的节点和未部署的节点不可进行停止。若是集群,可选中左上角”集群名称“左边的方框,实现一键停止多个节点的效果。
# 3.1.13 重启节点
进入【集群管理】,在集群管理页面,选择需要启动节点的集群信息,点击【节点】按钮,进入部署节点页面,点击【更多】按钮—>【重启】按钮,可停止节点。


备注:只有已启动的节点可以停止,未启动的节点和未部署的节点不可进行停止。
# 3.1.14 删除节点
进入【集群管理】,在集群管理页面,选择需要删除节点的集群信息,点击【节点】按钮,进入部署节点页面,点击【更多】按钮->【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除节点。
备注:只有未启动的节点才可以删除,单机节点不可以进行删除,直接删除集群即可。
# 3.1.15 编辑节点
进入【集群管理】,在集群管理页面,选择需要编辑节点的集群信息,点击【节点】按钮,进入部署节点页面,点击【更多】按钮->【编辑】按钮,在弹出的编辑节点弹框中可对节点信息进行编辑。

只可对描述信息进行重新编辑。
# 3.1.16 配置节点参数
节点配置信息是为了更方便的管理节点配置文件,可新增配置参数、编辑配置参数、删除配置参数、更新配置参数、启用配置参数、停用配置参数、下载配置文件、查询配置参数。
进入节点“配置信息”页面步骤如下:
.ca529cad.png)
.4e66ba0b.png)
.71972788.png)
# 3.1.16.1 新增配置信息
在节点“配置信息”页面,点击【新增】按钮,可新增配置信息。
.c9c4278a.png)
配置文件:配置项的配置文件名称,根据选择的节点进行显示,不可修改
配置名称:配置项的名称,自定义,唯一标识
配置内容:配置项的值,自定义
描述:配置项的描述信息,自定义
备注:新增配置信息需要更新配置文件才会更新到节点物理文件中,且新增的配置信息,需要重启节点,参数才会生效。
# 3.1.16.2 修改配置信息
在配置信息页面,选择需要修改的配置项,点击【编辑】按钮,可修改配置信息。
.603cedba.png)
配置文件:配置项的配置文件名称,根据选择的节点进行显示,不可修改
配置名称:配置项的名称,自定义,唯一标识
配置内容:配置项的值,自定义
描述:配置项的描述信息,自定义
备注:修改配置信息需要更新配置文件才会更新到节点物理文件中,且修改的配置信息,需要重启节点,参数才会生效
# 3.1.16.3 删除配置信息
在配置信息页面,选择需要删除的配置项,点击【删除】按钮,在弹出的确认删除弹框中,点击【确定】按钮,可删除配置信息。
.0ad07113.png)
备注:删除配置信息需要更新配置文件才会更新到节点物理文件中,且删除的配置信息,需要重启节点,参数才会生效;删除配置项需谨慎,避免因删除了重要的配置项而出现集群服务起不来或服务不可用。
# 3.1.16.4 启用配置参数
在配置信息页面,选择需要启用的配置项,点击【启动】按钮或勾选配置项后点击上方的【启用】按钮,可启用配置信息。
.18e025c9.png)
备注:启用配置信息需要更新配置文件才会更新到节点物理文件中,且启用的配置信息,需要重启节点,参数才会生效。
# 3.1.16.5 停用配置信息
在配置信息页面,选择需要停用的配置项,点击【停用】按钮或勾选配置项后点击上方的【停用】按钮,可停用配置信息。
.b67bd835.png)
备注:停用配置信息需要更新配置文件才会更新到节点物理文件中,且停用的配置信息,需要重启节点,参数才会生效;停用配置项需谨慎,避免因停用了重要的配置项而出现集群服务起不来或服务不可用。
# 3.1.16.6 下载配置文件
在配置信息页面,点击【下载】按钮,可下载选择节点的配置文件。
.a962d11a.png)
# 3.1.16.7 查询配置项
在配置信息页面,模糊或精确输入配置名称,即可查询到所需要的配置信息。
.4cc2c803.png)
备注:查询的配置文件是在集群管理中选择的节点对应的配置文件,不可修改配置文件。
# 3.1.17 一键操作
对于有多个节点的集群,可同时对多个节点进行操作,包括:一键部署、一键启动、一键停止。

# 3.1.17.1 一键部署
进入【集群管理】,在集群管理页面,选择需要部署节点的集群信息,点击【节点】按钮,进入部署节点页面,选中需要操作的节点前的白色方框,点击左上角的【部署】按钮,即可一键对多个节点进行部署。


备注:若想一次性选择集群的所有节点,可直接勾选集群名称前的白色方框

# 3.1.17.2 一键启动
进入【集群管理】,在集群管理页面,选择需要启动节点的集群信息,点击【节点】按钮,进入部署节点页面,选中需要操作的节点前的白色方框,点击左上角的【启动】按钮,即可一键启动多个节点。


备注:若想一次性选择集群的所有节点,可直接勾选集群名称前的白色方框。
# 3.1.17.3 一键停止
进入【集群管理】,在集群管理页面,选择需要停止节点的集群信息,点击【节点】按钮,进入部署节点页面,选中需要操作的节点前的白色方框,点击左上角的【停止】按钮,即可一键停止多个节点。


备注:若想一次性选择集群的所有节点,可直接勾选集群名称前的白色方框。
# 3.1.18 节点资源监控
备注:若想使用节点资源监控,必须使用AUC登录界面跳转到ADMQ管理控制台后再创建集群。
节点监控需结合AUC和AMP使用,且只能通过AUC登录跳转到ADMQ界面,具体过程如下:
在admq-manager的config文件夹下的application.properties配置文件中添加配置项:(如果使用AUC登录,则需要把配置项security.same-site.enabled设置成false)
amp.url=http://172.24.4.170:9002
auc.url=http://172.24.4.170:9000
重新启动admq-manager:
bin/admq-manager stop
bin/admq-manager start
访问AUC界面:http://172.24.4.170:9000

选中AUC首页右上角“admin”,会弹出弹窗,选中弹窗中的【平台管理】按钮,跳转到平台管理页面



进入【服务管理】,在服务管理页面点击【注册服务】按钮,进行注册服务(该步骤是可选的,如果已注册分布式消息系统,则直接修改注册服务的URL地址即可):


服务ID:产品服务ID的数值,自定义
服务名称:产品服务名称,自定义
服务简称:产品服务的简称代码
服务类目:选择中间件
URL地址:服务的URL访问的地址
是否上架:上架(若选择不上架,服务注册后需上架才能使用)
描述:服务的详细信息介绍
选中产品,会出现弹窗,在弹窗中点击已注册的服务,会跳转到ADMQ管控台首页。



进入【集群管理】,在集群管理页面,新增ADMQ集群和RocketMQ集群并启动,详情见3.1集群管理。选择需要监控节点的集群信息,点击【节点】按钮,进入部署节点页面,点击【监控】按钮,可弹出节点监控页面

备注:在节点监控页面中点击 右上角“查看更多指标详情”,可跳转到amp监控页面,该部分需要结合AUC和AMP进行使用,具体详见第10章运维监控。
# 第4章 用户
# 4.1 用户管理
用户管理用于新增用户信息、编辑用户信息、删除用户信息、修改用户密码、查看令牌、查看权限。

# 4.1.1 新增用户信息
进入【用户管理】,在用户管理页面点击左上角【新增】按钮,可新增用户信息。

用户名称:自定义,唯一标识
用户密码:用户的登录密码
确认密码:用户的确认密码
用户邮箱:用户的邮箱
所属集群:下拉框,可选择已有的集群
手机号码:用户的手机号码
描述:用户的描述信息
备注:新增用户信息,需先建集群信息,且只有超级管理可以新增用户信息,普通用户不可新增用户信息。
# 4.1.2 编辑用户信息
进入【用户管理】,进入用户管理页面,选择需要编辑的用户信息,点击【编辑】按钮,可修改用户信息。

用户名称:置灰显示,不可修改
外部用户:置灰显示,不可修改
用户邮箱:用户的邮箱,可修改
所属集群:置灰显示,不可修改
手机号码:用户的手机号码,可修改
描述:用户的描述信息,可修改
# 4.1.3 删除用户信息
进入【用户管理】,进入用户管理页面,选择需要删除的用户信息,点击【更多】按钮-【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除用户信息。


备注:只有超级管理员用户可以删除用户信息,普通用户不可删除用户信息。
# 4.1.4 修改用户密码
进入【用户管理】,进入用户管理页面,选择需要修改密码的用户信息,点击【更多】按钮-【修改密码】按钮,可修改用户密码信息。


用户名称:用户的名称,置灰显示,不可修改
当前密码:用户当前密码
新密码:用户的新密码
确认密码:再次输入用户的新密码
备注:当前密码输入10次错误,默认将会30分钟内禁止当前用户登录
# 4.1.5 查看用户令牌信息
进入【用户管理】,进入用户管理页面,选择需要查看令牌的用户信息,点击【令牌】按钮,可查看用户令牌信息。

# 4.1.6 查看用户权限信息
进入【用户管理】,进入用户管理页面,选择需要查看权限的用户,点击【权限】按钮,可查看用户权限信息。


备注:用户管理下的权限只可查看,不可修改,若想修改权限,需在资源管理下的命名空间或者主题下进行权限修改。
# 第5章 资源管理
资源管理用于管理监控租户、命名空间、主题,为消息中间件提供基础资源建立。
# 5.1 租户
租户用于新增租户信息、编辑租户信息、删除租户信息、查询租户信息、设置副本。注意:新增、编辑、删除租户之前,需要保证集群服务正常,否则操作将会失败。

# 5.1.1 新增租户信息
进入【资源管理】->【租户】,在租户页面左上角点击【新增】按钮,可新增租户信息。注意:点击新增前,需在右上角下拉选择需要新增租户的集群,点击新增按钮时,将会自动带入选择的集群名称。

集群:租户所属的集群,用户新增前,自行选择
租户名称:租户的名称,自定义,唯一性标识(不同集群下可重复)
描述:租户描述信息
# 5.1.2 编辑租户信息
进入【资源管理】->【租户】,在租户页面选择需要修改的租户,点击【编辑】按钮,可编辑租户信息。注意:在修改租户信息前,默认只显示一个集群的租户信息,可在右上角下拉选择集群。

集群:租户所属的集群,不可修改
租户名称:租户名称,置灰显示,不可修改
描述:租户描述信息
# 5.1.3 删除租户信息
进入【资源管理】->【租户】,在租户页面选择需要删除的租户,点击【删除】按钮,可删除租户信息。注意:当租户下存在命名空间时,不可删除租户信息,要删除租户,需要先删除命名空间。

# 5.1.4 查询租户
进入【资源管理】->【租户】,在租户页面选择集群信息,精确或者模糊输入租户名称,可查询租户信息。

# 5.1.5 副本设置
进入【资源管理】->【租户】,在租户页面选择要设置副本的租户,点击【副本设置】,可设置副本。

可用存储节点数:可用的存储节点数,自定义,不能大于租户所属集群的可用存储节点数
保存副本数:消息保存副本数,自定义,不能大于可用存储节点个数,且不小于等待确认副本数
等待确认副本数:至少等到多少个副本存储成功,自定义,不可大于可用存储节点个数和保存副本数
# 5.2 命名空间
命名空间属于租户下,用于新增命名空间信息、编辑命名空间信息、删除命名空间信息、查询命名空间信息、命名空间授权。注意:新增、编辑、删除命名空间之前,需要保证集群服务正常,否则操作将会失败。

# 5.2.1 新增命名空间信息
进入【资源管理】->【命名空间】,在命名空间页面左上角点击【新增】按钮,可新增命名空间信息。注意:点击新增前,需在右上角下拉选择需要新增命名空间的集群、租户,点击新增按钮时,将会自动带入选择的集群名称和租户名称。

在弹窗内点击【高级配置】按钮,会出现高级配置选项:


集群:租户所属的集群,用户新增前,自行选择,不可修改
租户:命名空间所属的租户,用户新增前,自行选择,不可修改
命名空间:自定义,唯一性标识
消息TTL:未确认的消息多长时间后被标记为已确认,单位:天、小时、分钟,默认30天
最大保留时间:已确认的消息多长时间后被删除,单位:天、小时、分钟,默认30天
单个主题最大存储空间:已确认的消息多少大小后被删除,单位:MB、GB、TB,默认1GB
最大主题数:命名空间下最大主题数,超过此大小不可新增主题,默认为100
描述:命名空间描述信息
高级配置:
单个订阅最大消费者数:命名空间下的主题,1个订阅号只能有多少个消费者接收消息,默认不限制
单个主题最大消费者数:命名空间下的主题,1个主题只能有多少个消费者接收消息,默认不限制
单个主题最大生产者数:命名空间下的主题,1个主题只能有多少个生产者发送消息,默认不限制
bundle数:bundle数,不填,默认保存为4个
# 5.2.2 编辑命名空间信息
进入【资源管理】->【命名空间】,在命名空间页面选择需要修改的命名空间,点击【编辑】按钮,可编辑命名空间信息。注意:在修改租户信息前,默认只显示一个集群一个租户下的命名空间信息,可在右上角下拉选择集群、下拉选择租户。

集群:不可修改,租户所属的集群
租户:不可修改,命名空间所属的租户
命名空间名称:新增命名空间时的名称,不可修改,置灰显示
消息TTL:未确认的消息多长时间后被标记为已确认,单位:天、小时、分钟,默认30天,可修改
最大保留时间:已确认的消息多长时间后被删除,单位:天、小时、分钟,默认30天,可修改
单个主题最大存储空间:已确认的消息多少大小后被删除,单位:MB、GB、TB,默认1GB,可修改
最大主题数:命名空间下最大主题数,超过此大小不可新增主题,默认为100,可修改
描述:命名空间描述信息,可修改
高级配置:
单个订阅最大消费者数:命名空间下的主题,1个订阅号只能有多少个消费者接收消息,默认不限制,可修改
单个主题最大消费者数:命名空间下的主题,1个主题只能有多少个消费者接收消息,默认不限制,可修改
单个主题最大生产者数:命名空间下的主题,1个主题只能有多少个生产者发送消息,默认不限制,可修改
bundle数:bundle数,不填时默认保存为4个,不可修改
# 5.2.3 删除命名空间信息
进入【资源管理】->【命名空间】,在命名空间页面选择需要删除的命名空间,点击【更多】按钮->【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除命名空间信息。注意:当删除命名空间信息时,命名空间下的主题资源将会被删除。


# 5.2.4 查询命名空间
进入【资源管理】->【命名空间】,在命名空间页面选择集群、选择租户,精确或者模糊输入命名空间名,点击【查询】按钮,可查询命名空间信息。
# 5.2.5 新增命名空间权限信息
进入【资源管理】->【命名空间】,在命名空间页面选择需要分配权限的命名空间,点击【分配权限】按钮,进入分配权限页面;点击【新增】按钮,下拉选择用户,勾选权限前的复选框,可新增命名空间下用户的权限信息。注意:授权主要是给普通用户提供授权,默认超级管理员拥有所有命名空间的消息发布权限和消息消费权限。
用户:下拉选择,获取的用户管理(4.1)的用户信息
权限:复选框,消息发布权限和消息消费权限
# 5.2.6 编辑命名空间权限信息
进入【资源管理】->【命名空间】,在命名空间页面选择需要分配权限的命名空间,点击【分配权限】按钮,进入分配权限页面;选择需要更新授权的用户,点击【编辑】按钮,可更新用户在命名空间下的权限。注意:授权主要是给普通用户提供授权,默认超级管理员拥有所有命名空间的消息发布权限和消息消费权限。
用户:用户名称,不可修改
权限:复选框,消息发布权限和消息消费权限
# 5.2.7 删除命名空间权限信息
进入【资源管理】->【命名空间】,在命名空间页面选择需要分配权限的命名空间,点击【分配权限】按钮,进入分配权限页面;选择需要删除授权的用户,点击【删除】按钮,在弹出的确认删除弹框点击【确定】按钮,可删除用户在命名空间下的权限。注意:授权主要是给普通用户提供授权,默认超级管理员拥有所有命名空间的消息发布权限和消息消费权限。
备注:删除用户后,使用用户的token发送接收消息时,会提示没有权限。
# 5.2.8 副本设置
进入【资源管理】->【命名空间】,在命名空间页面选择要设置副本的命名空间,点击【更多】按钮->【副本设置】按钮,可设置副本。注意:设置副本前,需确保集群服务正常。


可用存储节点数:可用的存储节点数,自定义,不能大于租户所属集群的可用存储节点数
保存副本数:消息保存副本数,自定义,不能大于可用存储节点个数,且不小于等待确认副本数
等待确认副本数:至少等到多少个副本存储成功,自定义,不可大于可用存储节点个数和保存副本数
# 5.2.9 开启消息轨迹
主题的消息轨迹默认不会开启,若消息查询需查询消息轨迹,则需要在主题所属命名空间开启消息轨迹。进入【资源管理】->【命名空间】,在命名空间页面选择要开启轨迹的命名空间,点击【更多】按钮->【开启消息轨迹】按钮,可开启消息轨迹,开启成功后会弹出 “开启消息轨迹成功!” 提示。 注意:开启消息轨迹后再创建的主题,才能生效。


# 5.2.10 关闭消息轨迹
开启的消息轨迹,若需要关闭消息轨迹,则需要在主题所属命名空间关闭消息轨迹。进入【资源管理】->【命名空间】,在命名空间页面选择要关闭轨迹的命名空间,点击【更多】按钮->【关闭消息轨迹】按钮,可关闭消息轨迹。注意:关闭消息轨迹后再创建的主题,才能生效。

# 5.3 主题
主题属于租户下,用于新增主题信息、编辑主题信息、删除主题信息、查询主题信息。注意:新增、编辑、删除主题时,需要保证集群服务正常,否则操作将会失败。

# 5.3.1 新增主题信息
进入【资源管理】->【主题】,在主题页面左上角点击【新增】按钮,可新增主题信息。注意:点击新增前,需在右上角下拉选择需要新增命名空间的集群、租户、命名空间,点击新增按钮时,将会自动带入选择的集群名称和租户名称、命名空间名称。

租户:主题所属租户名称,不可修改,
命名空间:主题所属命名空间名称,不可修改
主题名称:自定义,唯一性标识,不同租户下的主题可重复
类型:主题的类型,下拉选择,持久化和非持久化,默认持久化
分区数:主题的分区数量,默认为0
描述:主题描述信息
# 5.3.2 编辑主题信息
进入【资源管理】->【主题】,在主题页面选择需要修改的主题名称,点击【编辑】按钮,可编辑主题信息。注意:在修改主题信息前,默认只显示一个集群一个租户下一个命名空间下的的主题信息,可在右上角下拉选择集群、下拉选择租户、下拉选择命名空间。

租户:主题所属租户名称,不可修改
命名空间:主题所属命名空间名称,不可修改
主题名称:主题的名称,不可修改,
类型:主题的类型,置灰显示,不可修改,
分区数:主题的分区数量,置灰显示,不可修改,
描述:主题描述信息,可修改
# 5.3.3 删除主题信息
进入【资源管理】->【主题】,在主题页面选择需要删除的主题,点击【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除命名主题信息。

# 5.3.4 查询主题
进入【资源管理】->【主题】,在命名空间页面右上角选择集群、选择租户、选择命名空间、精确或者模糊输入主题名称,点击【查询】按钮,可查询主题。

# 5.3.5 查看主题详情
进入【资源管理】->【主题】,在命名空间页面右上角选择集群、选择租户、选择命名空间,点击具体主题名称,可进行查看主题具体信息。
.4ad80d41.png)
.bdc9b4f7.png)
# 5.3.6 新增主题权限信息
进入【资源管理】->【主题】,在主题页面选择需要分配权限的主题,点击【分配权限】按钮,进入分配权限页面;点击左上角【新增】按钮,可新增主题下用户的权限信息。注意:新增权限前,需满足集群服务可用;授权主要是给普通用户提供授权,默认超级管理员拥有所有主题的消息发布权限和消息消费权限;若主题所属的命名空间也有权限,则按最小粒度,主题设置的权限有效;若用户在命名空间无权限则需要先对命名空间进行授权。

用户:下拉选择,获取的用户管理(4.1)的用户信息
权限:复选框,消息发布权限和消息消费权限
# 5.3.7 编辑主题权限信息
进入【资源管理】->【主题】,在主题页面选择需要分配权限的主题,点击【分配权限】按钮,进入分配权限页面;选择需要更新权限的用户,点击【编辑】按钮,可更新用户在主题下的权限。注意:更新权限前,需满足集群服务可用;若主题所属命名空间也有权限,则按最小粒度,主题设置的权限有效。

用户:用户名称,不可修改
权限:复选框,消息发布权限和消息消费权限,可修改
# 5.3.8 删除主题权限信息
进入【资源管理】->【主题】,在主题页面选择需要分配权限的主题,点击【分配权限】按钮,进入分配权限页面;选择需要删除权限的用户,点击【删除】按钮,在弹出的确认删除弹框点击【确定】按钮,可删除用户在主题下的权限。注意:删除权限前,需满足集群服务可用;若删除权限的用户的命名空间下有权限,则命名空间权限生效。

备注:删除用户后,使用用户的token向该主题发送接收消息时,会提示没有权限。
# 第6章 通道
# 6.1 通道管理
通道管理作用于跨集群间的消息复制功能,生产者发送到某一集群后,消费者能通过其他集群消费消息。通道管理用于新增通道信息、编辑通道信息、删除通道信息。注意:新增、编辑、删除通道之前,需要保证源集群和目的集群服务正常,否则操作将会失败。
# 6.1.1 新增通道信息
进入【通道管理】,在通道管理页面点击左上角【新增】按钮,可新增通道信息。注意:新增通道信息前,需要2个集群且2个集群中拥有相同名称的租户、命名空间,需集群服务器应正常。
通道名称:自定义,唯一标识
源集群:被复制消息的集群,下拉选择
租户:被复制消息的租户,下拉选择
命名空间:被复制消息的命名空间,下拉选择
目的集群:需复制消息的集群,下拉选择
描述:通道描述信息
# 6.1.2 编辑通道信息
进入【通道管理】,进入通道管理页面,选择需要编辑的通道信息,点击【编辑】按钮,可修改通道信息。
通道名称:自定义,唯一标识,可修改
源集群:被复制消息的集群,不可修改,置灰显示
租户:被复制消息的租户,不可修改,置灰显示
命名空间:被复制消息的命名空间,不可修改,置灰显示
目的集群:需复制消息的集群,不可修改,置灰显示
描述:通道描述信息,可修改
# 6.1.3 删除通道信息
进入【通道管理】,进入通道管理页面,选择需要删除的通道信息,点击【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,删除通道信息。注意:通道信息被删除后,源集群就不能复制消息到目的集群了
# 第7章 消息查询
# 7.1 消息查询
消息作用于监控查询消息的轨迹,当生产者发送消息时,消费者接收时,可在消息查询中查询到消息轨迹。消息查询用于新增消息查询信息,查询消息信息、查看消息轨迹,其中查询方式可按主题查询、按消息ID查询、按消息属性查询。
注意:
消息查询包含两部分: 1. 生产者发送消息到ADMQ 2. ADMQ存储消息、消费者接收消息、消费者接收消息后返回确认。
生产者发送消息到ADMQ,命名空间不需要开启消息轨迹就可以查到。
ADMQ存储消息、消费者接收消息、消费者接收消息后返回确认,需要对主题所在的命名空间开启消息轨迹(参考章节 5.2.9),且需要对集群节点新增配置参数:messageTraceEnabled=true并重启集群服务(参考章节 3.1.16),之后才能查到;除此之外,只有当命名空间开启消息轨迹后创建的主题才会记录消息轨迹。
# 7.1.1 新增消息查询信息
进入【消息查询】,在页面顶端选择需要查询消息所在的集群、租户、命名空间,主题,然后在消息查询页面点击左上角【新增】按钮,可新增消息查询信息。
时间范围:默认为近1小时的时间范围
集群:需查询消息的集群,下拉选择
租户:需查询消息的租户,下拉选择
命名空间:需查询消息的命名空间,下拉选择
主题:需查询消息的主题,下拉选择(当主题较多时,也可模糊输入主题名,可自动带出符合条件的主题)
查询方式:单选,包括:主题查询、消息ID查询、消息属性查询。 选择主题查询时,直接点击【确定】按钮即可; 选择消息ID查询时,可输入消息ID,选填项,用来过滤消息ID,不输入则默认查询主题下所有的消息ID; 选择消息属性查询时,需输入属性值,必填项,用来过滤消息属性,格式为name=value,多个属性之间用逗号隔开。
# 7.1.2 查看消息内容
进入【消息查询】,在消息查询页面选择需要查看消息轨迹的消息查询,点击【查看】按钮:
进入查询结果页面,选择需要查看轨迹的消息ID,点击【消息内容】按钮,可查询对应消息ID的消息内容如:基本内容、消息内容、其他属性值。
# 7.1.3 查看消息轨迹
进入【消息查询】,在消息查询页面选择需要查看消息轨迹的消息查询,点击【查看】按钮:
进入查询结果页面,选择需要查看轨迹的消息ID,点击【消息轨迹】按钮,可查询对应消息ID的消息轨迹信息,轨迹信息包含消息生产、消息存储、消息消费的信息。
注意,查询消息轨迹后,在查询结果页面,消息的轨迹状态会从“未生成”变成“已生成”,如下:

# 7.1.4 查询已创建消息查询的信息
进入【消息查询】,在消息查询页面右上角选择集群、租户、命名空间、主题名称后,可通过查询过滤(也可不过滤,默认全部),点击【查询】按钮,可查询消息查询信息。
# 第8章 系统运维
系统运维用于监控管理系统,如节点服务器信息、消息信息等。有运维监控、系统告警、功能验证、节点日志。
# 8.1 运维监控
进入【系统运维】->【运维监控】,可查询到运维监控信息。运维监控用于监控管理集群信息,可以监控近1小时内消息的统计信息、近24小时的消息统计信息、近3天的消息统计信息、近7天的消息统计信息、也可自定义查询, 还能根据租户、命名空间、主题、订阅消息,查询消息的生产速率、消费速率、生产流量、消费流量、生产者数量、主题数量、接收消息总数量、当前存储使用量等,并以图表的形式直观显示。
# 8.2 系统告警
进入【系统运维】->【系统监控】,可监控到集群以及集群各节点的状态,并以告警事件的形式通知客户;可以统计近1小时内的告警数量、近24小时的告警数量、近3天的告警数量、近7天的告警数量、也可自定义查询近1个月的告警数量;也可根据告警级别、告警主体、告警状态、告警类别进行查询告警事件。
可点击【查看】按钮查看具体的告警信息:


# 8.3 功能验证
功能验证作用于验证admq功能是否可正常使用,应用场景主要用于产品升级后。功能验证用于新增功能验证、编辑功能验证、执行功能验证、查看历史执行功能验证信息、删除功能验证。
# 8.3.1 新增功能验证信息
进入【系统运维】->【功能验证】,在功能验证页面点击左上角【新增】按钮,可新增功能验证信息。
功能:功能名称,自定义
测试引擎:上传的测试引擎文件
测试参数:功能的参数,自定义,不可随便写入,写入错误,可能执行功能验证会失败
示例参数:功能的示例参数,自定义
功能描述:功能的描述信息
# 8.3.2 编辑功能验证信息
进入【系统运维】->【功能验证】,在功能验证页面选择需要修改的功能,点击【编辑】按钮,可编辑功能验证信息。
功能:功能名称,自定义,可修改
测试引擎:上传的测试引擎文件,可重新上传引擎文件覆盖原来上传的
测试参数:功能的参数,自定义,不可随便写入,写入错误,可能执行功能验证会失败,可修改
示例参数:功能的示例参数,自定义,可修改
功能描述:功能的描述信息,自定义,可修改
# 8.3.3 执行功能验证信息
进入【系统运维】->【功能验证】,在功能页面选择需要执行的功能验证信息,点击【更多】按钮->【执行】按钮,在弹出的选择集群弹框中,下拉选择集群执行功能验证信息,可执行功能验证信息。
# 8.3.4 删除功能验证信息
进入【系统运维】->【功能验证】,在功能页面选择需要删除的功能验证信息,点击【更多】按钮->【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除节点。

# 8.3.5 查看历史执行功能验证信息
进入【系统运维】->【功能验证】,在功能页面选择需要查看历史执行的功能验证信息,点击【更多】按钮->【查看历史】按钮,弹出查看历史弹框,在弹框中可查看到历史功能测试用例、测试结果、测试耗时、测试时间等信息。
# 8.4 节点日志
管理控制台支持对集群的计算节点、存储节点、协调器的日志进行管理,可下载日志、查看实时日志、删除日志、查询日志信息等。

# 8.4.1 下载节点日志信息
进入【系统运维】>【节点日志】,在节点日志页面,选择需要下载日志的节点,点击【下载】按钮,可下载节点日志信息。

# 8.4.2 查看节点日志信息
进入【系统运维】>【节点日志】,在节点日志页面,选择需要查看实时日志的节点,点击【查看】按钮,可查看节点实时日志信息。注意:当日志信息是压缩文件的形式,不可直接查看,可下载日志到本地解压后进行查看。


# 8.4.3 删除节点日志信息
进入【系统运维】>【节点日志】,在节点日志页面,选择需要删除日志的节点,点击【删除】按钮,在弹出的确认删除弹框点击【确定】按钮,可删除日志信息。注意:删除日志会同步删除节点服务器的日志文件,需谨慎删除。

# 8.4.4 查询节点日志信息
进入【系统运维】>【节点日志】,在节点日志页面,选择集群名称、选择IP、选择节点类型、输入文件名称,可查询节点日志信息。

# 第9章 插件管理
插件管理用于管理rocketmq插件、kafka插件、rabbitmq插件的资源,如消息实例、命名空间、主题、订阅组、消息查询、死信消息、虚拟主机、交换机、队列、路由等。
# 9.1 RocketMQ管理
# 9.1.1 租户
租户是管理存储配额、隔离策略的基本单元,使用租户的目的是为了在多用户环境下,使用同一套程序,且保证用户间数据隔离。租户用于新增租户、编辑租户、删除租户、查询租户。

# 9.1.1.1 新增租户
进入【插件管理】->【RocketMQ管理】,选择 ”租户“ 页签,点击左上角【新增】按钮,可新增租户。注意:新增租户前,需满足集群服务可用。

租户名称:租户的名称,自定义,唯一标识
命名空间上限:租户下可新增的命名空间数量,默认为1000,可更改
主题上限:租户下可新增的主题数量,默认为1000,可更改
订阅组上限:租户下可新增的订阅组数量,默认为1000,可更改
最大生产速率(条/秒):生产者的最大发送速率,默认为10000,可自定义
最大消费速率(条/秒):消费者的最大消费速率,默认为10000,可自定义
描述:租户描述信息
注意:租户名称只是为了方便记,真正的租户是租户ID。
# 9.1.1.2 编辑租户
进入【插件管理】->【RocketMQ管理】,选择 ”租户“ 页签,选择租户列表中某一个租户名称后的【编辑】按钮,可编辑租户信息。注意:编辑租户前,需满足集群服务可用。

租户ID:租户的ID,不可修改,唯一标识
租户名称:租户的名称,可修改
命名空间上限:租户下可新增的命名空间数量,可修改
主题上限:租户下可新增的主题数量,可修改
订阅组上限:租户下可新增的订阅组数量,可修改
最大生产速率(条/秒):生产者的最大发送速率,可修改
最大消费速率(条/秒):消费者的最大消费速率,可修改
描述:租户描述信息,可修改
注意:租户名称只是为了方便记,真正的租户是租户ID。
# 9.1.1.3 删除租户
进入【插件管理】->【RocketMQ管理】,选择 ”租户“ 页签,选择租户列表中某一个租户名称后的【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除租户信息。注意:删除租户前,需满足集群服务可用,且租户下无命名空间,否则不可删除。

# 9.1.1.4 查询租户
进入【插件管理】->【RocketMQ管理】,选择 ”租户“ 页签,右上角选择集群名称,精确或模糊输入租户名称、可查询租户信息。

# 9.1.1.4 查看租户详情
进入【插件管理】->【RocketMQ管理】,选择"租户"页签,点击某个租户的租户ID,可查看租户详情。


# 9.1.2 命名空间
命名空间作用于管理命名空间资源、命名空间用户权限。命名空间用于新增命名空间、编辑命名空间、删除命名空间、查询命名空间、新增命名空间权限、编辑命名空间权限、删除命名空间权限。

# 9.1.2.1 新增命名空间信息
进入【插件管理】->【RocketMQ管理】,选择 ”命名空间“ 页签,点击左上角【新增】按钮,可新增命名空间信息。注意:新增命名空间前,需满足集群服务可用。

租户名称:命名空间所属的租户名称,根据选择的租户自动带出,不可修改
租户ID:命名空间所属租户的租户ID,根据选择的租户自动带出,不可修改
命名空间名称:命名空间的名称,自定义,唯一标识,可自定义
消息保留时间:消息的最大保留时间,超过则不保存消息,单位(分钟)、(小时)、(天),默认30天(最大值)
描述:命名空间的描述信息,可自定义
# 9.1.2.2 编辑命名空间信息
进入【插件管理】->【RocketMQ管理】,选择 ”命名空间“ 页签,选择要编辑的命名空间名称后的【编辑】按钮,可编辑命名空间信息。注意:编辑命名空间前,需满足集群服务可用。

租户名称:命名空间所属的租户名称,根据选择的租户自动带出,不可修改
租户ID:命名空间所属租户的租户ID,根据选择的租户自动带出,不可修改
命名空间名称:命名空间的名称,置灰显示,不可修改
消息保留时间:消息的最大保留时间,超过则不保存消息,单位(分钟)、(小时)、(天),可修改
描述:命名空间的描述信息,可修改
# 9.1.2.3 删除命名空间信息
进入【插件管理】->【Kafka管理】,选择 ”命名空间“ 页签,选择要删除的命名空间名称后的【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除命名空间信息。注意:删除命名空间前,需满足集群服务可用,且命名空间下无订阅组,删除命名空间后,命名空间下的主题信息也被删除。

# 9.1.2.4 查询命名空间
进入【插件管理】->【RocketMQ管理】,选择 ”命名空间“ 页签,右上角选择集群名称、租户名称、精确或模糊输入命名空间名称,可查询命名空间信息。(注意:包含的也有)

# 9.1.2.5 新增命名空间权限信息
进入【插件管理】->【RocketMQ管理】,选择 ”命名空间“ 页签,选择要新增授权的命名空间名称后的【分配权限】按钮后,会进入到分配权限页面。

在分配权限页面,点击左上角的【新增】按钮,可新增命名空间下用户的权限信息。注意:新增权限前,需满足集群服务可用;授权主要是给普通用户提供授权,默认超级管理员拥有所有命名空间的消息发布权限和消息消费权限。

用户:下拉选择,获取的用户管理(4.1)的用户信息
权限:复选框,消息发布权限和消息消费权限
# 9.1.2.6 编辑命名空间权限信息
进入【插件管理】->【RocketMQ管理】,选择 ”命名空间“ 页签,选择要更新授权的命名空间名称后的【分配权限】按钮,跳转到分配权限页面。

在分配权限页面,选择需要更新授权的用户,点击【编辑】按钮,可更新用户在命名空间下的权限。注意:更新权限前,需满足集群服务可用。

用户:用户名称,不可修改
权限:复选框,消息发布权限和消息消费权限,可修改
# 9.1.2.7 删除命名空间权限信息
进入【插件管理】->【Kafka管理】,选择 ”命名空间“ 页签,选择要删除授权的命名空间名称后的【分配权限】按钮,进入到分配权限页面。

在分配权限页面,选择需要删除授权的用户,点击【删除】按钮,在弹出的确认删除弹框点击【确定】按钮,可删除用户在命名空间下的权限。注意:删除权限前,需满足集群服务可用。

# 9.1.3 主题
主题作用于管理主题资源、主题的用户权限。主题用于新增主题、编辑主题、删除主题、查询主题、新增主题权限、编辑主题权限、删除主题权限、发送消息。

# 9.1.3.1 新增主题信息
进入【插件管理】->【RocketMQ管理】,选择"主题"页签,点击左上角【新增】按钮,可新增主题信息。注意:新增主题前,需满足集群服务可用。1

租户名称:主题所属的租户的名称,根据选择的租户自动带出,不可修改
租户ID:主题所属租户的租户ID,根据选择的租户自动带出,不可修改
命名空间:主题所属命名空间名称,根据选择的命名空间自动带出,不可修改
主题名称:主题的名称,自定义,唯一标识
类型:主题类型,单选项,包括:普通、全局顺序、局部顺序
队列数:主题下的队列数量,默认且最小是3,可自定义
broker:主题所在集群的broker,若是插件加载的,则是None
描述:主题的描述信息
# 9.1.3.2 编辑主题信息
进入【插件管理】->【RocketMQ管理】,选择"主题"页签,选择要编辑的主题名称后的【更多】按钮->【编辑】按钮,可编辑主题信息。注意:编辑主题前,需满足集群服务可用。(加图片)


租户名称:主题所属租户名称,根据选择的租户自动带出,不可修改
租户ID:主题所属租户的租户ID,根据选择的租户自动带出,不可修改
命名空间:主题所属命名空间名称,根据选择的命名空间自动带出,不可修改
主题名称:主题的名称,置灰显示,不可修改
类型:主题类型,置灰显示,不可修改
队列数:主题下的队列数量,可修改
broker:主题所在集群的broker,不可修改
描述:主题的描述信息
# 9.1.3.3 删除主题信息
进入【插件管理】->【Kafka管理】,选择"主题"页签,选择要删除的主题名称后的【更多】按钮->【删除】按钮,在弹出的弹窗,先选择主题所在的broker,然后点击【确定】按钮,可删除主题信息。注意:删除主题前,需满足集群服务可用。


# 9.1.3.4 查询主题
进入【插件管理】->【RocketMQ管理】,选择"主题"页签,右上角选择集群名称、租户名称、命名空间名称、输入主题名称,可查询主题信息。

# 9.1.3.5 查看主题详情
进入【插件管理】->【RocketMQ管理】,选择"主题"页签,点击某个主题的名称,可查看主题详情。


# 9.1.3.6 新增主题权限信息
进入【插件管理】->【RocketMQ管理】,选择"主题"页签,选择要新增授权的主题名称后的【分配权限】按钮,跳转到分配权限页面。

在分配权限页面,点击左上角的【新增】按钮,可新增主题下用户的权限信息。注意:新增权限前,需满足集群服务可用;授权主要是给普通用户提供授权,默认超级管理员拥有所有主题的消息发布权限和消息消费权限;若主题所属命名空间也有权限,则按最小粒度,主题设置的权限有效;若用户在命名空间无权限则需要先对命名空间进行授权。

用户:下拉选择,获取的用户管理(4.1)的用户信息
权限:复选框,消息发布权限和消息消费权限
# 9.1.3.7 编辑主题权限信息
进入【插件管理】->【RocketMQ管理】,选择"主题"页签,选择要更新授权的主题名称后的【更多】按钮->【分配权限】按钮,进入到分配权限页面。

在分配权限页面,选择需要更新授权的用户,点击【编辑】按钮,可更新用户在主题下的权限。注意:更新权限前,需满足集群服务可用;若主题所属命名空间也有权限,则按最小粒度,主题设置的权限有效。

用户:用户名称,不可修改
权限:复选框,消息发布权限和消息消费权限
# 9.1.3.8 删除主题权限信息
进入【插件管理】->【Kafka管理】,选择"主题"页签,选择要删除授权的主题名称后的【更多】按钮->【分配权限】按钮,进入到分配权限页面。

在分配权限页面,选择需要删除授权的用户,点击【删除】按钮,在弹出的确认删除弹框点击【确定】按钮,可删除用户在主题下的权限。注意:删除权限前,需满足集群服务可用;若主题所属命名空间也有权限,则按最小粒度,主题设置的权限有效,若删除权限的用户的命名空间下有权限,则命名空间权限生效。

# 9.1.3.9 发送消息
进入【插件管理】->【RocketMQ管理】,选择"主题"页签,选择要发送消息的主题名称后的【发送消息】按钮,可发送消息,发送的消息,可在消息查询中查到。

主题:发送消息的主题名称,根据选择的主题名称自动带出,不可修改
消息Tag:消息的Tag,多个Tag用英文逗号隔开,可自定义
消息Key:消息的key,多个Key用英文逗号隔开,可自定义
消息内容:发送的消息内容,自定义
开启消息轨迹:开启、不开启,单选项,默认不开启
在新建界面点击【确定】按钮后,会出现成功发送消息的弹窗,如下:

# 9.1.4 订阅组
发送消息和接收消息,都需要绑定订阅组,订阅组就是作用于管理订阅组。订阅组用于新增订阅组、编辑订阅组、删除订阅组、查询订阅组、重置消费位点(回溯消息)

# 9.1.4.1 新增订阅组
进入【插件管理】->【RocketMQ管理】,选择订阅组页签,点击左上角【新增】按钮,可新增订阅组信息。注意:新增订阅组前,需满足集群服务可用。

租户名称:订阅组所属租户的名称,根据选择的租户自动带出,不可修改
租户ID:订阅组所属租户的租户ID,根据选择的租户自动带出,不可修改
命名空间:订阅组所属命名空间名称,根据选择的命名空间自动带出,不可修改
订阅组名称:订阅组的名称,自定义,唯一标识
广播消费:消费模式是否为广播模式,开启、不开启,默认开启
broker:所属broker,默认None
描述:订阅组的描述信息
# 9.1.4.2 编辑订阅组信息
进入【插件管理】->【RocketMQ管理】,选择订阅组页签,选择要编辑的订阅组名称后的【编辑】按钮,可编辑订阅组信息。注意:编辑订阅组前,需满足集群服务可用。

租户名称:订阅组所属租户的名称,不可修改
租户ID:订阅组所属租户的租户ID,不可修改
命名空间:订阅组所属命名空间名称,不可修改
订阅组名称:订阅组的名称,置灰显示,不可修改
广播消费:默认开启,可修改
broker:所属broker,置灰显示,不可修改
描述:订阅组的描述信息,可修改
# 9.1.4.3 删除订阅组信息
进入【插件管理】->【RocketMQ管理】,选择订阅组页签,选择【更多】->【删除】按钮,在弹出的弹窗内选择broker后,点击【确定】按钮,可删除订阅组信息。注意:删除订阅组前,需满足集群服务可用。


# 9.1.4.4 查询订阅组
进入【插件管理】->【RocketMQ管理】,选择订阅组页签,右上角选择集群名称、实例名称、命名空间名称,精确或模糊输入订阅组名称,可查询订阅组信息。

# 9.1.4.5 查看订阅组详情
进入【插件管理】->【RocketMQ管理】,选择订阅组页签,选择需要查看详细信息的订阅组,点击订阅组名称,可查看订阅组详细信息,如基本信息,订阅关系、生产者客户端、消费者客户端、订阅组生产消费统计、主题消费统计。


# 9.1.4.6 重置消费位点
进入【插件管理】->【RocketMQ管理】,选择订阅组页签,选择需要重置消费的订阅组,点击【更多】->【重置消费位点】按钮,可重置消费。


主题:重置消费的主题名,下拉选择(主题需发送和接收消息后才能被选择)
重置方式:重置消费的方式,单选按钮,从最新开始位点开始消费、从指定时间点开始消费
# 9.1.5 消息查询
消息查询用于查询消息具体信息,如消息生产信息、消息消费信息等。消息查询用于新增消息查询、查询已创建的消息查询、查看查询消息详情信息。

# 9.1.5.1 新增消息查询
进入【插件管理】->【RocketMQ管理】,选择消息查询页签,点击左上角【新增】按钮,可新增消息查询信息。注意:新增消息查询前,需满足集群服务可用。
.0b042e62.png)
时间范围:要查询的消息时间范围,默认为1小时,可手动选择
租户名称:租户的名称,根据选择的租户自动带出,不可修改
租户ID:租户的ID,根据选择的租户自动带出,不可修改
命名空间:命名空间名称,根据选择的命名空间自动带出,不可修改
主题:主题名称,根据选择的命名空间下拉选择
查询方式:消息查询的查询方式(根据查询方式进行消息查询过滤),主题查询、消息ID查询、消息Key查询
# 9.1.5.2 查看消息详情信息
进入【插件管理】->【RocketMQ管理】,选择消息查询页签,选择新增的消息查询,点击【查看】按钮,可查看查询结果,选择某条消息ID,点击【查看】操作,可查看具体某条消息的详细信息(查询条件信息、消息生产信息、消息消费信息)。
.debfc41e.png)
# 9.1.5.3 查询已创建的消息查询
进入【插件管理】->【RocketMQ管理】,选择消息查询页签,选择集群名称、租户名称、命名空间、输入主题名称,可查询到满足条件的消息查询。
.a37fe5e8.png)
# 9.2 Kafka管理
# 9.2.1 租户
租户是管理存储配额、隔离策略的基本单元,使用租户的目的是为了在多用户环境下,使用同一套程序,且保证用户间数据隔离。租户用于新增租户、编辑租户、删除租户、查询租户。

# 9.2.1.1 新增租户
进入【插件管理】->【Kafka管理】,选择 ”租户“ 页签,点击左上角【新增】按钮,可新增租户。注意:新增租户前,需满足集群服务可用。

租户名称:租户的名称,自定义,唯一标识
主题上限:租户下可新增的主题数量,默认为1000,可更改
订阅组上限:租户下可新增的订阅组数量,默认为1000,可更改
描述:租户描述信息
注意:租户名称只是为了方便记,真正的租户是租户ID。
# 9.2.1.2 编辑租户
进入【插件管理】->【Kafka管理】,选择 ”租户“ 页签,选择租户列表中某一个租户名称后的【编辑】按钮,可编辑租户信息。注意:编辑租户前,需满足集群服务可用。

租户名称:租户的名称,可修改
主题上限:租户下可新增的主题数量
订阅组上限:租户下可新增的订阅组数量
描述:租户描述信息
# 9.2.1.3 删除租户
进入【插件管理】->【Kafka管理】,选择 ”租户“ 页签,选择租户列表中某一个租户名称后的【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除租户信息。注意:删除租户前,需满足集群服务可用,且租户下无命名空间,否则不可删除。

# 9.2.1.4 查询租户
进入【插件管理】->【Kafka管理】,选择 ”租户“ 页签,右上角选择集群名称、输入租户名称、可查询租户信息。

# 9.2.1.5 查看租户详情
进入【插件管理】->【Kafka管理】,选择”租户“ 页签,择需要查看详细信息的租户,点击租户ID,可查看租户详细信息。


# 9.2.2 命名空间
命名空间作用于管理命名空间资源、命名空间用户权限。命名空间用于新增命名空间、编辑命名空间、删除命名空间、查询命名空间、新增命名空间权限、编辑命名空间权限、删除命名空间权限。

# 9.2.2.1 新增命名空间信息
进入【插件管理】->【Kafka管理】,选择 ”命名空间“ 页签,点击左上角【新增】按钮,可新增命名空间信息。注意:新增命名空间前,需满足集群服务可用。

租户名称:命名空间所属的租户名称,根据选择的租户自动带出,不可修改
租户ID:命名空间所属租户的租户ID,根据选择的租户自动带出,不可修改
命名空间名称:命名空间的名称,自定义,唯一标识
消息保留时间:消息的最大保留时间,超过则不保存消息,单位(分钟)、(小时)、(天),默认30天(最大值)
描述:命名空间的描述信息
# 9.2.2.2 编辑命名空间信息
进入【插件管理】->【Kafka管理】,选择 ”命名空间“ 页签,选择要编辑的命名空间名称后的【编辑】按钮,可编辑命名空间信息。注意:编辑命名空间前,需满足集群服务可用。

租户名称:命名空间所属的租户名称,根据选择的租户自动带出,不可修改
租户ID:命名空间所属租户的租户ID,根据选择的租户自动带出,不可修改
命名空间名称:命名空间的名称,置灰显示,不可修改
消息保留时间:消息的最大保留时间,超过则不保存消息,单位(分钟)、(小时)、(天),可修改
描述:命名空间的描述信息,可修改
# 9.2.2.3 删除命名空间信息
进入【插件管理】->【Kafka管理】,选择 ”命名空间“ 页签,选择要删除的命名空间名称后的【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除命名空间信息。注意:删除命名空间前,需满足集群服务可用,且命名空间下无订阅组,删除命名空间后,命名空间下的主题信息也被删除。

# 9.2.2.4 查询命名空间
进入【插件管理】->【Kafka管理】,选择 ”命名空间“ 页签,右上角选择集群名称、租户名称、输入命名空间名称,可查询命名空间信息。(注意:包含的也有)

# 9.2.2.5 新增命名空间权限信息
进入【插件管理】->【Kafka管理】,选择 ”命名空间“ 页签,选择要新增授权的命名空间名称后的【分配权限】按钮后,会进入到分配权限页面。

在分配权限页面,点击左上角的【新增】按钮,可新增命名空间下用户的权限信息。注意:新增权限前,需满足集群服务可用;授权主要是给普通用户提供授权,默认超级管理员拥有所有命名空间的消息发布权限和消息消费权限。

用户:下拉选择,获取的用户管理(4.1)的用户信息
权限:复选框,消息发布权限和消息消费权限
# 9.2.2.6 编辑命名空间权限信息
进入【插件管理】->【Kafka管理】,选择 ”命名空间“ 页签,选择要更新授权的命名空间名称后的【分配权限】按钮,跳转到分配权限页面。

在分配权限页面,选择需要更新授权的用户,点击【编辑】按钮,可更新用户在命名空间下的权限。注意:更新权限前,需满足集群服务可用。

用户:用户名称,不可修改
权限:复选框,消息发布权限和消息消费权限,可修改
# 9.2.2.7 删除命名空间权限信息
进入【插件管理】->【Kafka管理】,选择 ”命名空间“ 页签,选择要删除授权的命名空间名称后的【分配权限】按钮,进入到分配权限页面。

在分配权限页面,选择需要删除授权的用户,点击【删除】按钮,在弹出的确认删除弹框点击【确定】按钮,可删除用户在命名空间下的权限。注意:删除权限前,需满足集群服务可用。

# 9.2.3 主题
主题作用于管理主题资源、主题的用户权限。主题用于新增主题、编辑主题、删除主题、查询主题、新增主题权限、编辑主题权限、删除主题权限、发送消息。

# 9.2.3.1 新增主题信息
进入【插件管理】->【Kafka管理】,选择"主题"页签,点击左上角【新增】按钮,可新增主题信息。注意:新增主题前,需满足集群服务可用。1

租户名称:主题所属的租户的名称,根据选择的租户自动带出,不可修改
租户ID:主题所属租户的租户ID,根据选择的租户自动带出,不可修改
命名空间:主题所属命名空间名称,根据选择的命名空间自动带出,不可修改
主题名称:主题的名称,自定义,唯一标识
分区数:主题的分区数量,默认分区数为3,可修改
描述:主题的描述信息
# 9.2.3.2 编辑主题信息
进入【插件管理】->【Kafka管理】,选择"主题"页签,选择要编辑的主题名称后的【更多】按钮->【编辑】按钮,可编辑主题信息。注意:编辑主题前,需满足集群服务可用。
.3dcdb0e1.png)
.6ea467c7.png)
租户名称:主题所属租户名称,根据选择的租户自动带出,不可修改
租户ID:主题所属租户的租户ID,根据选择的租户自动带出,不可修改
命名空间:主题所属命名空间名称,根据选择的命名空间自动带出,不可修改
主题名称:主题的名称,置灰显示,不可修改
分区数:主题的分区数量,置灰显示,不可修改
描述:主题的描述信息
# 9.2.3.3 删除主题信息
进入【插件管理】->【Kafka管理】,选择"主题"页签,选择要删除的主题名称后的【更多】按钮->【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除主题信息。注意:删除主题前,需满足集群服务可用。
.def758c0.png)
.9c00402a.png)
# 9.2.3.4 查询主题
进入【插件管理】->【Kafka管理】,选择"主题"页签,右上角选择集群名称、租户名称、命名空间名称、输入主题名称,可查询主题信息。

# 9.2.3.5 查看主题详情
进入【插件管理】->【Kafka管理】,选择"主题"页签,点击某个主题的名称,可查看主题详情。


# 9.2.3.6 新增主题权限信息
进入【插件管理】->【Kafka管理】,选择"主题"页签,选择要新增授权的主题名称后的【分配权限】按钮,跳转到分配权限页面。

在分配权限页面,点击左上角的【新增】按钮,可新增主题下用户的权限信息。注意:新增权限前,需满足集群服务可用;授权主要是给普通用户提供授权,默认超级管理员拥有所有主题的消息发布权限和消息消费权限;若主题所属命名空间也有权限,则按最小粒度,主题设置的权限有效;若用户在命名空间无权限则需要先对命名空间进行授权。

用户:下拉选择,获取的用户管理(4.1)的用户信息
权限:复选框,消息发布权限和消息消费权限
# 9.2.3.7 编辑主题权限信息
进入【插件管理】->【Kafka管理】,选择"主题"页签,选择要更新授权的主题名称后的【更多】按钮->【分配权限】按钮,进入到分配权限页面。

在分配权限页面,选择需要更新授权的用户,点击【编辑】按钮,可更新用户在主题下的权限。注意:更新权限前,需满足集群服务可用;若主题所属命名空间也有权限,则按最小粒度,主题设置的权限有效。

用户:用户名称,不可修改
权限:复选框,消息发布权限和消息消费权限
# 9.2.3.8 删除主题权限信息
进入【插件管理】->【Kafka管理】,选择"主题"页签,选择要删除授权的主题名称后的【更多】按钮->【分配权限】按钮,进入到分配权限页面。

在分配权限页面,选择需要删除授权的用户,点击【删除】按钮,在弹出的确认删除弹框点击【确定】按钮,可删除用户在主题下的权限。注意:删除权限前,需满足集群服务可用;若主题所属命名空间也有权限,则按最小粒度,主题设置的权限有效,若删除权限的用户的命名空间下有权限,则命名空间权限生效。

# 9.2.3.9 发送消息
进入【插件管理】->【Kafka管理】,选择"主题"页签,选择要发送消息的主题名称后的【发送消息】按钮,可发送消息,发送的消息,可在消息查询中查到。(这里功能有问题,消息发不出去)

主题:发送消息的主题名称,根据选择的主题名称自动带出,不可修改
分区编号:主题的分区编号,自定义,用来指定将消息发送到主题的某一个确定的分区
消息Key:消息key,自定义
消息内容:发送的消息内容,自定义
# 9.2.4 订阅组
发送消息和接收消息,都需要绑定订阅组,订阅组就是作用于管理订阅组。订阅组用于新增订阅组、编辑订阅组、删除订阅组、查询订阅组、重置消费位点(回溯消息)

# 9.2.4.1 新增订阅组
进入【插件管理】->【Kafka管理】,选择 ”订阅组“ 页签,点击左上角【新增】按钮,可新增订阅组信息。注意:新增订阅组前,需满足集群服务可用。

租户名称:订阅组所属的租户名称,根据选择的租户自动带出,不可修改
租户ID:订阅组所属租户的租户ID,根据选择的租户自动带出,不可修改
命名空间:订阅组所属命名空间名称,根据选择的命名空间自动带出,不可修改
订阅组名称:订阅组的名称,自定义,唯一标识
描述:订阅组的描述信息
# 9.2.4.2 编辑订阅组信息
进入【插件管理】->【Kafka管理】,选择 ”订阅组“ 页签,选择要编辑的订阅组名称后的【编辑】按钮,可编辑订阅组信息。注意:编辑订阅组前,需满足集群服务可用。

租户名称:订阅组所属的租户名称,根据选择的租户自动带出,不可修改
租户ID:订阅组所属租户的租户ID,根据选择的租户自动带出,不可修改
命名空间:订阅组所属命名空间名称,根据选择的命名空间自动带出,不可修改
订阅组名称:订阅组的名称,置灰显示,不可修改
描述:订阅组的描述信息
# 9.2.4.3 删除订阅组信息
进入【插件管理】->【Kafka管理】,选择 ”订阅组“ 页签,选择要删除的订阅组名称后的【删除】按钮,可删除订阅组信息。注意:删除订阅组前,需满足集群服务可用。

# 9.2.4.4 查询订阅组
进入【插件管理】->【Kafka管理】,选择 ”订阅组“ 页签,右上角选择集群名称、实例名称、命名空间名称、输入订阅组名称,可查询订阅组信息。

# 9.2.4.5 查看订阅组详情
进入【插件管理】->【Kafka管理】,选择 ”订阅组“ 页签,选择需要查看详细信息的订阅组,点击订阅组名称,可查看订阅组详细信息,如基本信息,订阅关系、生产者客户端、消费者客户端、订阅组生产消费统计、主题消费统计。


# 9.2.4.5 重置消费位点
进入【插件管理】->【Kafka管理】,选择 ”订阅组“ 页签,选择需要重置消费的订阅组,点击【重置消费位点】按钮,可重置消费。


主题:重置消费的主题名,下拉选择(主题需发送和接收消息后才能被选择)
重置方式:重置消费的方式,单选按钮,从最新位点开始消费、从最旧位点开始消费、从指定时间点开始消费、从指定位置开始消费
# 9.3 RabbitMQ管理
# 9.3.1 租户
租户是管理存储配额、隔离策略的基本单元,使用租户的目的是为了在多用户环境下,使用同一套程序,且保证用户间数据隔离。租户用于新增租户、编辑租户、删除租户、查询租户。

# 9.3.1.1 新增租户
进入【插件管理】->【RabbitMQ管理】,选择 ”租户“ 页签,点击左上角【新增】按钮,可新增租户。注意:新增租户前,需满足集群服务可用;RabbitMQ的租户ID需要和standalone.conf配置文件中amqpTenant的值保持一致。

租户名称:租户的名称,自定义,唯一标识
虚拟主机上限:租户下可新增的虚拟主机上限,默认为1000,可更改
交换机上限:虚拟主机下可新增的交换机数量,默认为2000,可更改
队列上限:虚拟主机下可新增的队列数量,默认为2000,可更改
描述:租户描述信息
注意:租户名称只是为了方便记,真正的租户是租户ID。
# 9.3.1.2 编辑租户
进入【插件管理】->【RabbitMQ管理】,选择 ”租户“ 页签,选择租户列表中某一个租户名称后的【编辑】按钮,可编辑租户信息。注意:编辑租户前,需满足集群服务可用。

租户ID:租户的ID,根据租户名称自动生成的,不可修改。
租户名称:租户的名称,可修改
虚拟主机上限:租户下可新增的虚拟主机上限,可更改
交换机上限:虚拟主机下可新增的交换机数量,可更改
队列上限:虚拟主机下可新增的队列数量,可更改
描述:租户描述信息,可更改
# 9.3.1.3 删除租户
进入【插件管理】->【RabbitMQ管理】,选择 ”租户“ 页签,选择租户列表中某一个租户名称后的【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除租户信息。注意:删除租户前,需满足集群服务可用,且租户下无虚拟主机,否则不可删除。

# 9.3.1.4 查询租户
进入【插件管理】->【RabbitMQ管理】,选择 ”租户“ 页签,右上角选择集群名称、输入租户名称、可查询租户信息。

# 9.3.1.5 查看租户详情
进入【插件管理】->【RabbitMQ管理】,选择”租户“ 页签,选择需要查看详细信息的租户,点击租户ID,可查看租户详细信息。


# 9.3.2 虚拟主机
虚拟主机是共享相同的身份认证和加密环境的独立服务器域,拥有自己的交换机、队列,绑定和权限机制。虚拟主机用于新增虚拟主机、编辑虚拟主机、删除虚拟主机、查询虚拟主机、新增虚拟主机权限、编辑虚拟主机权限、删除虚拟主机权限。

# 9.3.2.1 新增虚拟主机信息
进入【插件管理】->【RabbitMQ管理】,选择 ”虚拟主机“ 页签,点击左上角【新增】按钮,可新增虚拟主机。注意:新增虚拟主机前,需满足集群服务可用。

租户名称:虚拟主机所属的租户名称,根据选择的租户自动带出,不可修改
租户ID:虚拟主机所属租户的租户ID,根据选择的租户自动带出,不可修改
虚拟主机名称:虚拟主机的名称,自定义,唯一标识
消息保留时间:消息的最大保留时间,超过则不保存消息,单位(分钟)、(小时)、(天),默认30天(最大值)
描述:虚拟主机的描述信息
# 9.3.2.2 编辑虚拟主机信息
进入【插件管理】->【RabbitMQ管理】,选择 ”虚拟主机“ 页签,选择要编辑的虚拟主机名称后的【编辑】按钮,可编辑虚拟主机信息。注意:编辑虚拟主机前,需满足集群服务可用。

租户名称:虚拟主机所属的租户名称,根据选择的租户自动带出,不可修改
租户ID:虚拟主机所属租户的租户ID,根据选择的租户自动带出,不可修改
虚拟主机名称:虚拟主机的名称,置灰显示,不可修改
消息保留时间:消息的最大保留时间,超过则不保存消息,单位(分钟)、(小时)、(天),可修改
描述:命名空间的描述信息,可修改
# 9.3.2.3 删除虚拟主机信息
进入【插件管理】->【RabbitMQ管理】,选择 ”虚拟主机“ 页签,选择要编辑的虚拟主机名称后的【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除虚拟主机信息。注意:删除虚拟主机前,需满足集群服务可用,且虚拟主机下无交换机、队列,删除虚拟主机后,虚拟主机下的交换机、队列信息也被删除。

# 9.3.2.4 查询虚拟主机
进入【插件管理】->【RabbitMQ管理】,选择 ”虚拟主机“ 页签,右上角选择集群名称、租户名称、输入虚拟主机名称,可查询虚拟主机信息。(注意:包含的也有)

# 9.3.2.5 新增虚拟主机权限信息
进入【插件管理】->【RabbitMQ管理】,选择 ”虚拟主机“ 页签,选择要新增授权的虚拟主机名称后的【分配权限】按钮后,会进入到分配权限页面。

在分配权限页面,点击左上角的【新增】按钮,可新增命名空间下用户的权限信息。注意:新增权限前,需满足集群服务可用;授权主要是给普通用户提供授权,默认超级管理员拥有所有命名空间的消息发布权限和消息消费权限。

用户:下拉选择,获取的用户管理(4.1)的用户信息
权限:复选框,消息发布权限和消息消费权限
# 9.3.2.6 编辑虚拟主机权限信息
进入【插件管理】->【RabbitMQ管理】,选择 ”虚拟主机“ 页签,选择要更新授权的虚拟主机名称后的【分配权限】按钮,跳转到分配权限页面。

在分配权限页面,选择需要更新授权的用户,点击【编辑】按钮,可更新用户在命名空间下的权限。注意:更新权限前,需满足集群服务可用。

用户:用户名称,不可修改
权限:复选框,消息发布权限和消息消费权限,可修改
# 9.3.2.7 删除虚拟主机权限信息
进入【插件管理】->【RabbitMQ管理】,选择 ”虚拟主机“ 页签,选择要删除授权的虚拟主机名称后的【分配权限】按钮,进入到分配权限页面。

在分配权限页面,选择需要删除授权的用户,点击【删除】按钮,在弹出的确认删除弹框点击【确定】按钮,可删除用户在虚拟主机下的权限。注意:删除权限前,需满足集群服务可用。

# 9.3.3 交换机
交换机作用于管理交换机资源,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。交换机用于新增交换机、编辑交换机、删除交换机、查询交换机、发送消息。

# 9.3.3.1 新增交换机信息
进入【插件管理】->【RabbitMQ管理】,选择"交换机"页签,点击左上角【新增】按钮,可新增交换机信息。注意:新增交换机前,需满足集群服务可用。1

租户名称:虚拟主机所属的租户的名称,根据选择的租户自动带出,不可修改
租户ID:虚拟主机所属租户的租户ID,根据选择的租户自动带出,不可修改
虚拟主机:交换机所属虚拟主机名称,根据选择的虚拟主机自动带出,不可修改
交换机名称:交换机的名称,自定义,唯一标识
路由类型:单选框,交换机分发消息时的不同分发策略(匹配规则),目前共三种类型:direct、fanout、topic,可修改
描述:交换机的描述信息
# 9.3.3.2 编辑交换机信息
进入【插件管理】->【RabbitMQ管理】,选择"交换机"页签,选择要编辑的交换机名称后的【编辑】按钮,可编辑交换机信息。注意:编辑交换机前,需满足集群服务可用。

租户名称:虚拟主机所属租户名称,根据选择的租户自动带出,不可修改
租户ID:虚拟主机所属租户的租户ID,根据选择的租户自动带出,不可修改
虚拟主机:交换机所属虚拟主机名称,根据选择的虚拟主机自动带出,不可修改
交换机名称:交换机的名称,置灰显示,不可修改
路由类型:新建交换机的时候选择的路由类型,置灰显示,不可修改
描述:交换机的描述信息
# 9.3.3.3 删除交换机信息
进入【插件管理】->【RabbitMQ管理】,选择"交换机"页签,选择要删除的交换机名称后的【删除】按钮,在弹出的确认删除弹框中点击【确定】按钮,可删除交换机信息。注意:删除交换机前,需满足集群服务可用。

# 9.3.3.4 查询交换机
进入【插件管理】->【RabbitMQ管理】,选择"交换机"页签,右上角选择集群名称、租户名称、虚拟主机名称、输入交换机名称,可查询交换机信息。

# 9.3.3.5 查看交换机详情
进入【插件管理】->【RabbitMQ管理】,选择”交换机“ 页签,选择需要查看详细信息的交换机,点击交换机,可查看交换机详细信息。


# 9.3.3.6 发送消息
进入【插件管理】->【RabbitMQ管理】,选择"交换机"页签,选择要发送消息的交换机名称后的【发送消息】按钮,可发送消息,发送的消息,可在消息查询中查到。

交换机:发送消息的交换机名称,根据选择的交换机名称自动带出,不可修改
消息Key:消息的key,自定义
消息属性:消息的属性,例如:key=value,多个属性用英文逗号隔开
消息属性:发送的消息内容,自定义
# 9.3.4 队列
队列用来存储生产者发送的消息,生产者并不会直接将消息发送到队列上,而是将消息发送到交换机,由交换机按照一定规则转发给指定队列。队列用于新增队列、编辑队列、删除队列、查询队列、查看队列详细信息、清空队列

# 9.3.4.1 新增队列信息
进入【插件管理】->【RabbitMQ管理】,选择 ”队列“ 页签,点击左上角【新增】按钮,可新增队列信息。注意:新增订阅组前,需满足集群服务可用。

租户名称:虚拟主机所属的租户名称,根据选择的租户自动带出,不可修改
租户ID:虚拟机所属租户的租户ID,根据选择的租户自动带出,不可修改
虚拟主机:队列所属虚拟主机名称,根据选择的虚拟主机自动带出,不可修改
队列名称:队列的名称,自定义,唯一标识
描述:队列的描述信息
# 9.3.4.2 编辑队列信息
进入【插件管理】->【RabbitMQ管理】,选择 ”队列“ 页签,选择要编辑的队列名称后的【编辑】按钮,可编辑订阅组信息。注意:编辑订阅组前,需满足集群服务可用。

租户名称:虚拟主机所属的租户名称,根据选择的租户自动带出,不可修改
租户ID:虚拟主机所属租户的租户ID,根据选择的租户自动带出,不可修改
虚拟主机:队列所属虚拟主机名称,根据选择的虚拟主机自动带出,不可修改
队列名称:队列的名称,置灰显示,不可修改
描述:队列的描述信息
# 9.3.4.3 删除队列信息
进入【插件管理】->【RabbitMQ管理】,选择 ”队列“ 页签,选择要删除的队列名称后的【删除】按钮,可删除对列信息。注意:删除队列前,需满足集群服务可用。

# 9.3.4.4 查询队列
进入【插件管理】->【RabbitMQ管理】,选择 ”队列“ 页签,右上角选择集群名称、租户名称、虚拟主机名称、输入队列名称,可查询队列信息。

# 9.3.4.5 查看队列详情
进入【插件管理】->【RabbitMQ管理】,选择 ”队列“ 页签,选择需要查看详细信息的队列,点击队列名称,可查看队列详细信息,如基本信息,消费客户端、消息消费监控


# 9.3.4.6 清空队列信息
进入【插件管理】->【RabbitMQ管理】,选择 ”队列“ 页签,选择要清空数据的队列名称后的【清空】按钮,在弹出的确认清空弹框点击【确定】按钮,可清空存储在队列上的消息。注意:清空队列数据前,需满足集群服务可用。

# 9.3.5 路由
路由表示交换机和队列之间的绑定关系,每一个绑定关系会存在一个或者多个路由key,新增一个路由相当于在交换机中建立了一个路由关系表,生产者发送消息的时候,需要声明一个路由key,交换机拿到路由key之后,和路由中的路由key进行匹配,而匹配的规则是通过交换机类型来决定的。 路由用来新增路由信息、编辑路由信息、删除路由信息、查询路由信息、查看路由详细信息、重新绑定

# 9.3.5.1 新增路由信息
进入【插件管理】->【RabbitMQ管理】,选择 ”路由“ 页签,点击左上角【新增】按钮,可新增路由信息。注意:新增路由前,需满足集群服务可用。

租户名称:虚拟主机所属的租户名称,根据选择的租户自动带出,不可修改
租户ID:虚拟主机所属租户的租户ID,根据选择的租户自动带出,不可修改
虚拟主机:队列所属虚拟主机名称,根据选择的虚拟主机自动带出,不可修改
路由名称:路由的名称,自定义,唯一标识
源交换机:路由绑定的源交换机,下拉框,可选择
目标类型:路由绑定的目标类型,单选框,可选队列或者交换机
目标:路由绑定的目标,下拉框,可选择
路由key:路由的key,自定义,可一个或多个,多个之间以逗号隔开
描述:路由的描述信息
# 9.3.5.2 编辑路由信息
进入【插件管理】->【RabbitMQ管理】,选择 ”路由“ 页签,选择要编辑的路由名称后的【编辑】按钮,可编辑路由信息。注意:编辑路由前,需满足集群服务可用。

租户名称:虚拟主机所属的租户名称,根据选择的租户自动带出,不可修改
租户ID:虚拟主机所属租户的租户ID,根据选择的租户自动带出,不可修改
虚拟主机:队列所属虚拟主机名称,根据选择的虚拟主机自动带出,不可修改
路由名称:路由的名称,置灰显示,不可修改
源交换机:路由绑定的源交换机,置灰显示,不可修改
目标类型:路由绑定的目标类型,置灰显示,不可修改
目标:路由绑定的目标,置灰显示,不可修改
路由key:路由的key,置灰显示,不可修改
描述:路由的描述信息,可修改
# 9.3.5.3 删除路由信息
进入【插件管理】->【RabbitMQ管理】,选择 ”路由“ 页签,选择要删除的路由名称后的【删除】按钮,可删除路由信息。注意:删除路由前,需满足集群服务可用。

# 9.3.5.4 查询路由
进入【插件管理】->【RabbitMQ管理】,选择 ”路由“ 页签,右上角选择集群名称、租户名称、虚拟主机名称、交换机名称,输入路由名称或路由key,可查询路由信息。

# 9.3.5.5 查看路由详情
进入【插件管理】->【RabbitMQ管理】,选择 ”路由“ 页签,选择需要查看详细信息的路由,点击路由名称,可查看路由详细信息,如基本信息。


# 9.3.5.6 重新绑定
进入【插件管理】->【RabbitMQ管理】,选择 ”路由“ 页签,选择要重新绑定的路由名称后的【重新绑定】按钮后,页面弹出 "重新绑定路由成功!" 的提示,表示重新绑定成功。注意:重新绑定路由前,需满足集群服务可用。

# 第10章 监控运维
备注:若想使用AMP监控运维,必须使用AUC登录界面跳转到ADMQ管理控制台后再创建集群,创建集群具体步骤见第3章。
ADMQ集成AUC(金蝶Apusic统一管控平台)和AMP(金蝶天燕智能运维系统),通过AUC统一登录,使用AMP作为ADMQ的监控运维工具。
ADMQ的核心组件Broker、Bookkeeper和zookeeper都内置了Prometheus 接口,提供了Topic使用相关的指标,也暴露出了集群中各个组件的整体健康状况。这些相关指标可以直接使用 HTTP 方式获取,并支持接入到AMP中,进行指标的统计汇总,以及针对指标的监控和报警。
# 10.1 管控台集成AUC和AMP
# 10.1.1 AUC登录
在管控台config目录下application.properties配置文件中添加参数:
# AUC服务地址;如果使用AUC登录,则需要把配置项security.same-site.enabled设置成false
auc.url=http://172.24.4.170:9000
# AMP服务地址
amp.url=http://172.24.4.170:9002
2
3
4
重新启动admq-manager服务:
bin/admq-manager stop
bin/admq-manager start
2
浏览器输入URL:http://172.24.4.170:9000/,进入AUC登录界面:
.61eda164.png)
输入用户名、密码、验证码,进入AUC管理界面:
.29a45268.png)
# 10.1.2 注册ADMQ服务并跳转到ADMQ界面
选择【admin】->【平台管理】,会跳转到平台管理页面。
.1cc826aa.png)
.b8b070c9.png)
在平台管理页面,选择【服务管理】->类目选择【中间件】,点击”注册服务“按钮,弹出注册页面,填入配置项值,点击完成按钮,进行注册ADMQ服务。
.311580ab.png)
在平台管理页面,选择【服务管理】->类目选择【中间件】,右上角输入注册的服务名称,可查到已注册的具体服务:
.76c51cb5.png)
在平台管理页面,选择【产品】->【中间件】,找到注册的服务,放置到页面顶栏:
.4c322ddb.png)
在顶栏点击服务名称,可跳转到ADMQ管控台界面:
.f33e9efa.png)
.43b01249.png)
备注:若点击服务名称,跳转时出现如下界面:
.a507dca2.png)
需在浏览器上访问管控台URL:https://172.24.4.126:12306/,进入管控台登录界面(不用输入用户名、密码),然后重新点击服务名称,即可成功跳转到ADMQ管控台管理界面。
# 10.1.3 节点跳转到监控页面
通过AUC跳转到ADMQ管控台管理界面后,在集群列表选择需要监控的集群,点击节点按钮,进入到节点页面,选择需要监控的节点,【更多】里停止节点,然后启动节点:
.35d4ac10.png)
.92bc58ab.png)
然后对停止、启动后的节点,点击监控按钮,出现“节点监控”弹窗,点击“查看更多指标详情”按钮,即可跳转到AMP的节点监控界面,实时查看节点资源使用情况。
.4c48a723.png)
.dcdd05b5.png)
.4c88d913.png)
# 10.1.4 各个组件监控界面
zookeeper节点监控界面:
.4c88d913.png)
storage节点监控界面:
.699ab0a6.png)
broker节点监控界面:
.b32c21b0.png)
# 10.2 监控指标
# 10.2.1 zookeeper监控指标
zookeeper 通过 Prometheus 接口提供了多个统计指标,AMP内置的模版对这些指标进行了汇总计算:
| 指标表达式 | 指标描述 |
|---|---|
| zk_up | 设施状态(运行:1,停止:0) |
| zk_server_leader | 是否是leader(1是0否) |
| zk_ephemerals_count | zk的临时节点数目(个) |
| zk_znode_count | zk的znode数量(个) |
| zk_watch_count | zk的watch数目(个) |
| zk_packets_sent | zk 发送的数据包速率(个/s) |
| zk_packets_received | zk 接收的数据包速率(个/s) |
| zk_num_alive_connections | 当前连接数 (个) |
| zk_open_file_descriptor_count | 打开文件描述符数 (个) |
| zk_max_file_descriptor_count | 最大文件描述符数 (个) |
| zk_avg_latency | zk 处理平均延迟(ms) |
| zk_min_latency | zk 处理最小时延(ms) |
| zk_max_latency | zk 处理最大时延(ms) |
| zk_outstanding_requests | 排队请求数(个) |
| zk_approximate_data_size | zk存储数据量(bytes) |
# 10.2.2 storage监控指标
storage通过 Prometheus 接口提供了多个统计指标,AMP内置的模版对这些指标进行了汇总计算:
| 指标表达式 | 指标描述 |
|---|---|
| count(bookie_SERVER_STATUS == 1) | 可写入Bookies数量 (个) |
| count(bookie_SERVER_STATUS == 0) | 只读的Bookies数量 (个) |
| sum(irate(bookie_READ_BYTES[30s])) | 读取字节数增长率 (MB/s) |
| sum(irate(bookie_WRITE_BYTES[30s])) | 写入字节数增长率 (MB/s) |
| avg(bookkeeper_server_ADD_ENTRY_REQUEST{success="true", quantile="0.99"}) | 添加成功平均请求数 (个) |
| avg(bookkeeper_server_READ_ENTRY_REQUEST{success="true", quantile="0.99"}) | 读取成功平均请求数 (个) |
| sum(irate(bookkeeper_server_ADD_ENTRY_count{success="true"}[30s])) | 添加entry成功增长率 (个/s) |
| sum(irate(bookkeeper_server_READ_ENTRY_count{success="true"}[30s])) | 读取entry成功增长率 (个/s) |
| sum(irate(bookie_journal_JOURNAL_SYNC_count{success="true"}[30s])) | 日志fsync操作次数的总增长率 (次/s) |
| sum(bookie_journal_JOURNAL_QUEUE_SIZE) | 日志队列中待处理的请求总数 (个) |
| sum(bookie_journal_JOURNAL_FORCE_WRITE_QUEUE_SIZE) | 强制写入队列中等待的请求总数 (个) |
| sum(bookie_journal_JOURNAL_CB_QUEUE_SIZE) | 回调队列中待处理的回调总数 (个) |
| sum(bookie_ledgers_count) | bookie中存储的ledger总数 (个) |
| sum(bookie_entries_count) | bookie中存储的条目总数 (个) |
| sum(bookie_read_cache_size) | bookie读取缓存大小 (byte) |
| sum(bookie_write_cache_size) | bookie写入缓存大小 (byte) |
| sum(bookie_DELETED_LEDGER_COUNT) | bookie启动以来被删除的ledger总数 (个) |
| sum(bookie_ledger_writable_dirs) | bookie中可写入目录的数量 (个) |
# 10.2.3 broker监控指标
Broker通过 Prometheus 接口提供了多个统计指标,AMP内置的模版对这些指标进行了汇总计算:
| 指标表达式 | 指标描述 |
|---|---|
| amp_pulsar_server_status | 设施状态 |
| amp_pulsar_namespace_count | 命名空间数 (个) |
| amp_pulsar_topics_count | 主题数 (个) |
| amp_pulsar_io_inRate | 消息接收频率 (消息/秒) |
| amp_pulsar_io_outRate | 消息发出频率 (消息/秒) |
| amp_pulsar_throughput_inSize | 流入吞吐量 (字节/秒) |
| amp_pulsar_throughput_outSize | 流出吞吐量 (字节/秒) |
| amp_pulsar_storage_size | 所有消息占用的总存储空间大小 |
| amp_pulsar_entrySizeLeOverflow_Count | 大小超1M的条目数 |
| amp_pulsar_storage_writeRate | 存储写频率 (条目/秒) |
| amp_pulsar_storage_readRate | 存储读频率 (条目/秒) |
| amp_pulsar_cpu_usage | cpu利用率 (%) |
| amp_pulsar_jvm_memoryCommittedSize | jvm已提交内存大小 (MB) |
| amp_pulsar_jvm_memoryUsedSize | jvm已使用内存 (MB) |
| amp_pulsar_jvm_memoryMaxSize | jvm最大内存 (MB) |
| amp_pulsar_jvm_memoryUsage | jvm内存使用率 (%) |
# 10.3 AMP配置告警策略
在设施监控界面点击具体的设施名称或者“查看”进入具体的一个设施,切换到“告警策略”页签,可以对该设施进行添加管理告警策略,可以查看该设施的目前创建的告警策略,该位置维护该设施的报警策略,也可以在“告警”模块下的“告警策略”下创建管理告警策略。
【监控】->【设施监控】->【中间件】,进入到设施监控列表:
.fc2a1749.png)
# 10.3.1 配置zookeeper告警策略
在设施监控列表点击集群某个zookeeper节点名称,进入到zookeeper的监控详情页面:
.53c6bce2.png)
.a6aacf96.png)
在监控详情页面,切换到”告警策略“页签:
.abfc7e1d.png)
点击“添加”按钮后创建zookeeper告警策略:
.7ad0235d.png)
.957b12aa.png)
AMP内置的Zookeeper告警模版已经包含了4条告警策略:
| 策略名称 | 监控项指标 | 告警触发条件 |
|---|---|---|
| 设施状态 | 设施状态 | 0s内,设施状态不等于0时,触发警告报警 |
| 当前连接数 | 当前连接数 | 0s内,当前连接数大于5000个时,触发警告报警 |
| zk处理平均延迟 | zk处理平均延迟 | 0s内,zk处理平均延迟大于5000ms时,触发警告报警 |
| 排队请求数 | 排队请求数 | 0s内,排队请求数大于1000个时,触发警告报警 |
用户也可以根据zookeeper的监控指标项添加自定义告警策略:
.efb9112c.png)
# 10.3.2 配置storage告警策略
在设施监控列表点击集群某个zookeeper节点名称,进入到zookeeper的监控详情页面:
.77adc70a.png)
.8744b030.png)
在监控详情页面,切换到”告警策略“页签:
.abfc7e1d.png)
点击“添加”按钮后创建storage告警策略:
.569ef13c.png)
.247bf7e8.png)
AMP内置的bookkeeper告警模版已经包含了3条告警策略:
| 策略名称 | 监控项指标 | 告警触发条件 |
|---|---|---|
| 可写入Bookies数量 | 可写入Bookies数量 | 0s内,可写入Bookies数量小于1个时,触发严重报警 |
| 只读的Bookie数量 | 只读的Bookie数量 | 0s内,只读的Bookies数量小于1个时,触发严重报警 |
| bookie中可写入目录的数量 | bookie中可写入目录的数量 | 0s内,bookie中可写入目录的数量小于1个时,触发严重报警 |
用户也可以根据bookkeeper的监控指标项添加自定义告警策略:
.407d3162.png)
# 10.3.3 配置broker告警策略
在设施监控列表点击集群某个zookeeper节点名称,进入到zookeeper的监控详情页面:
.d4a5943e.png)
.6a8030e0.png)
在监控详情页面,切换到”告警策略“页签:
.abfc7e1d.png)
点击“添加”按钮后创建broker告警策略:
.53d4cf03.png)
.c6ce8db2.png)
AMP内置的broker告警模版已经包含了8条告警策略:
| 策略名称 | 监控项指标 | 告警触发条件 |
|---|---|---|
| 设施状态策略 | 设施状态 | 2m内,设施状态不等于1时,触发灾难报警 |
| cpu使用率策略 | cpu利用率 | 2m内,cpu利用率大于80%时,触发警告报警 |
| jvm内存使用率 | jvm内存使用率 | 2m内,jvm内存使用率大于80%时,触发警告报警 |
| 主题数策略 | 主题数 | 2m内,主题数大于1000个时,触发警告报警 |
| 命名空间数策略 | 命名空间数 | 2m内,命名空间数大于20个时,触发警告报警 |
| 超1M的条目数策略 | 超1M的条目数 | 2m内,超1M的条目数大于10000个时,触发警告报警 |
| 流入吞吐量策略 | 流入吞吐量 | 2m内,流入吞吐量大于300字节/秒时,触发警告报警 |
| 流出吞吐量策略 | 流出吞吐量 | 2m内,流出吞吐量大于300字节/秒时,触发警告报警 |
用户也可以根据broker的监控指标项添加自定义告警策略:
.c38f581f.png)
# 10.4 AMP监控指标展示
# 10.4.1 按核心组件维度展示
进入设施监控界面,可以看到设施监控的列表信息,包括设施名称,设施类型,实例地址,监控模板,运行状态,采集器状态,可以进行信息编辑、监控配置、查看、删除的操作。
设施名称:创建基础设施输入的设施名称。
设施类型:创建基础设施选择的设施类型。
实例地址:监控对象的ip地址。
监控模板:设施配置任务时选择的监控模板。
运行状态:设施实例运行的状态,全部运行正常显示正常,存在运行异常的显示异常,没有配置监控,状态显示未知。
采集器状态:采集器运行的状态,全部运行正常显示正常,存在运行异常的显示异常,没有配置监控,状态显示提示未配置。
告警策略:设施创建的报警策略的数量。
信息编辑:进入修改基础设施界面,修改设施。
监控配置:进入修改监控配置界面,修改监控配置的设置。
删除:点击删除标签并确认,可以删除当前的基础设施和监控任务。
.f39c15e5.png)
在设施监控的列表界面可以看到ADMQ核心组件的监控项:Linux主机、zookeeper、bookkeeper和broker。点击具体的设施名称或者“查看”进入具体的一个设施,选择“设施详情”页签,可以查看设施信息,监控任务信息,监控项信息。
.6a8030e0.png)
切换到“监控图表”页签,可以查看该设施的基本信息和监控静态数据和动态图表数据,监控基本信息信息包括ip地址,设施类型,状态等信息。监控图表是对监控指标的可视化展示:
.74ab5f2f.png)
# 10.4.2 按业务视图维度展示
业务监控是为用户提供的可以对用户所使用的平台,平台下的所有应用的管理及监控功能,帮助用户全面了解自己应用平台下的应用关系,应用所关联的设施状态等信息,有助于用户快速了解业务及应用的监控状态,在发生告警信息后,能快速进行处理。
# 10.4.2.1 创建ADMQ监控业务
进入到【监控】下的【业务监控】,点击“创建业务”按钮,弹出“创建业务”窗口,输入业务的名称和业务内容,点击完成按钮,完成业务创建操作。
.cfe0b3f9.png)
# 10.4.2.1 创建ADMQ应用
在业务监控的概览页面,找到我们刚刚创建的ADMQ,点击“业务配置”进入业务配置页面。鼠标移动到业务列表下的ADMQ,点击出现的“+”,添加应用。
.49c44955.png)
.b375585a.png)
.d2c8cd8c.png)
# 10.4.2.1 为ADMQ应用绑定设施
选择刚刚创建的ADMQ应用,点击“绑定设施”按钮,选择ADMQ对应的Linux节点和zookeeper、bookkeeper和broker组件:
.14f47a0f.png)
.26ec2386.png)
# 10.4.2.1 查看ADMQ监控业务
完成ADMQ应用的设施绑定后,回到业务监控概览页面,点击ADMQ的“业务详情”,进入详情页面,在这里可以查看ADMQ集群的整体健康状况:
.b279e107.png)
.6537c573.png)
选择其中一个组件,可以查看该组件的“监控详情”:
.3adad202.png)
# 10.4.3 自定义仪表盘
AMP提供了自定义仪表盘的功能,可以将多个监控设施的多个指标放在一个仪表盘中,让用户方便查看,所有的关键指标一目了然,可以作为监控大屏使用。
在仪表盘概览页面,点击“创建仪表盘”按钮,输入仪表盘名称后完成创建。
.4329cbbb.png)
.72d8c523.png)
点击仪表盘的“查看详情”按钮,进入到仪表盘详情页面:
.621ed2f4.png)
点击“添加监控图表”,设置要添加的图表。
.53fa34a1.png)
.1690e8d4.png)
在图表模版中,可以选择ADMQ的组件zookeeper、Bookkeeper和Pulsar,然后选择关注的指标。例如对于zookeeper,我们关注连接数、处理平均延迟和排队请求数,可以分别添加这几个指标的图表。对于核心组件要在仪表盘展示的指标,一般都是告警策略关注的指标。完成添加后,这些关键指标将在一个仪表盘中统一展示。
.a0a49d8d.png)