Apusic文档中心
首页
  • 应用服务器 AAS
  • 负载均衡器 ALB
  • 分布式消息队列 ADMQ
  • 分布式缓存 AMDC
  • 分布式配置中心 ADCC
  • Java开发工具包软件 AJDK
  • 搜索引擎 ASE
  • 中间件云平台 ACP
  • 统一管理平台 AUMP
  • 云原生中间件管理 ACMP
  • DevOps平台 ADOP
  • 许可授权中心 ACLS
  • Copilot智能问答系统 ACS
  • 监控平台 AMP
  • 智能日志 AILP
  • 应用性能管理 AAPM
  • 智能告警 AAlarm
  • 主数据管理 AMDM
  • 数据交换平台 ADXP
  • 企业服务总线 AESB
  • 数据智脑 ADPR
  • 服务治理 ASGP
  • 统一身份管理 AIDM
  • 标准模板
  • Markdown教程 (opens new window)
  • VuePress官方社区 (opens new window)
  • 帮助
贡献文档 (opens new window)
首页
  • 应用服务器 AAS
  • 负载均衡器 ALB
  • 分布式消息队列 ADMQ
  • 分布式缓存 AMDC
  • 分布式配置中心 ADCC
  • Java开发工具包软件 AJDK
  • 搜索引擎 ASE
  • 中间件云平台 ACP
  • 统一管理平台 AUMP
  • 云原生中间件管理 ACMP
  • DevOps平台 ADOP
  • 许可授权中心 ACLS
  • Copilot智能问答系统 ACS
  • 监控平台 AMP
  • 智能日志 AILP
  • 应用性能管理 AAPM
  • 智能告警 AAlarm
  • 主数据管理 AMDM
  • 数据交换平台 ADXP
  • 企业服务总线 AESB
  • 数据智脑 ADPR
  • 服务治理 ASGP
  • 统一身份管理 AIDM
  • 标准模板
  • Markdown教程 (opens new window)
  • VuePress官方社区 (opens new window)
  • 帮助
贡献文档 (opens new window)
文档中心
  • 金蝶Apusic应用服务器

  • 金蝶Apusic负载均衡器

  • 金蝶Apusic分布式消息队列

    • 产品白皮书
    • 产品更新说明
    • V2.0.6

    • V2.0.6_for_kafka

    • V2.0.5

    • V2.0.4

    • V2.0.3

      • 发版说明
      • 产品简介
      • 功能清单
      • 快速入门
      • 安装部署
      • 开发手册
      • 用户手册
      • 国产化适配
      • 插件说明
      • 性能参数
      • 常见问题
  • 金蝶Apusic分布式缓存

  • 金蝶Apusic分布式配置中心

  • 金蝶Apusic Java开发工具包软件

  • 金蝶Apusic全文检索

插件说明

# 概述

ADMQ通过开启插件支持用户使用原生RocketMQ客户端、RabbitMQ客户端、Kafka客户端接收和发送消息,下面记录使用这些客户端时的功能支持情况。 插件操作请看用户手册

# RocketMQ客户端

RocketMQ是阿里开源的消息队列,详细介绍可参考:https://help.aliyun.com/product/29530.html

功能 是否支持 说明
普通消息 是
异步消息 是
顺序消息 是
消息过滤 否
死信队列 是
消息重试 是
广播消息 是
批量消息 是
事务消息 否
定时消息 是
延迟消息 是
ACL鉴权 是

# 普通消息

可以使用rocketmq客户端来创建消息的生产者、消费者,并进行普通消息的生产与消费。普通消息是指无特性的消息,区别于有特性的定时和延迟消息、顺序消息。

# 代码示例

# 异步消息

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。rocketmq异步发送,需要您实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。

# 代码示例

# 顺序消息

顺序消息是RocketMQ提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。 现在只支持分区顺序消息,对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。

# 代码示例

# 死信队列

当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,RocketMQ不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

# 代码示例

# 消息重试

RocketMQ消息收发过程中,若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。

  • 重试间隔:消息消费失败后再次被消息队列RocketMQ版投递给Consumer消费的间隔时间。
  • 最大重试次数:消息消费失败后,可被消息队列RocketMQ版重复投递的最大次数。

# 代码示例

# 广播消息

当使用广播消费模式时,RocketMQ会将每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。

# 代码示例

# 批量消息

如需提高消息的处理效率,或降低下游资源的API调用频率,可以使用批量发送消息。

# 代码示例

# 事务消息

消息队列RocketMQ版分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。

事务消息:消息队列RocketMQ版提供类似XA或Open XA的分布式事务功能,通过消息队列RocketMQ版事务消息能达到分布式事务的最终一致。

半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了消息队列RocketMQ版服务端,但是消息队列RocketMQ版服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。

# 流程介绍

事务消息

事务消息发送步骤如下:

  1. 生产者将半事务消息发送至消息队列RocketMQ版服务端。
  2. 消息队列RocketMQ版服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

事务消息回查步骤如下:

  1. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  2. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

# 生产消息规则

事务消息发送完成本地事务后,可在execute方法中返回以下三种状态:

  • TransactionStatus.CommitTransaction:提交事务,允许消费者消费该消息。

  • TransactionStatus.RollbackTransaction:回滚事务,消息将被丢弃不允许消费。

  • TransactionStatus.Unknow:暂时无法判断状态,等待固定时间以后消息队列RocketMQ版服务端根据回查规则向生产者进行消息回查。

  • 通过ONSFactory.createTransactionProducer创建事务消息的Producer时必须指定LocalTransactionChecker的实现类,处理异常情况下事务消息的回查。

  • 回查规则:本地事务执行完成后,若服务端收到的本地事务返回状态为TransactionStatus.Unknow,或生产者应用退出导致本地事务未提交任何状态。则服务端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。

    • 回查间隔时间:系统默认每隔30秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。

    • 第一次消息回查最快时间:该参数支持自定义设置。若指定消息未达到设置的最快回查时间前,系统默认每隔30秒一次的回查任务不会检查该消息。

      以Java为例,以下设置表示:第一次回查的最快时间为60秒。

      Message message = new Message();
      message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"60");
      
      1
      2

# 消费消息规则

事务消息的Group ID不能与其他类型消息的Group ID共用。与其他类型的消息不同,事务消息有回查机制,回查时消息队列RocketMQ版服务端会根据Group ID去查询生产者客户端。

# 代码示例

# 定时消息

Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。

# 代码示例

# 延迟消息

Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。

# 代码示例

# RabbitMQ API

客户端API公开了AMQP 0-9-1协议模型中的关键实体,并提供了额外的抽象以方便使用。

RabbitMQ Java客户端使用com.rabbitmq.client作为顶层包。关键类和接口是:

  1. Java客户端

    • Connection:表示AMQP 0-9-1连接。

    • Channel:表示一个AMQP 0-9-1通道,提供了大部分的操作(协议方法)。

    • Consumer:表示消息消费者。

      Connection用于打开通道,注册连接生命周期事件处理程序,并关闭不再需要的连接。连接通过ConnectionFactory实例化,这是您配置各种连接设置的方式,例如vhost或用户名。协议操作通过通道接口进行。

  2. JMS客户端

    用于RabbitMQ的JMS客户端在RabbitMQ Java客户端之上实现了JMS规范,从而允许新的和现有的JMS应用程序连接到RabbitMQ协议处理器。

    该库支持2.7.0版的JMS 1.1和2.0。插件和JMS客户机应该一起工作和使用。

    • Connection Factory:ConnectionFactory对象封装了一组由管理员定义的连接配置参数。客户机使用它创建与JMS提供程序的连接。

      ConnectionFactory对象是一个JMS管理对象,支持并发使用。

    • Connection:Connection对象是客户机到其JMS提供程序的活动连接。它通常在Java虚拟机(JVM)之外分配提供者资源。

      连接支持并发使用。

    • Session:Session对象是用于生产和消费消息的单线程上下文。尽管它可以在Java虚拟机(JVM)之外分配提供者资源,但它被认为是轻量级JMS对象。

    • Message:Message接口是所有JMS消息的根接口。它定义了用于所有消息的消息头和确认方法。

    • Message Producer:客户端使用MessageProducer对象将消息发送到目的地。通过将Destination对象传递给会话提供的消息生成器创建方法,可以创建MessageProducer对象。

      MessageProducer是所有消息生产者的父接口。

    • Message Consumer:客户端使用MessageConsumer对象从目的地接收消息。通过将Destination对象传递给会话提供的消息使用者创建方法,可以创建MessageConsumer对象。

      MessageConsumer是所有消息使用者的父接口。

    • Destination:Destination对象封装特定于提供程序的地址。

  3. 流Java客户端

    RabbitMQ Stream Java Client是一个与RabbitMQ Stream Plugin通信的Java库。它允许创建和删除流,以及向这些流发布和从这些流消费。

  4. 管理 HTTP API

    RabbitMQ管理插件提供了一个基于HTTP的API,用于管理和监控RabbitMQ节点和集群。

由于目前生产环境大多数使用的是RabbitMQ为3.10.0及以上版本,3.11.0及以上版本生产环境使用并不多,下面API支持情况以RabbitMQ Java Client最稳定的版本5.16.0进行说明。

# Java客户端

# Connection API

API的核心类是Connection,表示AMQP 0-9-1连接。

修饰符和类型 方法和描述 是否支持
void abort()
Abort this connection and all its channels with the AMQP.REPLY_SUCCESS close code and message 'OK'.
支持
void abort(int timeout)
Abort this connection and all its channels with the [AMQP.REPLY_SUCCESS close code and message 'OK'.
支持
void abort(int closeCode, String closeMessage)
Abort this connection and all its channels.
支持
void abort(int closeCode, String closeMessage, int timeout)
Abort this connection and all its channels.
支持
BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback)
Add a lambda-based BlockedListener.
支持
void addBlockedListener(BlockedListener listener)
Add a BlockedListener.
支持
void clearBlockedListeners()
Remove all BlockedListeners.
支持
void close()
Close this connection and all its channels with the AMQP.REPLY_SUCCESSclose code and message 'OK'.
支持
void close(int timeout)
Close this connection and all its channels with the AMQP.REPLY_SUCCESS close code and message 'OK'.
支持
void close(int closeCode, String closeMessage)
Close this connection and all its channels.
支持
void close(int closeCode, String closeMessage, int timeout)
Close this connection and all its channels.
支持
Channel createChannel()
Create a new channel, using an internally allocated channel number.
支持
Channel createChannel(int channelNumber)
Create a new channel, using the specified channel number if possible.
支持
InetAddress getAddress()
Retrieve the host.
支持
int getChannelMax()
Get the negotiated maximum channel number.
支持
Map<String,Object> getClientProperties()
Get a copy of the map of client properties sent to the server
支持
String getClientProvidedName()
Returns client-provided connection name, if any.
支持
ExceptionHandler getExceptionHandler()
Get the exception handler.
支持
int getFrameMax()
Get the negotiated maximum frame size.
支持
int getHeartbeat()
Get the negotiated heartbeat interval.
支持
String getId()
Returns a unique ID for this connection.
支持
int getPort()
Retrieve the port number.
支持
Map<String,Object> getServerProperties()
Retrieve the server properties.
支持
boolean removeBlockedListener(BlockedListener listener)
Remove a BlockedListener.
支持
void setId(String id)
Sets a unique ID for this connection.
支持

# Channel API

核心API类是Channel,表示AMQP 0-9-1通道。

应该避免在线程之间共享Channel实例。应用程序应该每个线程使用一个通道,而不是跨多个线程共享同一个通道。

修饰符和类型 方法和描述 是否支持
void abort()
Abort this channel with the AMQP.REPLY_SUCCESS close code and message 'OK'.
支持
void abort(int closeCode, String closeMessage)
Abort this channel.
支持
ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback)
Add a lambda-based ConfirmListener.
支持
void addConfirmListener(ConfirmListener listener)
Add a ConfirmListener.
支持
ReturnListener addReturnListener(ReturnCallback returnCallback)
Add a lambda-based ReturnListener.
支持
void addReturnListener(ReturnListener listener)
Add a ReturnListener.
支持
void basicAck(long deliveryTag, boolean multiple)
Acknowledge one or several received messages.
支持
void basicCancel(String consumerTag)
Cancel a consumer.
支持
String basicConsume(String queue, boolean autoAck, Consumer callback)
Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag.
支持
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag.
支持
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag.
支持
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag.
支持
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, Consumer callback)
Start a consumer.
支持
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback)
Start a consumer.
支持
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
Start a consumer.
支持
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
Start a consumer.
支持
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback)
Start a non-nolocal, non-exclusive consumer.
支持
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback)
Start a non-nolocal, non-exclusive consumer.
支持
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
Start a non-nolocal, non-exclusive consumer.
支持
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
Start a non-nolocal, non-exclusive consumer.
支持
String basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, Consumer callback)
Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag and specified arguments.
支持
String basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback)
Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag and specified arguments.
支持
String basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag and specified arguments.
支持
String basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag and specified arguments.
支持
String basicConsume(String queue, Consumer callback)
Start a non-nolocal, non-exclusive consumer, with explicit acknowledgement and a server-generated consumerTag.
支持
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
Start a non-nolocal, non-exclusive consumer, with explicit acknowledgement and a server-generated consumerTag.
支持
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
Start a non-nolocal, non-exclusive consumer, with explicit acknowledgement and a server-generated consumerTag.
支持
String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
Start a non-nolocal, non-exclusive consumer, with explicit acknowledgement and a server-generated consumerTag.
支持
GetResponse basicGet(String queue, boolean autoAck)
Retrieve a message from a queue using AMQP.Basic.Get
支持
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
Reject one or several received messages.
支持
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body)
Publish a message.
支持
void basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body)
Publish a message.
支持
void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
Publish a message.
支持
void basicQos(int prefetchCount)
Request a specific prefetchCount "quality of service" settings for this channel.
支持
void basicQos(int prefetchCount, boolean global)
Request a specific prefetchCount "quality of service" settings for this channel.
支持
void basicQos(int prefetchSize, int prefetchCount, boolean global)
Request specific "quality of service" settings.
支持
AMQP.Basic.RecoverOk basicRecover()
Ask the broker to resend unacknowledged messages.
支持
AMQP.Basic.RecoverOk basicRecover(boolean requeue)
Ask the broker to resend unacknowledged messages.
支持
void basicReject(long deliveryTag, boolean requeue)
Reject a message.
支持
void clearConfirmListeners()
Remove all ConfirmListeners.
支持
void clearReturnListeners()
Remove all ReturnListeners.
支持
void close()
Close this channel with the AMQP.REPLY_SUCCESS close code and message 'OK'.
支持
void close(int closeCode, String closeMessage)
Close this channel.
支持
AMQP.Confirm.SelectOk confirmSelect()
Enables publisher acknowledgements on this channel.
支持
long consumerCount(String queue)
Returns the number of consumers on a queue.
支持
AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey)
Bind an exchange to an exchange, with no extra arguments.
支持
AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String,Object> arguments)
Bind an exchange to an exchange.
支持
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String,Object> arguments)
Like exchangeBind(String, String, String, java.util.Map)) but sets nowait parameter to true and returns void (as there will be no response from the server).
支持
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type)
Actively declare a non-autodelete, non-durable exchange with no extra arguments
支持
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable)
Actively declare a non-autodelete exchange with no extra arguments
支持
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String,Object> arguments)
Declare an exchange, via an interface that allows the complete set of arguments.
支持
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String,Object> arguments)
Declare an exchange.
支持
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type)
Actively declare a non-autodelete, non-durable exchange with no extra arguments
支持
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable)
Actively declare a non-autodelete exchange with no extra arguments
支持
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String,Object> arguments)
Declare an exchange, via an interface that allows the complete set of arguments.
支持
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String,Object> arguments)
Declare an exchange.
支持
void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String,Object> arguments)
Like exchangeDeclare(String, String, boolean, boolean, java.util.Map)) but sets nowait parameter to true and returns nothing (as there will be no response from the server).
支持
void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String,Object> arguments)
Like exchangeDeclare(String, String, boolean, boolean, java.util.Map)) but sets nowait parameter to true and returns nothing (as there will be no response from the server).
支持
AMQP.Exchange.DeclareOk exchangeDeclarePassive(String name)
Declare an exchange passively; that is, check if the named exchange exists.
支持
AMQP.Exchange.DeleteOk exchangeDelete(String exchange)
Delete an exchange, without regard for whether it is in use or not
支持
AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused)
Delete an exchange
支持
void exchangeDeleteNoWait(String exchange, boolean ifUnused)
Like exchangeDelete(String, boolean) but sets nowait parameter to true and returns void (as there will be no response from the server).
支持
AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey)
Unbind an exchange from an exchange, with no extra arguments.
支持
AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String,Object> arguments)
Unbind an exchange from an exchange.
支持
void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String,Object> arguments)
Same as exchangeUnbind(String, String, String, java.util.Map)) but sets no-wait parameter to true and returns nothing (as there will be no response from the server).
支持
int getChannelNumber()
Retrieve this channel's channel number.
支持
Connection getConnection()
Retrieve the connection which carries this channel.
支持
Consumer getDefaultConsumer()
Get the current default consumer.
支持
long getNextPublishSeqNo()
When in confirm mode, returns the sequence number of the next message to be published.
支持
long messageCount(String queue)
Returns the number of messages in a queue ready to be delivered to consumers.
支持
AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey)
Bind a queue to an exchange, with no extra arguments.
支持
AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String,Object> arguments)
Bind a queue to an exchange.
支持
void queueBindNoWait(String queue, String exchange, String routingKey, Map<String,Object> arguments)
Same as queueBind(String, String, String, java.util.Map)) but sets nowait parameter to true and returns void (as there will be no response from the server).
支持
AMQP.Queue.DeclareOk queueDeclare()
Actively declare a server-named exclusive, autodelete, non-durable queue.
支持
AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
Declare a queue
支持
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
Like queueDeclare(String, boolean, boolean, boolean, java.util.Map)) but sets nowait flag to true and returns no result (as there will be no response from the server).
支持
AMQP.Queue.DeclareOk queueDeclarePassive(String queue)
Declare a queue passively; i.e., check if it exists.
支持
AMQP.Queue.DeleteOk queueDelete(String queue)
Delete a queue, without regard for whether it is in use or has messages on it
支持
AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty)
Delete a queue
支持
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty)
Like queueDelete(String, boolean, boolean)) but sets nowait parameter to true and returns nothing (as there will be no response from the server).
支持
AMQP.Queue.PurgeOk queuePurge(String queue)
Purges the contents of the given queue.
支持
AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey)
Unbinds a queue from an exchange, with no extra arguments.
支持
AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String,Object> arguments)
Unbind a queue from an exchange.
支持
boolean removeConfirmListener(ConfirmListener listener)
Remove a ConfirmListener.
支持
boolean removeReturnListener(ReturnListener listener)
Remove a ReturnListener.
支持
void setDefaultConsumer(Consumer consumer)
Set the current default consumer.
支持
boolean waitForConfirms()
Wait until all messages published since the last call have been either ack'd or nack'd by the broker.
支持
boolean waitForConfirms(long timeout)
Wait until all messages published since the last call have been either ack'd or nack'd by the broker; or until timeout elapses.
支持
void waitForConfirmsOrDie()
Wait until all messages published since the last call have been either ack'd or nack'd by the broker.
支持
void waitForConfirmsOrDie(long timeout)
Wait until all messages published since the last call have been either ack'd or nack'd by the broker; or until timeout elapses.
支持

提示

由于不支持prefetchSize,调用basicQos(int prefetchSize, int prefetchCount, boolean global)prefetchSize不能传大于0的参数,否则会报"prefetchSize not supported "错误。

# Consumer API

应用程序回调对象通过订阅从队列接收通知和消息的接口。大多数实现都将子类化DefaultConsumer。

此接口的方法在调度线程中调用,该调度线程与连接的线程是分离的。这允许消费者调用通道或连接方法而不会导致死锁。

修饰符和类型 方法和描述 是否支持
void handleCancel(String consumerTag)
Called when the consumer is cancelled for reasons other than by a call to Channel.basicCancel(java.lang.String)).
支持
void handleCancelOk(String consumerTag)
Called when the consumer is cancelled by a call to Channel.basicCancel(java.lang.String)).
支持
void handleConsumeOk(String consumerTag)
Called when the consumer is registered by a call to any of the Channel.basicConsume(java.lang.String, com.rabbitmq.client.Consumer)) methods.
支持
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
Called when a **basic.deliver** is received for this consumer.
支持
void handleRecoverOk(String consumerTag)
Called when a **basic.recover-ok** is received in reply to a **basic.recover**.
支持
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig)
Called when either the channel or the underlying connection has been shut down.
支持

# JMS客户端

不支持。

# 流Java客户端

不支持。

# 管理HTTP API

不支持。

# Kafka APIS

Kafka包括五个核心api:

  1. Producer API允许应用程序向Kafka集群中的主题发送数据流。

  2. Consumer API允许应用程序从Kafka集群中的主题读取数据流。

  3. Streams API允许将数据流从输入主题转换为输出主题。

  4. Connect API允许实现连接器,这些连接器可以不断地从一些源系统或应用程序拉入Kafka,或从Kafka推入一些接收器系统或应用程序。

  5. Admin API允许管理和检查主题、代理和其他Kafka对象。

    由于目前生产环境大多数使用的是Kafka为2.0.0及以上版本,3.0.0及以上版本生产环境使用并不多,下面APIS支持情况以Kafka 2.0.0最稳定的版本2.7.2进行说明。

# Producer API

一个Kafka客户端,它向Kafka集群发布记录。

生产者是线程安全的,跨线程共享一个生产者实例通常比拥有多个实例更快。

修饰符和类型 方法和描述 是否支持
void abortTransaction()
Aborts the ongoing transaction.
支持
void beginTransaction()
Should be called before the start of each new transaction.
支持
void close()
Close this producer.
支持
void close(Duration timeout)
This method waits up to timeout for the producer to complete the sending of all incomplete requests.
支持
void commitTransaction()
Commits the ongoing transaction.
支持
void flush()
Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.
支持
void initTransactions()
Needs to be called before any other methods when the transactional.id is set in the configuration.
支持
Map<MetricName,? extends Metric> metrics()
Get the full set of internal metrics maintained by the producer.
支持
List<PartitionInfo> partitionsFor(String topic)
Get the partition metadata for the given topic.
支持
Future<RecordMetadata> send(ProducerRecord<K,V> record)
Asynchronously send a record to a topic.
支持
Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
支持
void sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata)
Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction.
支持
void sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets, String consumerGroupId)
Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction.
支持

# Consumer API

消费Kafka集群记录的客户端。这个客户端透明地处理Kafka代理的失败,并透明地适应它获取的主题分区在集群内的迁移。此客户机还与代理交互,以允许消费者组使用消费者组来负载平衡消费。

消费者维护必要的代理的TCP连接以获取数据。使用后关闭消费者失败将会释放这些连接。消费者不是线程安全的。

修饰符和类型 方法和描述 是否支持
void assign(Collection<TopicPartition> partitions)
Manually assign a list of partitions to this consumer.
支持
Set<TopicPartition> assignment()
Get the set of partitions currently assigned to this consumer.
支持
Map<TopicPartition,Long> beginningOffsets(Collection<TopicPartition> partitions)
Get the first offset for the given partitions.
支持
Map<TopicPartition,Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout)
Get the first offset for the given partitions.
支持
void close()
Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
支持
void close(Duration timeout)
Tries to close the consumer cleanly within the specified timeout.
支持
void close(long timeout, TimeUnit timeUnit)
Deprecated. Since 2.0. Use close(Duration) (opens new window) or close() (opens new window).
支持
void commitAsync()
Commit offsets returned on the last poll(Duration) (opens new window) for all the subscribed list of topics and partition.
支持
void commitAsync(Map<TopicPartition,OffsetAndMetadata> offsets, OffsetCommitCallback callback)
Commit the specified offsets for the specified list of topics and partitions to Kafka.
支持
void commitAsync(OffsetCommitCallback callback)
Commit offsets returned on the last poll() (opens new window) for the subscribed list of topics and partitions.
支持
void commitSync()
Commit offsets returned on the last poll() (opens new window) for all the subscribed list of topics and partitions.
支持
void commitSync(Duration timeout)
Commit offsets returned on the last poll() (opens new window) for all the subscribed list of topics and partitions.
支持
void commitSync(Map<TopicPartition,OffsetAndMetadata> offsets)
Commit the specified offsets for the specified list of topics and partitions.
支持
void commitSync(Map<TopicPartition,OffsetAndMetadata> offsets, Duration timeout)
Commit the specified offsets for the specified list of topics and partitions.
支持
Map<TopicPartition,OffsetAndMetadata> committed(Set<TopicPartition> partitions)
Get the last committed offsets for the given partitions (whether the commit happened by this process or another).
支持
Map<TopicPartition,OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout)
Get the last committed offsets for the given partitions (whether the commit happened by this process or another).
支持
OffsetAndMetadata committed(TopicPartition partition)
Deprecated. since 2.4 Use committed(Set) (opens new window) instead
支持
OffsetAndMetadata committed(TopicPartition partition, Duration timeout)
Deprecated. since 2.4 Use committed(Set, Duration) (opens new window) instead
支持
Map<TopicPartition,Long> endOffsets(Collection<TopicPartition> partitions)
Get the end offsets for the given partitions.
支持
Map<TopicPartition,Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout)
Get the end offsets for the given partitions.
支持
void enforceRebalance()
Alert the consumer to trigger a new rebalance by rejoining the group.
支持
ConsumerGroupMetadata groupMetadata()
Return the current group metadata associated with this consumer.
支持
Map<String,List<PartitionInfo>> listTopics()
Get metadata about partitions for all topics that the user is authorized to view.
支持
Map<String,List<PartitionInfo>> listTopics(Duration timeout)
Get metadata about partitions for all topics that the user is authorized to view.
支持
Map<MetricName,? extends Metric> metrics()
Get the metrics kept by the consumer
支持
Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
Look up the offsets for the given partitions by timestamp.
支持
Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch, Duration timeout)
Look up the offsets for the given partitions by timestamp.
支持
List<PartitionInfo> partitionsFor(String topic)
Get metadata about the partitions for a given topic.
支持
List<PartitionInfo> partitionsFor(String topic, Duration timeout)
Get metadata about the partitions for a given topic.
支持
void pause(Collection<TopicPartition> partitions)
Suspend fetching from the requested partitions.
支持
Set<TopicPartition> paused()
Get the set of partitions that were previously paused by a call to pause(Collection) (opens new window).
支持
ConsumerRecords<K,V> poll(Duration timeout)
Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.
支持
ConsumerRecords<K,V> poll(long timeoutMs)
Deprecated. Since 2.0. Use poll(Duration) (opens new window), which does not block beyond the timeout awaiting partition assignment. See KIP-266 (opens new window) for more information.
支持
long position(TopicPartition partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).
支持
long position(TopicPartition partition, Duration timeout)
Get the offset of the next record that will be fetched (if a record with that offset exists).
支持
void resume(Collection<TopicPartition> partitions)
Resume specified partitions which have been paused with pause(Collection) (opens new window).
支持
void seek(TopicPartition partition, long offset)
Overrides the fetch offsets that the consumer will use on the next poll(timeout) (opens new window).
支持
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
Overrides the fetch offsets that the consumer will use on the next poll(timeout) (opens new window).
支持
void seekToBeginning(Collection<TopicPartition> partitions)
Seek to the first offset for each of the given partitions.
支持
void seekToEnd(Collection<TopicPartition> partitions)
Seek to the last offset for each of the given partitions.
支持
void subscribe(Collection<String> topics)
Subscribe to the given list of topics to get dynamically assigned partitions.
支持
void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
Subscribe to the given list of topics to get dynamically assigned partitions.
支持
void subscribe(Pattern pattern)
Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
支持
void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
支持
Set<String> subscription()
Get the current subscription.
支持
void unsubscribe()
Unsubscribe from topics currently subscribed with subscribe(Collection) (opens new window) or subscribe(Pattern) (opens new window).
支持
void wakeup()
Wakeup the consumer.
支持

# Streams API

一个Kafka客户端,允许对来自一个或多个输入主题的输入执行连续计算,并将输出发送到零个、一个或多个输出主题。计算逻辑可以通过使用拓扑来定义处理器的DAG拓扑,或者通过使用提供高级DSL来定义转换的StreamsBuilder来指定。

一个KafkaStreams实例可以包含一个或多个在配置中指定的线程处理工作。一个KafkaStreams实例可以作为一个单独的(可能是分布式的)流处理应用程序与具有相同应用程序ID的任何其他实例(无论是在同一进程中,在本机的其他进程上,还是在远程机器上)进行协调。这些实例将根据输入主题分区的分配来划分工作,以便使用所有分区。如果实例被添加或失败,所有(剩余的)实例将在它们之间重新平衡分区分配,以平衡处理负载并确保处理所有输入主题分区。

在内部,KafkaStreams实例包含一个普通的KafkaProducer和KafkaConsumer实例,用于读取输入和写入输出。

Streams API主要包括KSteam(KGroupedStream、TimeWindowedKStream 、SessionWindowedKStream )、KTable等API。

# KStream API

修饰符和类型 方法和描述 是否支持
<KR> KStream<KR,V> selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper)
Set a new key (with possibly new type) for each input record.
支持
<KR> KStream<KR,V> selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper, Named named)
Set a new key (with possibly new type) for each input record.
支持
<VR> KStream<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper)
Transform the value of each input record into a new value (with possible new type) of the output record.
支持
<VR> KStream<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Named named)
Transform the value of each input record into a new value (with possible new type) of the output record.
支持
<VR> KStream<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper)
Transform the value of each input record into a new value (with possible new type) of the output record.
支持
<VR> KStream<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named)
Transform the value of each input record into a new value (with possible new type) of the output record.
支持
<VR> KStream<K,VR> flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VR>> mapper)
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
支持
<VR> KStream<K,VR> flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VR>> mapper, Named named)
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
支持
<VR> KStream<K,VR> flatMapValues(ValueMapperWithKey<? super K,? super V,? extends Iterable<? extends VR>> mapper)
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
支持
<VR> KStream<K,VR> flatMapValues(ValueMapperWithKey<? super K,? super V,? extends Iterable<? extends VR>> mapper, Named named)
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
支持
void foreach(ForeachAction<? super K,? super V> action)
Perform an action on each record of KStream.
支持
void foreach(ForeachAction<? super K,? super V> action, Named named)
Perform an action on each record of KStream.
支持
void to(String topic)
Materialize this stream to a topic using default serializers specified in the config and producer's DefaultPartitioner.
支持
void to(String topic, Produced<K,V> produced)
Materialize this stream to a topic using the provided Produced (opens new window) instance.
支持
void to(TopicNameExtractor<K,V> topicExtractor)
Dynamically materialize this stream to topics using default serializers specified in the config and producer's DefaultPartitioner.
支持
void to(TopicNameExtractor<K,V> topicExtractor, Produced<K,V> produced)
Dynamically materialize this stream to topics using the provided Produced (opens new window) instance.
支持
<KR> KGroupedStream<KR,V> groupBy(KeyValueMapper<? super K,? super V,KR> keySelector)
Group the records of this KStream on a new key that is selected using the provided KeyValueMapper (opens new window) and default serializers and deserializers.
支持
<KR> KGroupedStream<KR,V> groupBy(KeyValueMapper<? super K,? super V,KR> keySelector, Grouped<KR,V> grouped)
Group the records of this KStream on a new key that is selected using the provided KeyValueMapper (opens new window) and Serde (opens new window)s as specified by Grouped (opens new window).
支持
<KR> KGroupedStream<KR,V> groupBy(KeyValueMapper<? super K,? super V,KR> keySelector, Serialized<KR,V> serialized)
Deprecated. since 2.1. Use groupBy(KeyValueMapper, Grouped) (opens new window) instead
支持
KGroupedStream<K,V> groupByKey()
Group the records by their current key into a KGroupedStream (opens new window) while preserving the original values and default serializers and deserializers.
支持
KGroupedStream<K,V> groupByKey(Grouped<K,V> grouped)
Group the records by their current key into a KGroupedStream (opens new window) while preserving the original values and using the serializers as defined by Grouped (opens new window).
支持
KGroupedStream<K,V> groupByKey(Serialized<K,V> serialized)
Deprecated. since 2.1. Use groupByKey(Grouped) (opens new window) instead
支持
<GK,GV,RV> KStream<K,RV> join(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> joiner)
Join records of this stream with GlobalKTable (opens new window)'s records using non-windowed inner equi join.
支持
<GK,GV,RV> KStream<K,RV> join(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> joiner, Named named)
Join records of this stream with GlobalKTable (opens new window)'s records using non-windowed inner equi join.
支持
<GK,GV,RV> KStream<K,RV> leftJoin(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> valueJoiner)
Join records of this stream with GlobalKTable (opens new window)'s records using non-windowed left equi join.
支持
<GK,GV,RV> KStream<K,RV> leftJoin(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> valueJoiner, Named named)
Join records of this stream with GlobalKTable (opens new window)'s records using non-windowed left equi join.
支持
<K1,V1> KStream<K1,V1> transform(TransformerSupplier<? super K,? super V,KeyValue<K1,V1>> transformerSupplier, Named named, String... stateStoreNames)
Transform each record of the input stream into zero or one record in the output stream (both key and value type can be altered arbitrarily).
支持
<K1,V1> KStream<K1,V1> transform(TransformerSupplier<? super K,? super V,KeyValue<K1,V1>> transformerSupplier, String... stateStoreNames)
Transform each record of the input stream into zero or one record in the output stream (both key and value type can be altered arbitrarily).
支持
<K1,V1> KStream<K1,V1> flatTransform(TransformerSupplier<? super K,? super V,Iterable<KeyValue<K1,V1>>> transformerSupplier, Named named, String... stateStoreNames)
Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily).
支持
<K1,V1> KStream<K1,V1> flatTransform(TransformerSupplier<? super K,? super V,Iterable<KeyValue<K1,V1>>> transformerSupplier, String... stateStoreNames)
Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily).
支持

提示

由于KStream API方法比较多,以上列出的方法只包括提供单元测试和自测的,没列出的方法大多数应该也支持,具体待实际应用中验证。

# KGroupedStream API

修饰符和类型 方法和描述 是否支持
<VR> KTable<K,VR> aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator)
Aggregate the values of records in this stream by the grouped key.
支持
<VR> KTable<K,VR> aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Aggregate the values of records in this stream by the grouped key.
支持
<VR> KTable<K,VR> aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Aggregate the values of records in this stream by the grouped key.
支持
KTable<K,Long> count()
Count the number of records in this stream by the grouped key.
支持
KTable<K,Long> count(Materialized<K,Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Count the number of records in this stream by the grouped key.
支持
KTable<K,Long> count(Named named)
Count the number of records in this stream by the grouped key.
支持
KTable<K,Long> count(Named named, Materialized<K,Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Count the number of records in this stream by the grouped key.
支持
KTable<K,V> reduce(Reducer<V> reducer)
Combine the values of records in this stream by the grouped key.
支持
KTable<K,V> reduce(Reducer<V> reducer, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Combine the value of records in this stream by the grouped key.
支持
KTable<K,V> reduce(Reducer<V> reducer, Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Combine the value of records in this stream by the grouped key.
支持
SessionWindowedKStream<K,V> windowedBy(SessionWindows windows)
Create a new SessionWindowedKStream (opens new window) instance that can be used to perform session windowed aggregations.
支持
TimeWindowedKStream<K,V> windowedBy(SlidingWindows windows)
Create a new TimeWindowedKStream (opens new window) instance that can be used to perform sliding windowed aggregations
支持
<W extends Window> TimeWindowedKStream<K,V> windowedBy(Windows<W> windows)
Create a new TimeWindowedKStream (opens new window) instance that can be used to perform windowed aggregations.
支持

提示

除了<VOut> CogroupedKStream<K,VOut> cogroup(Aggregator<? super K,? super V,VOut> aggregator)方法暂未验证外,其它方法都支持。

# TimeWindowedKStream API

修饰符和类型 方法和描述 是否支持
<VR> KTable<Windowed<K>,VR> aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator)
Aggregate the values of records in this stream by the grouped key and defined windows.
支持
<VR> KTable<Windowed<K>,VR> aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator, Materialized<K,VR,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Aggregate the values of records in this stream by the grouped key and defined windows.
支持
<VR> KTable<Windowed<K>,VR> aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator, Named named)
Aggregate the values of records in this stream by the grouped key and defined windows.
支持
<VR> KTable<Windowed<K>,VR> aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator, Named named, Materialized<K,VR,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Aggregate the values of records in this stream by the grouped key and defined windows.
支持
KTable<Windowed<K>,Long> count()
Count the number of records in this stream by the grouped key and defined windows.
支持
KTable<Windowed<K>,Long> count(Materialized<K,Long,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Count the number of records in this stream by the grouped key and defined windows.
支持
KTable<Windowed<K>,Long> count(Named named)
Count the number of records in this stream by the grouped key and defined windows.
支持
KTable<Windowed<K>,Long> count(Named named, Materialized<K,Long,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Count the number of records in this stream by the grouped key and defined windows.
支持
KTable<Windowed<K>,V> reduce(Reducer<V> reducer)
Combine the values of records in this stream by the grouped key and defined windows.
支持
KTable<Windowed<K>,V> reduce(Reducer<V> reducer, Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Combine the values of records in this stream by the grouped key and defined windows.
支持
KTable<Windowed<K>,V> reduce(Reducer<V> reducer, Named named)
Combine the values of records in this stream by the grouped key and defined windows.
支持
KTable<Windowed<K>,V> reduce(Reducer<V> reducer, Named named, Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Combine the values of records in this stream by the grouped key and defined windows.
支持

# SessionWindowedKStream API

修饰符和类型 方法和描述 是否支持
KTable<Windowed<K>,Long> count()
Count the number of records in this stream by the grouped key and defined sessions.
支持
KTable<Windowed<K>,Long> count(Materialized<K,Long,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Count the number of records in this stream by the grouped key and defined sessions.
支持
KTable<Windowed<K>,Long> count(Named named)
Count the number of records in this stream by the grouped key and defined sessions.
支持
KTable<Windowed<K>,Long> count(Named named, Materialized<K,Long,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Count the number of records in this stream by the grouped key and defined sessions.
支持
KTable<Windowed<K>,V> reduce(Reducer<V> reducer)
Combine the values of records in this stream by the grouped key and defined sessions.
支持
KTable<Windowed<K>,V> reduce(Reducer<V> reducer, Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Combine the values of records in this stream by the grouped key and defined sessions.
支持
KTable<Windowed<K>,V> reduce(Reducer<V> reducer, Named named)
Combine the values of records in this stream by the grouped key and defined sessions.
支持
KTable<Windowed<K>,V> reduce(Reducer<V> reducer, Named named, Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Combine the values of records in this stream by the grouped key and defined sessions.
支持

提示

除了<VR> KTable<Windowed<K>,VR> aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator, Merger<? super K,VR> sessionMerger, ...)系列方法暂未验证外(但从TimeWindowedKStream API支持情况推断,aggregate系列方法应该也支持),其它方法都支持。

# KTable API

.

修饰符和类型 方法和描述 是否支持
<VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store.
支持
<VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde (opens new window), value serde (opens new window), and the underlying materialized state storage (opens new window) configured in the Materialized (opens new window) instance.
支持
<VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Named named)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store.
支持
<VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde (opens new window), value serde (opens new window), and the underlying materialized state storage (opens new window) configured in the Materialized (opens new window) instance.
支持
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store.
支持
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde (opens new window), value serde (opens new window), and the underlying materialized state storage (opens new window) configured in the Materialized (opens new window) instance.
支持
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store.
支持
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde (opens new window), value serde (opens new window), and the underlying materialized state storage (opens new window) configured in the Materialized (opens new window) instance.
支持
KStream<K,V> toStream()
Convert this changelog stream to a KStream (opens new window).
支持
<KR> KStream<KR,V> toStream(KeyValueMapper<? super K,? super V,? extends KR> mapper)
Convert this changelog stream to a KStream (opens new window) using the given KeyValueMapper (opens new window) to select the new key.
支持
<KR> KStream<KR,V> toStream(KeyValueMapper<? super K,? super V,? extends KR> mapper, Named named)
Convert this changelog stream to a KStream (opens new window) using the given KeyValueMapper (opens new window) to select the new key.
支持
KStream<K,V> toStream(Named named)
Convert this changelog stream to a KStream (opens new window).
支持

提示

由于KTable API方法比较多,以上列出的方法只包括提供单元测试和自测的,没列出的方法大多数应该也支持,具体待实际应用中验证。

# Connect API

Connect API允许实现连接器,可以不断地从一些源数据系统拉入Kafka,或者从Kafka推入一些汇聚数据系统。

但是,Connect的许多用户不需要直接使用这个API,他们可以使用预先构建的连接器,而不需要编写任何代码。

Connect API主要包括Connector(SourceConnector、SinkConnector)和Task(SourceTask、SinkTask) API。

# Connector API

修饰符和类型 方法和描述 是否支持
abstract ConfigDef config()
Define the configuration for the connector.
支持
protected ConnectorContext context()
Returns the context object used to interact with the Kafka Connect runtime.
支持
void initialize(ConnectorContext ctx)
Initialize this connector, using the provided ConnectorContext to notify the runtime of input configuration changes.
支持
void initialize(ConnectorContext ctx, List<Map<String,String>> taskConfigs)
Initialize this connector, using the provided ConnectorContext to notify the runtime of input configuration changes and using the provided set of Task configurations.
支持
void reconfigure(Map<String,String> props)
Reconfigure this Connector.
支持
abstract void start(Map<String,String> props)
Start this Connector.
支持
abstract void stop()
Stop this connector.
支持
abstract Class<? extends Task> taskClass()
Returns the Task implementation for this Connector.
支持
abstract List<Map<String,String>> taskConfigs(int maxTasks)
Returns a set of configurations for Tasks based on the current configuration, producing at most count configurations.
支持
Config validate(Map<String,String> connectorConfigs)
Validate the connector configuration values against configuration definitions.
支持
String version()
Get the version of this component.
支持
# SourceConnector API
修饰符和类型 方法和描述 是否支持
protected SourceConnectorContext context()
Returns the context object used to interact with the Kafka Connect runtime.
支持
# SinkConnector API
修饰符和类型 方法和描述 是否支持
protected SinkConnectorContext context()
Returns the context object used to interact with the Kafka Connect runtime.
支持

# Task API

修饰符和类型 方法和描述 是否支持
void start(Map<String,String> props)
Start the Task
支持
void stop()
Stop this task.
支持
String version()
Get the version of this task.
支持
# SourceTask API
修饰符和类型 方法和描述 是否支持
void commit()
Commit the offsets, up to the offsets that have been returned by poll() (opens new window).
支持
void commitRecord(SourceRecord record)
Deprecated. Use commitRecord(SourceRecord, RecordMetadata) (opens new window) instead.
支持
void commitRecord(SourceRecord record, RecordMetadata metadata)
Commit an individual SourceRecord (opens new window) when the callback from the producer client is received.
支持
void initialize(SourceTaskContext context)
Initialize this SourceTask with the specified context object.
支持
abstract List<SourceRecord> poll()
Poll this source task for new records.
支持
abstract void start(Map<String,String> props)
Start the Task.
支持
abstract void stop()
Signal this SourceTask to stop.
支持
# SinkTask API
修饰符和类型 方法和描述 是否支持
void close(Collection<TopicPartition> partitions)
The SinkTask use this method to close writers for partitions that are no longer assigned to the SinkTask.
支持
void flush(Map<TopicPartition,OffsetAndMetadata> currentOffsets)
Flush all records that have been put(Collection) (opens new window) for the specified topic-partitions.
支持
void initialize(SinkTaskContext context)
Initialize the context of this task.
支持
void onPartitionsAssigned(Collection<TopicPartition> partitions)
Deprecated. Use open(Collection) (opens new window) for partition initialization.
支持
void onPartitionsRevoked(Collection<TopicPartition> partitions)
Deprecated. Use close(Collection) (opens new window) instead for partition cleanup.
支持
void open(Collection<TopicPartition> partitions)
The SinkTask use this method to create writers for newly assigned partitions in case of partition rebalance.
支持
Map<TopicPartition,OffsetAndMetadata> preCommit(Map<TopicPartition,OffsetAndMetadata> currentOffsets)
Pre-commit hook invoked prior to an offset commit.
支持
abstract void put(Collection<SinkRecord> records)
Put the records in the sink.
支持
abstract void start(Map<String,String> props)
Start the Task.
支持
abstract void stop()
Perform any cleanup to stop this task.
支持

# Admin API

Admin API支持管理和检查主题、代理、访问控制列表和其他Kafka对象。

修饰符和类型 方法和描述 是否支持
default AlterConfigsResult alterConfigs(Map<ConfigResource,Config> configs)
Deprecated. Since 2.3. Use incrementalAlterConfigs(Map) (opens new window).
支持
AlterConfigsResult alterConfigs(Map<ConfigResource,Config> configs, AlterConfigsOptions options)
Deprecated. Since 2.3. Use incrementalAlterConfigs(Map, AlterConfigsOptions) (opens new window).
支持
default void close()
Close the Admin and release all associated resources.
支持
void close(Duration timeout)
Close the Admin client and release all associated resources.
支持
default void close(long duration, TimeUnit unit)
Deprecated. Since 2.2. Use close(Duration) (opens new window) or close() (opens new window).
支持
static Admin create(Map<String,Object> conf)
Create a new Admin with the given configuration.
支持
static Admin create(Properties props)
Create a new Admin with the given configuration.
支持
default CreatePartitionsResult createPartitions(Map<String,NewPartitions> newPartitions)
Increase the number of partitions of the topics given as the keys of newPartitions according to the corresponding values.
支持
CreatePartitionsResult createPartitions(Map<String,NewPartitions> newPartitions, CreatePartitionsOptions options)
Increase the number of partitions of the topics given as the keys of newPartitions according to the corresponding values.
支持
default CreateTopicsResult createTopics(Collection<NewTopic> newTopics)
Create a batch of new topics with the default options.
支持
CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options)
Create a batch of new topics.
支持
default DeleteRecordsResult deleteRecords(Map<TopicPartition,RecordsToDelete> recordsToDelete)
Delete records whose offset is smaller than the given offset of the corresponding partition.
支持
DeleteRecordsResult deleteRecords(Map<TopicPartition,RecordsToDelete> recordsToDelete, DeleteRecordsOptions options)
Delete records whose offset is smaller than the given offset of the corresponding partition.
支持
default DeleteTopicsResult deleteTopics(Collection<String> topics)
This is a convenience method for deleteTopics(Collection, DeleteTopicsOptions) (opens new window) with default options.
支持
DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options)
Delete a batch of topics.
支持
default DescribeClusterResult describeCluster()
Get information about the nodes in the cluster, using the default options.
支持
DescribeClusterResult describeCluster(DescribeClusterOptions options)
Get information about the nodes in the cluster.
支持
default DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources)
Get the configuration for the specified resources with the default options.
支持
DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options)
Get the configuration for the specified resources.
支持
default DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds)
Describe some group IDs in the cluster, with the default options.
支持
DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options)
Describe some group IDs in the cluster.
支持
default DescribeTopicsResult describeTopics(Collection<String> topicNames)
Describe some topics in the cluster, with the default options.
支持
DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options)
Describe some topics in the cluster.
支持
default AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource,Collection<AlterConfigOp>> configs)
Incrementally updates the configuration for the specified resources with default options.
不支持
AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource,Collection<AlterConfigOp>> configs, AlterConfigsOptions options)
Incrementally update the configuration for the specified resources.
不支持
default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId)
List the consumer group offsets available in the cluster with the default options.
支持
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)
List the consumer group offsets available in the cluster.
支持
default ListTopicsResult listTopics()
List the topics available in the cluster with the default options.
支持
ListTopicsResult listTopics(ListTopicsOptions options)
List the topics available in the cluster.
支持
Map<MetricName,? extends Metric> metrics()
Get the metrics kept by the adminClient
支持

提示

由于Admin API方法比较多,以上列出的方法只包括提供单元测试和自测的,没列出的方法大多数应该也支持,具体待实际应用中验证。

编辑页面 (opens new window)
#插件

← 国产化适配 性能参数→

  • 浅色模式