插件说明
# 概述
ADMQ通过开启插件支持用户使用原生RocketMQ客户端、RabbitMQ客户端、Kafka客户端接收和发送消息,下面记录使用这些客户端时的功能支持情况。 插件操作请看用户手册
# RocketMQ客户端
RocketMQ是阿里开源的消息队列,详细介绍可参考:https://help.aliyun.com/product/29530.html
| 功能 | 是否支持 | 说明 |
|---|---|---|
| 普通消息 | 是 | |
| 异步消息 | 是 | |
| 顺序消息 | 是 | |
| 消息过滤 | 否 | |
| 死信队列 | 是 | |
| 消息重试 | 是 | |
| 广播消息 | 是 | |
| 批量消息 | 是 | |
| 事务消息 | 否 | |
| 定时消息 | 是 | |
| 延迟消息 | 是 | |
| ACL鉴权 | 是 |
# 普通消息
可以使用rocketmq客户端来创建消息的生产者、消费者,并进行普通消息的生产与消费。普通消息是指无特性的消息,区别于有特性的定时和延迟消息、顺序消息。
# 代码示例
# 异步消息
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。rocketmq异步发送,需要您实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
# 代码示例
# 顺序消息
顺序消息是RocketMQ提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。 现在只支持分区顺序消息,对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
# 代码示例
# 死信队列
当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,RocketMQ不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
# 代码示例
# 消息重试
RocketMQ消息收发过程中,若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。
- 重试间隔:消息消费失败后再次被消息队列RocketMQ版投递给Consumer消费的间隔时间。
- 最大重试次数:消息消费失败后,可被消息队列RocketMQ版重复投递的最大次数。
# 代码示例
# 广播消息
当使用广播消费模式时,RocketMQ会将每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。
# 代码示例
# 批量消息
如需提高消息的处理效率,或降低下游资源的API调用频率,可以使用批量发送消息。
# 代码示例
# 事务消息
消息队列RocketMQ版分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。
事务消息:消息队列RocketMQ版提供类似XA或Open XA的分布式事务功能,通过消息队列RocketMQ版事务消息能达到分布式事务的最终一致。
半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了消息队列RocketMQ版服务端,但是消息队列RocketMQ版服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
# 流程介绍

事务消息发送步骤如下:
- 生产者将半事务消息发送至消息队列RocketMQ版服务端。
- 消息队列RocketMQ版服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。
- 生产者开始执行本地事务逻辑。
- 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
- 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
- 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
事务消息回查步骤如下:
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
# 生产消息规则
事务消息发送完成本地事务后,可在execute方法中返回以下三种状态:
TransactionStatus.CommitTransaction:提交事务,允许消费者消费该消息。TransactionStatus.RollbackTransaction:回滚事务,消息将被丢弃不允许消费。TransactionStatus.Unknow:暂时无法判断状态,等待固定时间以后消息队列RocketMQ版服务端根据回查规则向生产者进行消息回查。通过
ONSFactory.createTransactionProducer创建事务消息的Producer时必须指定LocalTransactionChecker的实现类,处理异常情况下事务消息的回查。回查规则:本地事务执行完成后,若服务端收到的本地事务返回状态为TransactionStatus.Unknow,或生产者应用退出导致本地事务未提交任何状态。则服务端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。
回查间隔时间:系统默认每隔30秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。
第一次消息回查最快时间:该参数支持自定义设置。若指定消息未达到设置的最快回查时间前,系统默认每隔30秒一次的回查任务不会检查该消息。
以Java为例,以下设置表示:第一次回查的最快时间为60秒。
Message message = new Message(); message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"60");1
2
# 消费消息规则
事务消息的Group ID不能与其他类型消息的Group ID共用。与其他类型的消息不同,事务消息有回查机制,回查时消息队列RocketMQ版服务端会根据Group ID去查询生产者客户端。
# 代码示例
# 定时消息
Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。
# 代码示例
# 延迟消息
Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
# 代码示例
# RabbitMQ API
客户端API公开了AMQP 0-9-1协议模型中的关键实体,并提供了额外的抽象以方便使用。
RabbitMQ Java客户端使用com.rabbitmq.client作为顶层包。关键类和接口是:
Java客户端
Connection:表示AMQP 0-9-1连接。
Channel:表示一个AMQP 0-9-1通道,提供了大部分的操作(协议方法)。
Consumer:表示消息消费者。
Connection用于打开通道,注册连接生命周期事件处理程序,并关闭不再需要的连接。连接通过ConnectionFactory实例化,这是您配置各种连接设置的方式,例如vhost或用户名。协议操作通过通道接口进行。
JMS客户端
用于RabbitMQ的JMS客户端在RabbitMQ Java客户端之上实现了JMS规范,从而允许新的和现有的JMS应用程序连接到RabbitMQ协议处理器。
该库支持2.7.0版的JMS 1.1和2.0。插件和JMS客户机应该一起工作和使用。
Connection Factory:ConnectionFactory对象封装了一组由管理员定义的连接配置参数。客户机使用它创建与JMS提供程序的连接。
ConnectionFactory对象是一个JMS管理对象,支持并发使用。
Connection:Connection对象是客户机到其JMS提供程序的活动连接。它通常在Java虚拟机(JVM)之外分配提供者资源。
连接支持并发使用。
Session:Session对象是用于生产和消费消息的单线程上下文。尽管它可以在Java虚拟机(JVM)之外分配提供者资源,但它被认为是轻量级JMS对象。
Message:Message接口是所有JMS消息的根接口。它定义了用于所有消息的消息头和确认方法。
Message Producer:客户端使用MessageProducer对象将消息发送到目的地。通过将Destination对象传递给会话提供的消息生成器创建方法,可以创建MessageProducer对象。
MessageProducer是所有消息生产者的父接口。
Message Consumer:客户端使用MessageConsumer对象从目的地接收消息。通过将Destination对象传递给会话提供的消息使用者创建方法,可以创建MessageConsumer对象。
MessageConsumer是所有消息使用者的父接口。
Destination:Destination对象封装特定于提供程序的地址。
流Java客户端
RabbitMQ Stream Java Client是一个与RabbitMQ Stream Plugin通信的Java库。它允许创建和删除流,以及向这些流发布和从这些流消费。
管理 HTTP API
RabbitMQ管理插件提供了一个基于HTTP的API,用于管理和监控RabbitMQ节点和集群。
由于目前生产环境大多数使用的是RabbitMQ为3.10.0及以上版本,3.11.0及以上版本生产环境使用并不多,下面API支持情况以RabbitMQ Java Client最稳定的版本5.16.0进行说明。
# Java客户端
# Connection API
API的核心类是Connection,表示AMQP 0-9-1连接。
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
| void | abort() Abort this connection and all its channels with the AMQP.REPLY_SUCCESS close code and message 'OK'. | 支持 |
| void | abort(int timeout) Abort this connection and all its channels with the [ AMQP.REPLY_SUCCESS close code and message 'OK'. | 支持 |
| void | abort(int closeCode, String closeMessage) Abort this connection and all its channels. | 支持 |
| void | abort(int closeCode, String closeMessage, int timeout) Abort this connection and all its channels. | 支持 |
| BlockedListener | addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback) Add a lambda-based BlockedListener. | 支持 |
| void | addBlockedListener(BlockedListener listener) Add a BlockedListener. | 支持 |
| void | clearBlockedListeners() Remove all BlockedListeners. | 支持 |
| void | close() Close this connection and all its channels with the AMQP.REPLY_SUCCESSclose code and message 'OK'. | 支持 |
| void | close(int timeout) Close this connection and all its channels with the AMQP.REPLY_SUCCESS close code and message 'OK'. | 支持 |
| void | close(int closeCode, String closeMessage) Close this connection and all its channels. | 支持 |
| void | close(int closeCode, String closeMessage, int timeout) Close this connection and all its channels. | 支持 |
| Channel | createChannel() Create a new channel, using an internally allocated channel number. | 支持 |
| Channel | createChannel(int channelNumber) Create a new channel, using the specified channel number if possible. | 支持 |
| InetAddress | getAddress() Retrieve the host. | 支持 |
| int | getChannelMax() Get the negotiated maximum channel number. | 支持 |
| Map<String,Object> | getClientProperties() Get a copy of the map of client properties sent to the server | 支持 |
| String | getClientProvidedName() Returns client-provided connection name, if any. | 支持 |
| ExceptionHandler | getExceptionHandler() Get the exception handler. | 支持 |
| int | getFrameMax() Get the negotiated maximum frame size. | 支持 |
| int | getHeartbeat() Get the negotiated heartbeat interval. | 支持 |
| String | getId() Returns a unique ID for this connection. | 支持 |
| int | getPort() Retrieve the port number. | 支持 |
| Map<String,Object> | getServerProperties() Retrieve the server properties. | 支持 |
| boolean | removeBlockedListener(BlockedListener listener) Remove a BlockedListener. | 支持 |
| void | setId(String id) Sets a unique ID for this connection. | 支持 |
# Channel API
核心API类是Channel,表示AMQP 0-9-1通道。
应该避免在线程之间共享Channel实例。应用程序应该每个线程使用一个通道,而不是跨多个线程共享同一个通道。
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
| void | abort() Abort this channel with the AMQP.REPLY_SUCCESS close code and message 'OK'. | 支持 |
| void | abort(int closeCode, String closeMessage) Abort this channel. | 支持 |
| ConfirmListener | addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback) Add a lambda-based ConfirmListener. | 支持 |
| void | addConfirmListener(ConfirmListener listener) Add a ConfirmListener. | 支持 |
| ReturnListener | addReturnListener(ReturnCallback returnCallback) Add a lambda-based ReturnListener. | 支持 |
| void | addReturnListener(ReturnListener listener) Add a ReturnListener. | 支持 |
| void | basicAck(long deliveryTag, boolean multiple) Acknowledge one or several received messages. | 支持 |
| void | basicCancel(String consumerTag) Cancel a consumer. | 支持 |
| String | basicConsume(String queue, boolean autoAck, Consumer callback) Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag. | 支持 |
| String | basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag. | 支持 |
| String | basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag. | 支持 |
| String | basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag. | 支持 |
| String | basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, Consumer callback) Start a consumer. | 支持 |
| String | basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) Start a consumer. | 支持 |
| String | basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) Start a consumer. | 支持 |
| String | basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) Start a consumer. | 支持 |
| String | basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) Start a non-nolocal, non-exclusive consumer. | 支持 |
| String | basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) Start a non-nolocal, non-exclusive consumer. | 支持 |
| String | basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) Start a non-nolocal, non-exclusive consumer. | 支持 |
| String | basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) Start a non-nolocal, non-exclusive consumer. | 支持 |
| String | basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, Consumer callback) Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag and specified arguments. | 支持 |
| String | basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag and specified arguments. | 支持 |
| String | basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag and specified arguments. | 支持 |
| String | basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag and specified arguments. | 支持 |
| String | basicConsume(String queue, Consumer callback) Start a non-nolocal, non-exclusive consumer, with explicit acknowledgement and a server-generated consumerTag. | 支持 |
| String | basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) Start a non-nolocal, non-exclusive consumer, with explicit acknowledgement and a server-generated consumerTag. | 支持 |
| String | basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) Start a non-nolocal, non-exclusive consumer, with explicit acknowledgement and a server-generated consumerTag. | 支持 |
| String | basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) Start a non-nolocal, non-exclusive consumer, with explicit acknowledgement and a server-generated consumerTag. | 支持 |
| GetResponse | basicGet(String queue, boolean autoAck) Retrieve a message from a queue using AMQP.Basic.Get | 支持 |
| void | basicNack(long deliveryTag, boolean multiple, boolean requeue) Reject one or several received messages. | 支持 |
| void | basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) Publish a message. | 支持 |
| void | basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) Publish a message. | 支持 |
| void | basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) Publish a message. | 支持 |
| void | basicQos(int prefetchCount) Request a specific prefetchCount "quality of service" settings for this channel. | 支持 |
| void | basicQos(int prefetchCount, boolean global) Request a specific prefetchCount "quality of service" settings for this channel. | 支持 |
| void | basicQos(int prefetchSize, int prefetchCount, boolean global) Request specific "quality of service" settings. | 支持 |
| AMQP.Basic.RecoverOk | basicRecover() Ask the broker to resend unacknowledged messages. | 支持 |
| AMQP.Basic.RecoverOk | basicRecover(boolean requeue) Ask the broker to resend unacknowledged messages. | 支持 |
| void | basicReject(long deliveryTag, boolean requeue) Reject a message. | 支持 |
| void | clearConfirmListeners() Remove all ConfirmListeners. | 支持 |
| void | clearReturnListeners() Remove all ReturnListeners. | 支持 |
| void | close() Close this channel with the AMQP.REPLY_SUCCESS close code and message 'OK'. | 支持 |
| void | close(int closeCode, String closeMessage) Close this channel. | 支持 |
| AMQP.Confirm.SelectOk | confirmSelect() Enables publisher acknowledgements on this channel. | 支持 |
| long | consumerCount(String queue) Returns the number of consumers on a queue. | 支持 |
| AMQP.Exchange.BindOk | exchangeBind(String destination, String source, String routingKey) Bind an exchange to an exchange, with no extra arguments. | 支持 |
| AMQP.Exchange.BindOk | exchangeBind(String destination, String source, String routingKey, Map<String,Object> arguments) Bind an exchange to an exchange. | 支持 |
| void | exchangeBindNoWait(String destination, String source, String routingKey, Map<String,Object> arguments) Like exchangeBind(String, String, String, java.util.Map)) but sets nowait parameter to true and returns void (as there will be no response from the server). | 支持 |
| AMQP.Exchange.DeclareOk | exchangeDeclare(String exchange, BuiltinExchangeType type) Actively declare a non-autodelete, non-durable exchange with no extra arguments | 支持 |
| AMQP.Exchange.DeclareOk | exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) Actively declare a non-autodelete exchange with no extra arguments | 支持 |
| AMQP.Exchange.DeclareOk | exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String,Object> arguments) Declare an exchange, via an interface that allows the complete set of arguments. | 支持 |
| AMQP.Exchange.DeclareOk | exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String,Object> arguments) Declare an exchange. | 支持 |
| AMQP.Exchange.DeclareOk | exchangeDeclare(String exchange, String type) Actively declare a non-autodelete, non-durable exchange with no extra arguments | 支持 |
| AMQP.Exchange.DeclareOk | exchangeDeclare(String exchange, String type, boolean durable) Actively declare a non-autodelete exchange with no extra arguments | 支持 |
| AMQP.Exchange.DeclareOk | exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String,Object> arguments) Declare an exchange, via an interface that allows the complete set of arguments. | 支持 |
| AMQP.Exchange.DeclareOk | exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String,Object> arguments) Declare an exchange. | 支持 |
| void | exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String,Object> arguments) Like exchangeDeclare(String, String, boolean, boolean, java.util.Map)) but sets nowait parameter to true and returns nothing (as there will be no response from the server). | 支持 |
| void | exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String,Object> arguments) Like exchangeDeclare(String, String, boolean, boolean, java.util.Map)) but sets nowait parameter to true and returns nothing (as there will be no response from the server). | 支持 |
| AMQP.Exchange.DeclareOk | exchangeDeclarePassive(String name) Declare an exchange passively; that is, check if the named exchange exists. | 支持 |
| AMQP.Exchange.DeleteOk | exchangeDelete(String exchange) Delete an exchange, without regard for whether it is in use or not | 支持 |
| AMQP.Exchange.DeleteOk | exchangeDelete(String exchange, boolean ifUnused) Delete an exchange | 支持 |
| void | exchangeDeleteNoWait(String exchange, boolean ifUnused) Like exchangeDelete(String, boolean) but sets nowait parameter to true and returns void (as there will be no response from the server). | 支持 |
| AMQP.Exchange.UnbindOk | exchangeUnbind(String destination, String source, String routingKey) Unbind an exchange from an exchange, with no extra arguments. | 支持 |
| AMQP.Exchange.UnbindOk | exchangeUnbind(String destination, String source, String routingKey, Map<String,Object> arguments) Unbind an exchange from an exchange. | 支持 |
| void | exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String,Object> arguments) Same as exchangeUnbind(String, String, String, java.util.Map)) but sets no-wait parameter to true and returns nothing (as there will be no response from the server). | 支持 |
| int | getChannelNumber() Retrieve this channel's channel number. | 支持 |
| Connection | getConnection() Retrieve the connection which carries this channel. | 支持 |
| Consumer | getDefaultConsumer() Get the current default consumer. | 支持 |
| long | getNextPublishSeqNo() When in confirm mode, returns the sequence number of the next message to be published. | 支持 |
| long | messageCount(String queue) Returns the number of messages in a queue ready to be delivered to consumers. | 支持 |
| AMQP.Queue.BindOk | queueBind(String queue, String exchange, String routingKey) Bind a queue to an exchange, with no extra arguments. | 支持 |
| AMQP.Queue.BindOk | queueBind(String queue, String exchange, String routingKey, Map<String,Object> arguments) Bind a queue to an exchange. | 支持 |
| void | queueBindNoWait(String queue, String exchange, String routingKey, Map<String,Object> arguments) Same as queueBind(String, String, String, java.util.Map)) but sets nowait parameter to true and returns void (as there will be no response from the server). | 支持 |
| AMQP.Queue.DeclareOk | queueDeclare() Actively declare a server-named exclusive, autodelete, non-durable queue. | 支持 |
| AMQP.Queue.DeclareOk | queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments) Declare a queue | 支持 |
| void | queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments) Like queueDeclare(String, boolean, boolean, boolean, java.util.Map)) but sets nowait flag to true and returns no result (as there will be no response from the server). | 支持 |
| AMQP.Queue.DeclareOk | queueDeclarePassive(String queue) Declare a queue passively; i.e., check if it exists. | 支持 |
| AMQP.Queue.DeleteOk | queueDelete(String queue) Delete a queue, without regard for whether it is in use or has messages on it | 支持 |
| AMQP.Queue.DeleteOk | queueDelete(String queue, boolean ifUnused, boolean ifEmpty) Delete a queue | 支持 |
| void | queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) Like queueDelete(String, boolean, boolean)) but sets nowait parameter to true and returns nothing (as there will be no response from the server). | 支持 |
| AMQP.Queue.PurgeOk | queuePurge(String queue) Purges the contents of the given queue. | 支持 |
| AMQP.Queue.UnbindOk | queueUnbind(String queue, String exchange, String routingKey) Unbinds a queue from an exchange, with no extra arguments. | 支持 |
| AMQP.Queue.UnbindOk | queueUnbind(String queue, String exchange, String routingKey, Map<String,Object> arguments) Unbind a queue from an exchange. | 支持 |
| boolean | removeConfirmListener(ConfirmListener listener) Remove a ConfirmListener. | 支持 |
| boolean | removeReturnListener(ReturnListener listener) Remove a ReturnListener. | 支持 |
| void | setDefaultConsumer(Consumer consumer) Set the current default consumer. | 支持 |
| boolean | waitForConfirms() Wait until all messages published since the last call have been either ack'd or nack'd by the broker. | 支持 |
| boolean | waitForConfirms(long timeout) Wait until all messages published since the last call have been either ack'd or nack'd by the broker; or until timeout elapses. | 支持 |
| void | waitForConfirmsOrDie() Wait until all messages published since the last call have been either ack'd or nack'd by the broker. | 支持 |
| void | waitForConfirmsOrDie(long timeout) Wait until all messages published since the last call have been either ack'd or nack'd by the broker; or until timeout elapses. | 支持 |
提示
由于不支持prefetchSize,调用basicQos(int prefetchSize, int prefetchCount, boolean global)prefetchSize不能传大于0的参数,否则会报"prefetchSize not supported "错误。
# Consumer API
应用程序回调对象通过订阅从队列接收通知和消息的接口。大多数实现都将子类化DefaultConsumer。
此接口的方法在调度线程中调用,该调度线程与连接的线程是分离的。这允许消费者调用通道或连接方法而不会导致死锁。
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
| void | handleCancel(String consumerTag) Called when the consumer is cancelled for reasons other than by a call to Channel.basicCancel(java.lang.String)). | 支持 |
| void | handleCancelOk(String consumerTag) Called when the consumer is cancelled by a call to Channel.basicCancel(java.lang.String)). | 支持 |
| void | handleConsumeOk(String consumerTag) Called when the consumer is registered by a call to any of the Channel.basicConsume(java.lang.String, com.rabbitmq.client.Consumer)) methods. | 支持 |
| void | handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) Called when a **basic.deliver** is received for this consumer. | 支持 |
| void | handleRecoverOk(String consumerTag) Called when a **basic.recover-ok** is received in reply to a **basic.recover**. | 支持 |
| void | handleShutdownSignal(String consumerTag, ShutdownSignalException sig) Called when either the channel or the underlying connection has been shut down. | 支持 |
# JMS客户端
不支持。
# 流Java客户端
不支持。
# 管理HTTP API
不支持。
# Kafka APIS
Kafka包括五个核心api:
Producer API允许应用程序向Kafka集群中的主题发送数据流。
Consumer API允许应用程序从Kafka集群中的主题读取数据流。
Streams API允许将数据流从输入主题转换为输出主题。
Connect API允许实现连接器,这些连接器可以不断地从一些源系统或应用程序拉入Kafka,或从Kafka推入一些接收器系统或应用程序。
Admin API允许管理和检查主题、代理和其他Kafka对象。
由于目前生产环境大多数使用的是Kafka为2.0.0及以上版本,3.0.0及以上版本生产环境使用并不多,下面APIS支持情况以Kafka 2.0.0最稳定的版本2.7.2进行说明。
# Producer API
一个Kafka客户端,它向Kafka集群发布记录。
生产者是线程安全的,跨线程共享一个生产者实例通常比拥有多个实例更快。
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
void | abortTransaction()Aborts the ongoing transaction. | 支持 |
void | beginTransaction()Should be called before the start of each new transaction. | 支持 |
void | close()Close this producer. | 支持 |
void | close(Duration timeout)This method waits up to timeout for the producer to complete the sending of all incomplete requests. | 支持 |
void | commitTransaction()Commits the ongoing transaction. | 支持 |
void | flush()Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records. | 支持 |
void | initTransactions()Needs to be called before any other methods when the transactional.id is set in the configuration. | 支持 |
Map<MetricName,? extends Metric> | metrics()Get the full set of internal metrics maintained by the producer. | 支持 |
List<PartitionInfo> | partitionsFor(String topic)Get the partition metadata for the given topic. | 支持 |
Future<RecordMetadata> | send(ProducerRecord<K,V> record)Asynchronously send a record to a topic. | 支持 |
Future<RecordMetadata> | send(ProducerRecord<K,V> record, Callback callback)Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged. | 支持 |
void | sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata)Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. | 支持 |
void | sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets, String consumerGroupId)Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. | 支持 |
# Consumer API
消费Kafka集群记录的客户端。这个客户端透明地处理Kafka代理的失败,并透明地适应它获取的主题分区在集群内的迁移。此客户机还与代理交互,以允许消费者组使用消费者组来负载平衡消费。
消费者维护必要的代理的TCP连接以获取数据。使用后关闭消费者失败将会释放这些连接。消费者不是线程安全的。
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
void | assign(Collection<TopicPartition> partitions)Manually assign a list of partitions to this consumer. | 支持 |
Set<TopicPartition> | assignment()Get the set of partitions currently assigned to this consumer. | 支持 |
Map<TopicPartition,Long> | beginningOffsets(Collection<TopicPartition> partitions)Get the first offset for the given partitions. | 支持 |
Map<TopicPartition,Long> | beginningOffsets(Collection<TopicPartition> partitions, Duration timeout)Get the first offset for the given partitions. | 支持 |
void | close()Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. | 支持 |
void | close(Duration timeout)Tries to close the consumer cleanly within the specified timeout. | 支持 |
void | close(long timeout, TimeUnit timeUnit)Deprecated. Since 2.0. Use close(Duration) (opens new window) or close() (opens new window). | 支持 |
void | commitAsync()Commit offsets returned on the last poll(Duration) (opens new window) for all the subscribed list of topics and partition. | 支持 |
void | commitAsync(Map<TopicPartition,OffsetAndMetadata> offsets, OffsetCommitCallback callback)Commit the specified offsets for the specified list of topics and partitions to Kafka. | 支持 |
void | commitAsync(OffsetCommitCallback callback)Commit offsets returned on the last poll() (opens new window) for the subscribed list of topics and partitions. | 支持 |
void | commitSync()Commit offsets returned on the last poll() (opens new window) for all the subscribed list of topics and partitions. | 支持 |
void | commitSync(Duration timeout)Commit offsets returned on the last poll() (opens new window) for all the subscribed list of topics and partitions. | 支持 |
void | commitSync(Map<TopicPartition,OffsetAndMetadata> offsets)Commit the specified offsets for the specified list of topics and partitions. | 支持 |
void | commitSync(Map<TopicPartition,OffsetAndMetadata> offsets, Duration timeout)Commit the specified offsets for the specified list of topics and partitions. | 支持 |
Map<TopicPartition,OffsetAndMetadata> | committed(Set<TopicPartition> partitions)Get the last committed offsets for the given partitions (whether the commit happened by this process or another). | 支持 |
Map<TopicPartition,OffsetAndMetadata> | committed(Set<TopicPartition> partitions, Duration timeout)Get the last committed offsets for the given partitions (whether the commit happened by this process or another). | 支持 |
OffsetAndMetadata | committed(TopicPartition partition)Deprecated. since 2.4 Use committed(Set) (opens new window) instead | 支持 |
OffsetAndMetadata | committed(TopicPartition partition, Duration timeout)Deprecated. since 2.4 Use committed(Set, Duration) (opens new window) instead | 支持 |
Map<TopicPartition,Long> | endOffsets(Collection<TopicPartition> partitions)Get the end offsets for the given partitions. | 支持 |
Map<TopicPartition,Long> | endOffsets(Collection<TopicPartition> partitions, Duration timeout)Get the end offsets for the given partitions. | 支持 |
void | enforceRebalance()Alert the consumer to trigger a new rebalance by rejoining the group. | 支持 |
ConsumerGroupMetadata | groupMetadata()Return the current group metadata associated with this consumer. | 支持 |
Map<String,List<PartitionInfo>> | listTopics()Get metadata about partitions for all topics that the user is authorized to view. | 支持 |
Map<String,List<PartitionInfo>> | listTopics(Duration timeout)Get metadata about partitions for all topics that the user is authorized to view. | 支持 |
Map<MetricName,? extends Metric> | metrics()Get the metrics kept by the consumer | 支持 |
Map<TopicPartition,OffsetAndTimestamp> | offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)Look up the offsets for the given partitions by timestamp. | 支持 |
Map<TopicPartition,OffsetAndTimestamp> | offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch, Duration timeout)Look up the offsets for the given partitions by timestamp. | 支持 |
List<PartitionInfo> | partitionsFor(String topic)Get metadata about the partitions for a given topic. | 支持 |
List<PartitionInfo> | partitionsFor(String topic, Duration timeout)Get metadata about the partitions for a given topic. | 支持 |
void | pause(Collection<TopicPartition> partitions)Suspend fetching from the requested partitions. | 支持 |
Set<TopicPartition> | paused()Get the set of partitions that were previously paused by a call to pause(Collection) (opens new window). | 支持 |
ConsumerRecords<K,V> | poll(Duration timeout)Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. | 支持 |
ConsumerRecords<K,V> | poll(long timeoutMs)Deprecated. Since 2.0. Use poll(Duration) (opens new window), which does not block beyond the timeout awaiting partition assignment. See KIP-266 (opens new window) for more information. | 支持 |
long | position(TopicPartition partition)Get the offset of the next record that will be fetched (if a record with that offset exists). | 支持 |
long | position(TopicPartition partition, Duration timeout)Get the offset of the next record that will be fetched (if a record with that offset exists). | 支持 |
void | resume(Collection<TopicPartition> partitions)Resume specified partitions which have been paused with pause(Collection) (opens new window). | 支持 |
void | seek(TopicPartition partition, long offset)Overrides the fetch offsets that the consumer will use on the next poll(timeout) (opens new window). | 支持 |
void | seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)Overrides the fetch offsets that the consumer will use on the next poll(timeout) (opens new window). | 支持 |
void | seekToBeginning(Collection<TopicPartition> partitions)Seek to the first offset for each of the given partitions. | 支持 |
void | seekToEnd(Collection<TopicPartition> partitions)Seek to the last offset for each of the given partitions. | 支持 |
void | subscribe(Collection<String> topics)Subscribe to the given list of topics to get dynamically assigned partitions. | 支持 |
void | subscribe(Collection<String> topics, ConsumerRebalanceListener listener)Subscribe to the given list of topics to get dynamically assigned partitions. | 支持 |
void | subscribe(Pattern pattern)Subscribe to all topics matching specified pattern to get dynamically assigned partitions. | 支持 |
void | subscribe(Pattern pattern, ConsumerRebalanceListener listener)Subscribe to all topics matching specified pattern to get dynamically assigned partitions. | 支持 |
Set<String> | subscription()Get the current subscription. | 支持 |
void | unsubscribe()Unsubscribe from topics currently subscribed with subscribe(Collection) (opens new window) or subscribe(Pattern) (opens new window). | 支持 |
void | wakeup()Wakeup the consumer. | 支持 |
# Streams API
一个Kafka客户端,允许对来自一个或多个输入主题的输入执行连续计算,并将输出发送到零个、一个或多个输出主题。计算逻辑可以通过使用拓扑来定义处理器的DAG拓扑,或者通过使用提供高级DSL来定义转换的StreamsBuilder来指定。
一个KafkaStreams实例可以包含一个或多个在配置中指定的线程处理工作。一个KafkaStreams实例可以作为一个单独的(可能是分布式的)流处理应用程序与具有相同应用程序ID的任何其他实例(无论是在同一进程中,在本机的其他进程上,还是在远程机器上)进行协调。这些实例将根据输入主题分区的分配来划分工作,以便使用所有分区。如果实例被添加或失败,所有(剩余的)实例将在它们之间重新平衡分区分配,以平衡处理负载并确保处理所有输入主题分区。
在内部,KafkaStreams实例包含一个普通的KafkaProducer和KafkaConsumer实例,用于读取输入和写入输出。
Streams API主要包括KSteam(KGroupedStream、TimeWindowedKStream 、SessionWindowedKStream )、KTable等API。
# KStream API
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
<KR> KStream<KR,V> | selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper)Set a new key (with possibly new type) for each input record. | 支持 |
<KR> KStream<KR,V> | selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper, Named named)Set a new key (with possibly new type) for each input record. | 支持 |
<VR> KStream<K,VR> | mapValues(ValueMapper<? super V,? extends VR> mapper)Transform the value of each input record into a new value (with possible new type) of the output record. | 支持 |
<VR> KStream<K,VR> | mapValues(ValueMapper<? super V,? extends VR> mapper, Named named)Transform the value of each input record into a new value (with possible new type) of the output record. | 支持 |
<VR> KStream<K,VR> | mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper)Transform the value of each input record into a new value (with possible new type) of the output record. | 支持 |
<VR> KStream<K,VR> | mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named)Transform the value of each input record into a new value (with possible new type) of the output record. | 支持 |
<VR> KStream<K,VR> | flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VR>> mapper)Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream. | 支持 |
<VR> KStream<K,VR> | flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VR>> mapper, Named named)Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream. | 支持 |
<VR> KStream<K,VR> | flatMapValues(ValueMapperWithKey<? super K,? super V,? extends Iterable<? extends VR>> mapper)Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream. | 支持 |
<VR> KStream<K,VR> | flatMapValues(ValueMapperWithKey<? super K,? super V,? extends Iterable<? extends VR>> mapper, Named named)Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream. | 支持 |
void | foreach(ForeachAction<? super K,? super V> action)Perform an action on each record of KStream. | 支持 |
void | foreach(ForeachAction<? super K,? super V> action, Named named)Perform an action on each record of KStream. | 支持 |
void | to(String topic)Materialize this stream to a topic using default serializers specified in the config and producer's DefaultPartitioner. | 支持 |
void | to(String topic, Produced<K,V> produced)Materialize this stream to a topic using the provided Produced (opens new window) instance. | 支持 |
void | to(TopicNameExtractor<K,V> topicExtractor)Dynamically materialize this stream to topics using default serializers specified in the config and producer's DefaultPartitioner. | 支持 |
void | to(TopicNameExtractor<K,V> topicExtractor, Produced<K,V> produced)Dynamically materialize this stream to topics using the provided Produced (opens new window) instance. | 支持 |
<KR> KGroupedStream<KR,V> | groupBy(KeyValueMapper<? super K,? super V,KR> keySelector)Group the records of this KStream on a new key that is selected using the provided KeyValueMapper (opens new window) and default serializers and deserializers. | 支持 |
<KR> KGroupedStream<KR,V> | groupBy(KeyValueMapper<? super K,? super V,KR> keySelector, Grouped<KR,V> grouped)Group the records of this KStream on a new key that is selected using the provided KeyValueMapper (opens new window) and Serde (opens new window)s as specified by Grouped (opens new window). | 支持 |
<KR> KGroupedStream<KR,V> | groupBy(KeyValueMapper<? super K,? super V,KR> keySelector, Serialized<KR,V> serialized)Deprecated. since 2.1. Use groupBy(KeyValueMapper, Grouped) (opens new window) instead | 支持 |
KGroupedStream<K,V> | groupByKey()Group the records by their current key into a KGroupedStream (opens new window) while preserving the original values and default serializers and deserializers. | 支持 |
KGroupedStream<K,V> | groupByKey(Grouped<K,V> grouped)Group the records by their current key into a KGroupedStream (opens new window) while preserving the original values and using the serializers as defined by Grouped (opens new window). | 支持 |
KGroupedStream<K,V> | groupByKey(Serialized<K,V> serialized)Deprecated. since 2.1. Use groupByKey(Grouped) (opens new window) instead | 支持 |
<GK,GV,RV> KStream<K,RV> | join(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> joiner)Join records of this stream with GlobalKTable (opens new window)'s records using non-windowed inner equi join. | 支持 |
<GK,GV,RV> KStream<K,RV> | join(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> joiner, Named named)Join records of this stream with GlobalKTable (opens new window)'s records using non-windowed inner equi join. | 支持 |
<GK,GV,RV> KStream<K,RV> | leftJoin(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> valueJoiner)Join records of this stream with GlobalKTable (opens new window)'s records using non-windowed left equi join. | 支持 |
<GK,GV,RV> KStream<K,RV> | leftJoin(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> valueJoiner, Named named)Join records of this stream with GlobalKTable (opens new window)'s records using non-windowed left equi join. | 支持 |
<K1,V1> KStream<K1,V1> | transform(TransformerSupplier<? super K,? super V,KeyValue<K1,V1>> transformerSupplier, Named named, String... stateStoreNames)Transform each record of the input stream into zero or one record in the output stream (both key and value type can be altered arbitrarily). | 支持 |
<K1,V1> KStream<K1,V1> | transform(TransformerSupplier<? super K,? super V,KeyValue<K1,V1>> transformerSupplier, String... stateStoreNames)Transform each record of the input stream into zero or one record in the output stream (both key and value type can be altered arbitrarily). | 支持 |
<K1,V1> KStream<K1,V1> | flatTransform(TransformerSupplier<? super K,? super V,Iterable<KeyValue<K1,V1>>> transformerSupplier, Named named, String... stateStoreNames)Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). | 支持 |
<K1,V1> KStream<K1,V1> | flatTransform(TransformerSupplier<? super K,? super V,Iterable<KeyValue<K1,V1>>> transformerSupplier, String... stateStoreNames)Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). | 支持 |
提示
由于KStream API方法比较多,以上列出的方法只包括提供单元测试和自测的,没列出的方法大多数应该也支持,具体待实际应用中验证。
# KGroupedStream API
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
<VR> KTable<K,VR> | aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator)Aggregate the values of records in this stream by the grouped key. | 支持 |
<VR> KTable<K,VR> | aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Aggregate the values of records in this stream by the grouped key. | 支持 |
<VR> KTable<K,VR> | aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Aggregate the values of records in this stream by the grouped key. | 支持 |
KTable<K,Long> | count()Count the number of records in this stream by the grouped key. | 支持 |
KTable<K,Long> | count(Materialized<K,Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Count the number of records in this stream by the grouped key. | 支持 |
KTable<K,Long> | count(Named named)Count the number of records in this stream by the grouped key. | 支持 |
KTable<K,Long> | count(Named named, Materialized<K,Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Count the number of records in this stream by the grouped key. | 支持 |
KTable<K,V> | reduce(Reducer<V> reducer)Combine the values of records in this stream by the grouped key. | 支持 |
KTable<K,V> | reduce(Reducer<V> reducer, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Combine the value of records in this stream by the grouped key. | 支持 |
KTable<K,V> | reduce(Reducer<V> reducer, Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Combine the value of records in this stream by the grouped key. | 支持 |
SessionWindowedKStream<K,V> | windowedBy(SessionWindows windows)Create a new SessionWindowedKStream (opens new window) instance that can be used to perform session windowed aggregations. | 支持 |
TimeWindowedKStream<K,V> | windowedBy(SlidingWindows windows)Create a new TimeWindowedKStream (opens new window) instance that can be used to perform sliding windowed aggregations | 支持 |
<W extends Window> TimeWindowedKStream<K,V> | windowedBy(Windows<W> windows)Create a new TimeWindowedKStream (opens new window) instance that can be used to perform windowed aggregations. | 支持 |
提示
除了
<VOut> CogroupedKStream<K,VOut>cogroup(Aggregator<? super K,? super V,VOut> aggregator)方法暂未验证外,其它方法都支持。
# TimeWindowedKStream API
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
<VR> KTable<Windowed<K>,VR> | aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator)Aggregate the values of records in this stream by the grouped key and defined windows. | 支持 |
<VR> KTable<Windowed<K>,VR> | aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator, Materialized<K,VR,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Aggregate the values of records in this stream by the grouped key and defined windows. | 支持 |
<VR> KTable<Windowed<K>,VR> | aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator, Named named)Aggregate the values of records in this stream by the grouped key and defined windows. | 支持 |
<VR> KTable<Windowed<K>,VR> | aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator, Named named, Materialized<K,VR,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Aggregate the values of records in this stream by the grouped key and defined windows. | 支持 |
KTable<Windowed<K>,Long> | count()Count the number of records in this stream by the grouped key and defined windows. | 支持 |
KTable<Windowed<K>,Long> | count(Materialized<K,Long,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Count the number of records in this stream by the grouped key and defined windows. | 支持 |
KTable<Windowed<K>,Long> | count(Named named)Count the number of records in this stream by the grouped key and defined windows. | 支持 |
KTable<Windowed<K>,Long> | count(Named named, Materialized<K,Long,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Count the number of records in this stream by the grouped key and defined windows. | 支持 |
KTable<Windowed<K>,V> | reduce(Reducer<V> reducer)Combine the values of records in this stream by the grouped key and defined windows. | 支持 |
KTable<Windowed<K>,V> | reduce(Reducer<V> reducer, Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Combine the values of records in this stream by the grouped key and defined windows. | 支持 |
KTable<Windowed<K>,V> | reduce(Reducer<V> reducer, Named named)Combine the values of records in this stream by the grouped key and defined windows. | 支持 |
KTable<Windowed<K>,V> | reduce(Reducer<V> reducer, Named named, Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Combine the values of records in this stream by the grouped key and defined windows. | 支持 |
# SessionWindowedKStream API
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
KTable<Windowed<K>,Long> | count()Count the number of records in this stream by the grouped key and defined sessions. | 支持 |
KTable<Windowed<K>,Long> | count(Materialized<K,Long,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Count the number of records in this stream by the grouped key and defined sessions. | 支持 |
KTable<Windowed<K>,Long> | count(Named named)Count the number of records in this stream by the grouped key and defined sessions. | 支持 |
KTable<Windowed<K>,Long> | count(Named named, Materialized<K,Long,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Count the number of records in this stream by the grouped key and defined sessions. | 支持 |
KTable<Windowed<K>,V> | reduce(Reducer<V> reducer)Combine the values of records in this stream by the grouped key and defined sessions. | 支持 |
KTable<Windowed<K>,V> | reduce(Reducer<V> reducer, Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Combine the values of records in this stream by the grouped key and defined sessions. | 支持 |
KTable<Windowed<K>,V> | reduce(Reducer<V> reducer, Named named)Combine the values of records in this stream by the grouped key and defined sessions. | 支持 |
KTable<Windowed<K>,V> | reduce(Reducer<V> reducer, Named named, Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Combine the values of records in this stream by the grouped key and defined sessions. | 支持 |
提示
除了
<VR> KTable<Windowed<K>,VR>aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator, Merger<? super K,VR> sessionMerger, ...)系列方法暂未验证外(但从TimeWindowedKStream API支持情况推断,aggregate系列方法应该也支持),其它方法都支持。
# KTable API
.
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
<VR> KTable<K,VR> | mapValues(ValueMapper<? super V,? extends VR> mapper)Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. | 支持 |
<VR> KTable<K,VR> | mapValues(ValueMapper<? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde (opens new window), value serde (opens new window), and the underlying materialized state storage (opens new window) configured in the Materialized (opens new window) instance. | 支持 |
<VR> KTable<K,VR> | mapValues(ValueMapper<? super V,? extends VR> mapper, Named named)Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. | 支持 |
<VR> KTable<K,VR> | mapValues(ValueMapper<? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde (opens new window), value serde (opens new window), and the underlying materialized state storage (opens new window) configured in the Materialized (opens new window) instance. | 支持 |
<VR> KTable<K,VR> | mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper)Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. | 支持 |
<VR> KTable<K,VR> | mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde (opens new window), value serde (opens new window), and the underlying materialized state storage (opens new window) configured in the Materialized (opens new window) instance. | 支持 |
<VR> KTable<K,VR> | mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named)Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. | 支持 |
<VR> KTable<K,VR> | mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde (opens new window), value serde (opens new window), and the underlying materialized state storage (opens new window) configured in the Materialized (opens new window) instance. | 支持 |
KStream<K,V> | toStream()Convert this changelog stream to a KStream (opens new window). | 支持 |
<KR> KStream<KR,V> | toStream(KeyValueMapper<? super K,? super V,? extends KR> mapper)Convert this changelog stream to a KStream (opens new window) using the given KeyValueMapper (opens new window) to select the new key. | 支持 |
<KR> KStream<KR,V> | toStream(KeyValueMapper<? super K,? super V,? extends KR> mapper, Named named)Convert this changelog stream to a KStream (opens new window) using the given KeyValueMapper (opens new window) to select the new key. | 支持 |
KStream<K,V> | toStream(Named named)Convert this changelog stream to a KStream (opens new window). | 支持 |
提示
由于KTable API方法比较多,以上列出的方法只包括提供单元测试和自测的,没列出的方法大多数应该也支持,具体待实际应用中验证。
# Connect API
Connect API允许实现连接器,可以不断地从一些源数据系统拉入Kafka,或者从Kafka推入一些汇聚数据系统。
但是,Connect的许多用户不需要直接使用这个API,他们可以使用预先构建的连接器,而不需要编写任何代码。
Connect API主要包括Connector(SourceConnector、SinkConnector)和Task(SourceTask、SinkTask) API。
# Connector API
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
abstract ConfigDef | config()Define the configuration for the connector. | 支持 |
protected ConnectorContext | context()Returns the context object used to interact with the Kafka Connect runtime. | 支持 |
void | initialize(ConnectorContext ctx)Initialize this connector, using the provided ConnectorContext to notify the runtime of input configuration changes. | 支持 |
void | initialize(ConnectorContext ctx, List<Map<String,String>> taskConfigs)Initialize this connector, using the provided ConnectorContext to notify the runtime of input configuration changes and using the provided set of Task configurations. | 支持 |
void | reconfigure(Map<String,String> props)Reconfigure this Connector. | 支持 |
abstract void | start(Map<String,String> props)Start this Connector. | 支持 |
abstract void | stop()Stop this connector. | 支持 |
abstract Class<? extends Task> | taskClass()Returns the Task implementation for this Connector. | 支持 |
abstract List<Map<String,String>> | taskConfigs(int maxTasks)Returns a set of configurations for Tasks based on the current configuration, producing at most count configurations. | 支持 |
Config | validate(Map<String,String> connectorConfigs)Validate the connector configuration values against configuration definitions. | 支持 |
String | version()Get the version of this component. | 支持 |
# SourceConnector API
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
protected SourceConnectorContext | context()Returns the context object used to interact with the Kafka Connect runtime. | 支持 |
# SinkConnector API
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
protected SinkConnectorContext | context()Returns the context object used to interact with the Kafka Connect runtime. | 支持 |
# Task API
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
void | start(Map<String,String> props)Start the Task | 支持 |
void | stop()Stop this task. | 支持 |
String | version()Get the version of this task. | 支持 |
# SourceTask API
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
void | commit()Commit the offsets, up to the offsets that have been returned by poll() (opens new window). | 支持 |
void | commitRecord(SourceRecord record)Deprecated. Use commitRecord(SourceRecord, RecordMetadata) (opens new window) instead. | 支持 |
void | commitRecord(SourceRecord record, RecordMetadata metadata)Commit an individual SourceRecord (opens new window) when the callback from the producer client is received. | 支持 |
void | initialize(SourceTaskContext context)Initialize this SourceTask with the specified context object. | 支持 |
abstract List<SourceRecord> | poll()Poll this source task for new records. | 支持 |
abstract void | start(Map<String,String> props)Start the Task. | 支持 |
abstract void | stop()Signal this SourceTask to stop. | 支持 |
# SinkTask API
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
void | close(Collection<TopicPartition> partitions)The SinkTask use this method to close writers for partitions that are no longer assigned to the SinkTask. | 支持 |
void | flush(Map<TopicPartition,OffsetAndMetadata> currentOffsets)Flush all records that have been put(Collection) (opens new window) for the specified topic-partitions. | 支持 |
void | initialize(SinkTaskContext context)Initialize the context of this task. | 支持 |
void | onPartitionsAssigned(Collection<TopicPartition> partitions)Deprecated. Use open(Collection) (opens new window) for partition initialization. | 支持 |
void | onPartitionsRevoked(Collection<TopicPartition> partitions)Deprecated. Use close(Collection) (opens new window) instead for partition cleanup. | 支持 |
void | open(Collection<TopicPartition> partitions)The SinkTask use this method to create writers for newly assigned partitions in case of partition rebalance. | 支持 |
Map<TopicPartition,OffsetAndMetadata> | preCommit(Map<TopicPartition,OffsetAndMetadata> currentOffsets)Pre-commit hook invoked prior to an offset commit. | 支持 |
abstract void | put(Collection<SinkRecord> records)Put the records in the sink. | 支持 |
abstract void | start(Map<String,String> props)Start the Task. | 支持 |
abstract void | stop()Perform any cleanup to stop this task. | 支持 |
# Admin API
Admin API支持管理和检查主题、代理、访问控制列表和其他Kafka对象。
| 修饰符和类型 | 方法和描述 | 是否支持 |
|---|---|---|
default AlterConfigsResult | alterConfigs(Map<ConfigResource,Config> configs)Deprecated. Since 2.3. Use incrementalAlterConfigs(Map) (opens new window). | 支持 |
AlterConfigsResult | alterConfigs(Map<ConfigResource,Config> configs, AlterConfigsOptions options)Deprecated. Since 2.3. Use incrementalAlterConfigs(Map, AlterConfigsOptions) (opens new window). | 支持 |
default void | close()Close the Admin and release all associated resources. | 支持 |
void | close(Duration timeout)Close the Admin client and release all associated resources. | 支持 |
default void | close(long duration, TimeUnit unit)Deprecated. Since 2.2. Use close(Duration) (opens new window) or close() (opens new window). | 支持 |
static Admin | create(Map<String,Object> conf)Create a new Admin with the given configuration. | 支持 |
static Admin | create(Properties props)Create a new Admin with the given configuration. | 支持 |
default CreatePartitionsResult | createPartitions(Map<String,NewPartitions> newPartitions)Increase the number of partitions of the topics given as the keys of newPartitions according to the corresponding values. | 支持 |
CreatePartitionsResult | createPartitions(Map<String,NewPartitions> newPartitions, CreatePartitionsOptions options)Increase the number of partitions of the topics given as the keys of newPartitions according to the corresponding values. | 支持 |
default CreateTopicsResult | createTopics(Collection<NewTopic> newTopics)Create a batch of new topics with the default options. | 支持 |
CreateTopicsResult | createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options)Create a batch of new topics. | 支持 |
default DeleteRecordsResult | deleteRecords(Map<TopicPartition,RecordsToDelete> recordsToDelete)Delete records whose offset is smaller than the given offset of the corresponding partition. | 支持 |
DeleteRecordsResult | deleteRecords(Map<TopicPartition,RecordsToDelete> recordsToDelete, DeleteRecordsOptions options)Delete records whose offset is smaller than the given offset of the corresponding partition. | 支持 |
default DeleteTopicsResult | deleteTopics(Collection<String> topics)This is a convenience method for deleteTopics(Collection, DeleteTopicsOptions) (opens new window) with default options. | 支持 |
DeleteTopicsResult | deleteTopics(Collection<String> topics, DeleteTopicsOptions options)Delete a batch of topics. | 支持 |
default DescribeClusterResult | describeCluster()Get information about the nodes in the cluster, using the default options. | 支持 |
DescribeClusterResult | describeCluster(DescribeClusterOptions options)Get information about the nodes in the cluster. | 支持 |
default DescribeConfigsResult | describeConfigs(Collection<ConfigResource> resources)Get the configuration for the specified resources with the default options. | 支持 |
DescribeConfigsResult | describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options)Get the configuration for the specified resources. | 支持 |
default DescribeConsumerGroupsResult | describeConsumerGroups(Collection<String> groupIds)Describe some group IDs in the cluster, with the default options. | 支持 |
DescribeConsumerGroupsResult | describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options)Describe some group IDs in the cluster. | 支持 |
default DescribeTopicsResult | describeTopics(Collection<String> topicNames)Describe some topics in the cluster, with the default options. | 支持 |
DescribeTopicsResult | describeTopics(Collection<String> topicNames, DescribeTopicsOptions options)Describe some topics in the cluster. | 支持 |
default AlterConfigsResult | incrementalAlterConfigs(Map<ConfigResource,Collection<AlterConfigOp>> configs)Incrementally updates the configuration for the specified resources with default options. | 不支持 |
AlterConfigsResult | incrementalAlterConfigs(Map<ConfigResource,Collection<AlterConfigOp>> configs, AlterConfigsOptions options)Incrementally update the configuration for the specified resources. | 不支持 |
default ListConsumerGroupOffsetsResult | listConsumerGroupOffsets(String groupId)List the consumer group offsets available in the cluster with the default options. | 支持 |
ListConsumerGroupOffsetsResult | listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)List the consumer group offsets available in the cluster. | 支持 |
default ListTopicsResult | listTopics()List the topics available in the cluster with the default options. | 支持 |
ListTopicsResult | listTopics(ListTopicsOptions options)List the topics available in the cluster. | 支持 |
Map<MetricName,? extends Metric> | metrics()Get the metrics kept by the adminClient | 支持 |
提示
由于Admin API方法比较多,以上列出的方法只包括提供单元测试和自测的,没列出的方法大多数应该也支持,具体待实际应用中验证。