深入理解kafka-核心技术与实战篇【干的要命系列】(二)
书接上文 深入理解kafka-核心技术与实战篇【干的要命系列】(一),虽迟但到😏
学习路线及大纲:
5. 深入Kafka原理
5.1 Kafka的副本机制
所谓的副本机制,可以称之为备份机制,他有以下各种好处:
- 提供数据的冗余。即使系统的部分失效,系统仍然可以运转。
- 提供高伸缩性。支持横向扩展,能通过增加机器的方式提升读性能,进而太高读操作的吞吐量。
- 改善数据局限性。将数据放入与用户地理位置相近的地方,从而降低系统延迟
5.1.1 副本定义
所谓副本(replica),本质就是一个只能追加写消息的提交日志。从而能够应对部分Broker宕机带来的数据不可用。
在实际生产环境中,每台Broker都可能保存各个主题下不同分区的不同副本,因此单个Broker上存在成百上千的副本很正常。
5.1.2 副本角色
分区下有多个副本,这些副本内容要如何保持一致呢?Kafka采用**基于领导者(Leader-base)**的副本机制,副本分为Leader和Follower两种角色,他们是这样分工的:领导者可以读写,但是追随者不提供服务。
5.1.3 In-sync Replicas(ISR)
Kafka 引入了 In-sync Replicas,也就是所谓的 ISR 副本集合。ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。
追随者定时异步拉取领导者的数据,会有不能实时同步的风险,Kafka对同步有自己的定义标准, Broker 端参数 replica.lag.time.max.ms 参数值,这个参数的含义是 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
5.1.4 Unclean 领导者选举
Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。
其实这种方式是提高了系统的可用性,但是降低数据的一致性,CAP理论的又一个案例!
5.2 处理请求
之前说过Kafka的所有请求都是TCP网络以Socket的方式进行通讯的。
处理请求的两种常见方式:
- 顺序处理请求。写对应的伪代码,大概是这个样子的:
1 |
|
这种方式比较简单,但是有个致命的缺点就是,吞吐量太差。只能顺序处理每个请求,每个请求都是one-by-one。
- 每个请求使用单独线程处理
1 |
|
这种方式完全就是异步的,但是缺陷也很明显,每次都是创建线程,在某些场景下会压垮整个服务。
那么Kafka是怎么处理请求的呢?
答案是使用Reactor模式,Reactor模式是事件驱动模式架构的一种实现方式,特变适合应用于处理多个客户端并发向服务器端发送请求的场景。
kafka类似画一张图的话:
当网络线程拿到请求后,不是自己处理,而是将请求放到一个共享队列中。Broker端还有一个IO线程池,负责从该队列中取出请求,执行真正的处理。如果是produce生产消息,则将消息写入底层磁盘日志,如果是fetch请求,则从磁盘或者页缓存中读取消息。
图中有一个叫 Purgatory 的组件,这是 Kafka 中著名的“炼狱”组件。它是用来缓存延时请求(Delayed Request)的。所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。
5.3 消费者重平衡流程解析
重平衡要用到KafkaBroker端的Coordinator组件,在Coordinator的帮助下完成整个消费组的分区重分配。
触发和通知
三个触发条件:
- 组成员数量发生了改变
- 订阅主题数量发生了改变
- 订阅主题分区数发生了改变
在实际生成总,因为1条件发生的重平衡是最常见的。也就是说,每次消费组启动的时候,必然会导致重平衡。
那么重平衡是如何通知到其他消费实例的呢?答案是:考消费者端的心跳线程(HeartBeat Thread)。
消费者组状态机
重平衡一旦开启,Broker端的协同者组件就要开始工作,主要是控制消费者组的状态流转。Kafka设计了一套消费者状态机,来帮助协调者完成整个重平衡流程。
目前定义了5种状态,他们分别是:Empty、Dead、PreparingBalance、CompletingRebalance和Stable。
状态 | 含义 |
---|---|
Empty | 组内没有任何成员,但是消费者组可能存在已提交的位移数据,而且这些位移尚未过期 |
Dead | 同样是组内没有任何成员,但组的元数据信息已经在协调者端被移除。协调者组件保存着当前向它注册过的所有组信息,所谓元数据信息就类似于这个注册信息 |
PreparingRebalance | 消费者准备开启重平衡。此时所有成员都要重新请求加入消费者组。 |
CompletingRebalance | 消费者组下所有成员已经加入,各个成员正在等待分配方案。该状态在老一点的版本中被称为AwaitingSync,它和CompletingRebalance是等价的。 |
Stable | 消费者组的稳定状态。该状态表明重平衡已经完成,组内各成员能工正常消费数据了。 |
消费者端重平衡流程
在消费者端,重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案。这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。
当组内成员加入组时,它会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。
JoinGroup 请求的主要作用是将组成员订阅信息发送给领导者消费者,待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段。
SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。
Broker端重平衡场景解析
分以下几个场景。分别是新成员加入组、组成员离开组、组成员提交位移。
场景一:新成员入组
新成员入组是指在组状态为Stable状态后,有新的成员加入。如果是全新启动一个消费者组,Kafka是有一些自己的小优化的,流程上会有些不同。
当协调者JoinGroup请求后,他会通过心跳请求响应的方式通知组内现有成员,强制开一一轮重平衡。
场景二:组成员主动离组
在消费者实例所在的线程或者进程调用close方法主动通知协调者它要退出或者调用close()方法主动通知协调者它退出,
场景三:组成员崩溃离组
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制的。
场景四:重平衡时协调者对组内成员提交位移的处理
当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送
5.4 Kafka的控制器组件
控制器组件是Kafka的核心组件。它的作用是在Zookeeper的帮助下管理和协调整个Kafka集群。
集群中的任意一台Broker都能充当控制器的角色。但是,在实际运行中,只能有一个Broker称为控制器,行使其管理和协调的职责。
控制器是做什么的?
主题管理
分区重分配
PreFerred领导者选举
集群成员管理(新增Broker、Broker主动关闭、Broker宕机)
控制器保存数据
5.5 高水平和Leader Epoch
Kafka的世界中,水位不是时间戳,更与时间无关。它是和位置绑定的,具体来说,他是用消息位移来表征的。
高水位线的作用
主要的作用有两个:
- 定义消息的可见性,即用来标识分区下的哪些消息是可以被消费的
- 帮助Kafka完成副本同步
Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)。Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO。
Leader Epoch 登场
Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。基于此,社区在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。
所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。
- Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
- 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。
我们来看一个实际的例子,它展示的是 Leader Epoch 是如何防止数据丢失的:
Leader Epoch 机制来规避这种数据丢失:
副本 A 宕机了,B 成为 Leader。同样地,当 A 重启回来后,执行与 B 相同的逻辑判断,发现也不用执行日志截断,至此位移值为 1 的那条消息在两个副本中均得到保留。后面当生产者程序向 B 写入新消息时,副本 B 所在的 Broker 缓存中,会生成新的 Leader Epoch 条目:[Epoch=1, Offset=2]。之后,副本 B 会使用这个条目帮助判断后续是否执行日志截断操作。这样,通过 Leader Epoch 机制,Kafka 完美地规避了这种数据丢失场景。
6. Kafka 管理与监控
TODO: 暂时不用,后续补充
7. 高级Kafka应用之流式处理
6.1 Kafka Streams 是什么?
Kafka Streams 是一个 Java 库(也支持 Scala),用于构建 分布式、容错、可扩展的流式应用,你只需要在你的 Java 应用中引入依赖,就可以:
- 消费 Kafka 的消息(支持高吞吐量)
- 做窗口计算、聚合、join、分组、过滤等操作
- 处理完后再写回 Kafka(或其它 sink)
在 Kafka Streams 中,流处理逻辑是用拓扑来表征的。一个拓扑结构本质上是一个有向无环图(DAG),它由多个处理节点(Node)和连接节点的多条边组成,如下图所示:
图上的节点也称之为处理单元或Processor,它封装了具体的事件处理逻辑。Processor在其他平台也被称为操作算子。常见的操作算子包括转化(map)、过滤(filter)、链接(join)和聚合(aggregation)等。
大体上,Kafka Streams 开放了两大类 API 供你定义 Processor 逻辑。
第 1 类就是 DSL,它是声明式的函数式 API,使用起来感觉和 SQL 类似,你不用操心它的底层是怎么实现的,你只需要调用特定的 API 告诉 Kafka Streams 你要做什么即可。
1 |
|
第 2 类则是命令式的低阶 API,称为 Processor API。比起 DSL,这组 API 提供的实现方式更加灵活。你可以编写自定义的算子来实现一些 DSL 天然没有提供的处理逻辑。事实上,DSL 底层也是用 Processor API 实现的。
不论是用哪组 API 实现,所有流处理应用本质上都可以分为两类:有状态的(Stateful)应用和无状态的(Stateless)应用。
有状态的应用指的是应用中使用了类似于连接、聚合或时间窗口(Window)的 API。
无状态的应用是指在这类应用中,某条消息的处理结果不会影响或依赖其他消息的处理。常见的无状态操作包括事件转换以及刚刚那个例子中的过滤等。
流表二元性
流就是一个永不停止(至少理论上是这样的)的事件序列,而表和关系型数据库中的概念类似,是一组行记录。
在流处理领域,两者是有机统一的:流在时间维度上聚合之后形成表,表在时间维度上不断更新形成流,这就是所谓的流表二元性(Duality of Streams and Tables)
流和表的概念在流处理领域非常关键。在 Kafka Streams DSL 中,流用 KStream 表示,而表用 KTable 表示。
时间
在流处理领域内,精确定义事件时间是非常关键的:一方面,它是决定流处理应用能否实现正确性的前提;另一方面,流处理中时间窗口等操作依赖于时间概念才能正常工作。
常见的时间概念有两类:事件发生时间(Event Time)和事件处理时间(Processing Time)。理想情况下,我们希望这两个时间相等,即事件一旦发生就马上被处理,但在实际场景中,这是不可能的,Processing Time 永远滞后于 Event Time
如果流处理应用要实现结果的正确性,就必须要使用基于 Event Time 的时间窗口,而不能使用基于 Processing Time 的时间窗口。
时间窗口
所谓的时间窗口机制,就是将流数据沿着时间线切分的过程。常见的时间窗口包括:
- 固定时间窗口(Fixed Windows)
- 滑动时间窗口(Sliding Windows)
- 会话窗口(Session Windows)
eg运行一个WordCont实例
1 |
|
6.2 Kafka Streams在金融领域的应用
简单的例子。假设有一个金融理财用户张三,他首先在苹果手机上访问了某理财产品,然后在安卓手机上注册了该理财产品的账号,最后在电脑上登录该账号,并购买了该理财产品。ID Mapping 就是要将这些不同端或设备上的用户信息聚合起来,然后找出并打通用户所关联的所有 ID 信息。
何使用 Kafka Streams 来实现一个特定场景下的实时 ID Mapping。为了方便理解,我们假设 ID Mapping 只关心身份证号、手机号以及设备 ID。下面是用 Avro 写成的 Schema 格式:
1 |
|
1 |
|