Kafka 事务的一次性语义

事务为发布到 Kafka 的一组消息提供原子性、一致性、隔离性和持久性 (ACID) 的保证。这意味着要么事务中的所有消息都将成功写入 Kafka,要么不会写入任何消息。在确保数据一致性和避免任何数据丢失方面,这是一个至关重要的功能。

Kafka 事务在数据完整性至关重要的场景中尤其重要,例如金融交易和日志系统。此外,使用 Kafka 事务可以帮助确保保留事件的顺序,这对于某些应用程序来说是必要的。

Kafka 的关键特性之一是对事务的支持,它提供了一次性语义 (EOS),并且自 Kafka 0.11 ( KIP-98 ) 起可用。对 EOS 的支持最近扩展到 Kafka Connect 中的源连接器,这支持配置驱动和完全事务性的流管道 ( KIP-618 )。

要充分理解此处介绍的一些概念,需要对日志复制协议的工作原理有一般性的了解。

从 Kafka 3.0 开始,生产者默认启用更强的交付保证(acks=all, enable.idempotence=true)。除了持久性之外,这还提供分区级消息排序和重复保护。

当幂等生产者启用时,代理会为每个新的生产者实例注册一个生产者 ID (PID)。当批次首次添加到生产请求时,会为每个记录分配一个序列号,并且序列号不会更改,即使重新发送批次也是如此。

代理会跟踪每个生产者和分区的序列号,并定期将该信息存储在 .snapshot 文件中。每当有新的批次到达时,代理就会检查接收到的序列号是否等于上次添加的批次序列号加一,如果是,就确认该批次,否则就拒绝该批次。

遗憾的是,当需要将多个分区作为一个工作单元写入时,幂等生产者不能保证在重启时提供重复保护。这是许多流处理应用程序的典型情况,这些应用程序运行读-处理-写循环。在这种情况下,您可以使用 Kafka Producer 或 Kafka Streams API 的事务支持来使这些周期原子化。

生产者Producer API 提供的事务支持是低级的,需要小心使用才能真正获得事务语义。

对于使用 Kafka Streams 的用户来说,这要简单得多:设置 processing.guarantee=exactly_once_v2 就能在现有拓扑上启用 EOS。

每个生产者必须配置自己的静态和唯一的 transactional.id
Transactional.id 用于在进程重启时唯一标识同一个逻辑生产者。与惰性生产者不同的是,事务生产者实例将被分配与具有相同事务.id 的任何先前实例相同的 PID(但生产者时间会递增)。PID 和生产者纪元共同标识一个逻辑生产者会话。

在 Kafka 2.6 之前,transactional.id 必须是输入分区的静态编码,以避免在重新平衡期间应用程序实例之间发生所有权转移,从而使围栏逻辑失效。这种限制导致效率极低,因为应用程序实例无法重复使用单个线程安全生产者实例,而必须为每个输入分区创建一个。通过强制生产者将消费者组元数据与偏移量一起发送给提交者(KIP-447),这一问题得到了解决。

启动时,客户端应用程序会调用 Producer.initTransactions 来初始化生产者会话,允许代理整理与该生产者之前的任何化身相关的状态,这些状态由事务 id 标识。此后,使用相同事务.id 的任何现有生产者都会被视为僵尸,一旦它们尝试发送数据,就会被屏蔽。

为了保证消息的有序性,给定的生产者最多只能有一个正在进行的事务(它们是串行执行的)。中间商会阻止隔离级别=read_committed 的消费者前进到包含开放事务的偏移量。中止的事务中的消息会在消费者内部被过滤掉,客户端应用程序永远不会看到。

需要注意的是,事务仅在单个 Kafka 集群内受支持。如果事务范围包括具有外部可观测副作用的处理器,则需要使用额外的组件。在这种情况下,事务发件箱CDC 模式会有所帮助。


事务元数据
为了实现事务,Kafka 代理需要在日志中包含额外的记账信息。

有关生产者及其事务的信息存储在 __transaction_state 主题中,该主题由称为事务协调器的代理组件使用。每个事务协调器都拥有该主题中分区的一个子集(即其代理是领导者的分区)。

在用户日志中,除了通常的数据记录批次外,事务协调器还会让分区领导者附加控制记录(提交/中止标记),以跟踪哪些批次已实际提交,哪些已回滚。由于控制记录是事务支持的内部实现细节,因此 Kafka 消费者不会向应用程序公开控制记录。

非事务记录被认为是立即决定的,但事务记录只有在写入相应的提交或中止标记时才会决定。

事务会经历以下状态:

  • 未决定(进行中)
  • 已决定且未复制(PrepareCommit|PrepareAbort)
  • 已决定且已复制(CompleteCommit|CompleteAbort)

这是通过一个类似于两阶段提交协议的过程来处理的,其中状态由写入 __transaction_state 主题的一系列控制记录来跟踪。每当一些数据被写入事务中的一个新分区时,我们就会得到正在进行的状态,其中包括所涉及的分区列表。当事务完成时,我们的 PrepareCommit 或 PrepareAbort 状态会发生变化。一旦控制记录写入每个涉及的分区,就会出现 CompleteCommit 或 CompleteAbort 状态变化。

性能
在使用 Kafka 事务时,开发人员应该注意一些缺点。

首先,启用 EOS 会对吞吐量产生影响,这种影响可能很大。每个生产者只能按顺序处理事务,因此你可能需要多个事务生产者,这会对应用程序的其他部分产生连锁反应。这时,框架可以为多个生产者实例提供有用的抽象。

然后,由于确保 EOS 所需的写入放大,事务会给应用程序增加一些延迟。虽然这对某些应用来说可能不是什么大问题,但那些需要实时处理的应用可能会受到影响。

每个事务都会产生一些开销,这些开销与所涉及的报文数量无关。这些开销包括

  • 向协调器注册分区的少量远程过程调用(RPC)(这些远程过程调用可以分批进行,以减少开销)。
  • 事务完成时必须写入每个相关分区的事务标记记录。
  • 附加到内部事务状态日志的一些状态变化记录。

大部分延迟都发生在生产方,即事务 API 的执行方。消费者最多只能获取 LSO,但它必须在内存中缓冲记录,直到观察到提交标记,因此内存使用量会增加。

当事务提交间隔较短时,性能下降可能会很明显。增加事务持续时间也会增加端到端延迟,可能需要进行一些额外的调整,因此需要权衡利弊。

挂起的事务
挂起的事务有一条丢失或失序的控制记录,可能是由于网络问题(KIP-890)导致的信息延迟造成的。这种问题很少见,但必须注意并设置警报,因为其后果会影响服务可用性。

事务协调器会自动中止任何在 transaction.timeout.ms 内未完成的正在进行的事务。这种机制对挂起的事务不起作用,因为从协调者的角度来看,事务已经完成,并写入了事务标记(不再需要超时)。但从分区领导者的角度来看,在提交/中止标记之后还有一个事务的数据记录,这与新事务的开始没有区别。

挂起的事务通常是由滞后时间越来越长的卡死应用程序引起的。尽管有消息产生,但消费者无法取得任何进展,因为 LSO 不再增长。

结论
Kafka 事务提供了确保数据可靠性和完整性的重要手段,使其成为 Kafka 平台的重要功能。然而,这些优势是以吞吐量降低和额外延迟为代价的,这可能需要一些调整。如果不进行监控,挂起的事务可能会对服务可用性产生影响。