深入理解kafka-核心技术与实战篇【干的要命系列】(一)
1. 开篇
1.1 前言
看8股文学习kafka,对其的理解都是别人的一些总结,都是些嚼碎的散状知识(主要之前面试一问这里深点,就跟个菜鸡😅)。为了让有个立体全面的深入学习,so计划写两篇关于kafka的博客,第一篇是聚焦kafka核心的技术与实战,第二篇是通过源码来加深学习kafka,此为第一篇,这篇更加偏向技术实战,此篇预计分两个篇幅完成(内容比较干)。
1.2 为什么要学习kafka?
作为工程师或者架构师,在实际的工作中难免遇见大数据业务的建设,由于这些系统都是为公司业务服务的,所以他们仅仅只是执行一些常规的业务逻辑,因此他们不能算是计算密集型,相反更多是数据密集型。对于数据密集型应用来说,如何对应数据的激增、数据的复杂度以及数据高速的变化,是对工程师架构师最有效的证明。就拿数据激增来说,Kafka能有效的隔离上下游业务,将上游的流量激增平滑的传递到子系统中。如果是一名大数据从业人员,熟练掌握Kafka是非常有必要的技能。
同时Kafka有着广阔的应用场景。目前Apache Kafka被认为是消息引擎领域的引导者,技术学习角度而言,Kafka有很多亮点,我们只需要学习一套框架就能在实际业务系统中实现消息引擎、应用程序集成、分布式存储构建、甚至是流处理应用的开发与部署,简直物超所值。
1.3 如何学习
第一步先弄清楚客户端,目前有两大客户端:Java客户端和libkafka客户端,然后去官网学习一下代码演示,如何正常的编译和运行这些样例。
第二步修改对应的样例,理解使用其他API,对这些修改进行观测,做一个小项目来验证学习成果。这个阶段就可以熟读一遍Kafka官方文档,重点理解各种参数。
最后学习Kafka的高级功能,比如流处理,还能执行高级的流式处理操作,比如时间窗口聚合、流处理链接等。
如果是相关运维,相应的学习目标应该是学习搭建及管理 Kafka 线上环境,如何进行监控数据等。
这里用脑图来总结一下学习线路:
2. Kafka入门
2.1 常见术语
- 消息:Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。
- 主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
- 分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
- 消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
- 副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
- 生产者:Producer。向主题发布新消息的应用程序。
- 消费者:Consumer。从主题订阅新消息的应用程序。
- 消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
- 消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
- 重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
2.2 Kafka的种类
- Apache Kafka: 最正宗的Kafka,他是后续所有发行版的基础。
- Confluent Kafka: 主要为商业开发工具,拥有原本没有的高级特性,比如跨数据中心备份、Schema注册中心以及集群监控工具等。
- Cloudera/Hortonworks Kafka: CDH和Hortonworks提供HDP是非常著名的大数据平台,里边集成了目前主流的大数据框架,能够帮助用户实现分布式存储、集群调度、流处理到机器学习、实时数据库等全方面的数据处理。
3. Kafka基本使用
3.1 Kafka线上集群部署方式
下面从几个方面:操作系统、磁盘、磁盘容量、带宽来讨论
3.1.2 操作系统
目前常见的操作系统有 3 种:Linux、Windows 和 macOS。部署在 Linux 上的生产环境是最多的,一般从下面三个方面考虑
- I/O 模型的使用
- 数据网络的传输效率
- 社区支持度
主流的 I/O 模型通常有 5 种类型:阻塞式 I/O、非阻塞式 I/O、I/O 多路复用、信号驱动 I/O 和异步 I/O。每种 I/O 模型都有各自典型的使用场景,比如 Java 中 Socket 对象的阻塞模式和非阻塞模式就对应于前两种模型;而 Linux 中的系统调用 select 函数就属于 I/O 多路复用模型;大名鼎鼎的 epoll 系统调用则介于第三种和第四种模型之间;至于第五种模型,其实很少有 Linux 系统支持,反而是 Windows 系统提供了一个叫 IOCP 线程模型属于这一种。
熟悉 Linux肯定听过零拷贝(Zero Copy)技术,就是当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝从而实现快速的数据传输。Linux 平台实现了这样的零拷贝机制。
3.1.3 磁盘及容量
追求性价比可以不搭建 RAID,使用普通磁盘组成存储空间即可。
使用机械磁盘完全能够胜任 Kafka 线上环境。
那么容量相关的,是需要计算规划的,比如每天一亿条1KB大小消息,保存两份且存两周,那么总共就需要200GB的内存。Kafka还需要索引数据大概10%,保存两周那么就是200GB * 1.1 * 14 那就大概需要3TB大小。Kafka还支持数据压缩,压缩比是0.75,那么这就是最后在2.25-3TB之间。
总之在规划磁盘容量时你需要考虑下面这几个元素:
- 新增消息数
- 消息留存时间
- 平均消息大小
- 备份数
- 是否启用压缩
3.1.4 带宽
带宽需要根据Kafka服务器的数量、业务量处理目标来定,比如需要在一个小时处理1TB的数据,同时需要3台机器,根据实际经验kafka用到70%以上的带宽就会丢包,那么单台的使用带宽就是 700Mb / 3 约等于 240Mbps。
3.2 kafka的配置重要参数
静态参数,是指你必须在 Kafka 的配置文件 server.properties 中进行设置的参数,不管你是新增、修改还是删除。同时,你必须重启 Broker 进程才能令它们生效。而主题级别参数的设置则有所不同,Kafka 提供了专门的 kafka-configs 命令来修改它们。至于 JVM 和操作系统级别参数,它们的设置方法比较通用化,我介绍的也都是标准的配置参数,因此,你应该很容易就能够对它们进行设置
Boker的重要参数(静态参数):
- log.dirs : 指定Broker使用的文件目录路径
- log.dir : 补充log.dirs的单个路径
- zookeeper.connect : ZooKeeper相关设置,保存Kafka集群的元数据信息
- 与Broker连接相关的设置
- listeners:学名叫监听器,告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务
- advertised.listeners:这组监听器是 Broker 用于对外发布的
- host.name/port:域名和端口
- Topic管理参数
- auto.create.topics.enable: 是否允许自动创建 Topic
- unclean.leader.election.enable:是否允许 Unclean Leader 选举
- auto.leader.rebalance.enable:是否允许定期进行 Leader 选举
- 数据留存参数
- log.retention.{hours|minutes|ms}: 都是控制一条消息数据被保存多长时间
- log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小
- message.max.bytes:控制 Broker 能够接收的最大消息大小
Topic的重要参数:
- Topic级别参数
- retention.ms: 规定了该 Topic 消息被保存的时长。默认是 7 天
- retention.bytes: 规定了要为该 Topic 预留多大的磁盘空间
- max.message.bytes: 决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小
JVM参数:
- JAVA版本
- 堆大小
- 垃圾回收器选择
- 设置方法:KAFKA_HEAP_OPS 、KAFKA_JVM_PERFORMANCE_OPTS
操作系统参数
- 文件描述符限制
- 文件系统类型
- Swappiness
- 提交时间
4. 客户端实践及原理剖析
4.1 生产者分区概念
4.1.1 何为分区?
A:Kafka的结构是:主题 - 分区 - 消息,主题下的每条消息都会在某一个分区里,如下图
其实分区的作用是提供负载均衡的能力,为了实现系统的高伸缩性(Scalability)。这样就可以把读写都在各自的分区里完成,提高系统的吞吐量。不同的分布式系统对分区的叫法也不尽相同。比如在 Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region。但是换汤不换药,整体思想是不变的。
4.1.2 分区策略
Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略。如果要自定义分区策略,你需要显式地配置生产者端的参数partitioner.class。在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。这个接口也很简单,只定义了两个方法:partition()和close(),通常你只需要实现最重要的 partition 方法。我们来看看这个方法的方法签名:
1 |
|
分区策略有以下几种:
- 轮训
- 随机
- 按照消息key保序
4.2 Kafka的生产者压缩及算法
在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。
1 |
|
其实大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但这里的“大部分情况”也是要满足一定条件的。有两种例外情况就可能让 Broker 重新压缩消息。
情况一:Broker 端指定了和 Producer 端不同的压缩算法。
情况二:Broker 端发生了消息格式转换。
最好的方式是:Producer 端压缩、Broker 端保持、Consumer 端解压缩。
不论哪个版本,Kafka的消息层次分为两次:消息集合(message set),消息(message)。一个消息集合包含若干日志项,日志项才是真正封装消息的地方。Kafka不会直接操作一条条的消息,而是在消息集合这个层面进行写入操作。
Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd),下边是各个压缩算法的优劣。
在实际使用中,GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。但对于 Kafka 而言,它们的性能测试结果却出奇得一致,即在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。具体到物理资源,使用 Snappy 算法占用的网络带宽最多,zstd 最少,这是合理的,毕竟 zstd 就是要提供超高的压缩比;在 CPU 使用率方面,各个算法表现得差不多,只是在压缩时 Snappy 算法使用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能使用更多的 CPU。
4.3 怎么保证Kafka消息不丢失
- 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
- 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
- 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
- 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
- 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
- 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。确保消息消费完成再提交。
- Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
4.4 高级功能
Kafka可以自定义拦截器,支持为生产者和消费者,支持链式拦截,设置方法通过参数进行完成。在一些客户端监控,端到端系统性能检测、以及审计日志上都有典型的应用场景。
1 |
|
4.5 生产者如何管理TCP链接的
Apache Kafka 所有的通讯都是基于TCP的,在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接,TCP 连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭。
4.6 幂等生产者和事务生产者
Kafka的消息交付可靠性保障的三种承诺:最多一次、至少一次和精确一次。
Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性(Idempotence)和事务(Transaction)
4.6.1 幂等性 Producer
在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
4.6.2 事务
事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。
设置事务型 Producer 的方法也很简单,满足两个要求即可:
- 和幂等性 Producer 一样,开启 enable.idempotence = true。
- 设置 Producer 端参数 transactional. id。最好为其设置一个有意义的名字。
相关Producer代码中做一些调整:
1 |
|
和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
4.7 Kafka的消费者组
Consumer Group是Kafka提供的可扩展且具有容错性的消费者机制。既然是组,必然可以有多个消费者或者消费者实例,他们共享一个公共组ID。组内的所有消费者协调在一起来消费订阅主题的所有分区
- Consumer Group 下可以有一个或多个 Consumer 实例。
- Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
- Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。
Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型:如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的 Group,那么它实现的就是发布 / 订阅模型。
Consumer Group 端大名鼎鼎的重平衡,也就是所谓的 Rebalance 过程,Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。
4.8 位移主题(__consumer_offsets)
位移消息都放在位移主题(__consumer_offset topic)中,已经取代zk。 一共有三种消息类型:
1、__consumer_offset 记录消费组下某个消费者在某个topic的partition的位移信息(是一种key value的一种格式,key由三部分组成 groupid+topic+pattition)。
2、consumer group信息(在新建group会创建该消息)。
3、tombstone(墓碑消息),移除group consumer所有的信息。 【注意】:第三种消息会在group下的所有consumer下线 且 group偏移信息全部移除才会发出墓碑消息。
Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀
4.9 Rebalance
Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配。但是,在整个过程中,所有实例都不能消费任何消息,因此它对 Consumer 的 TPS 影响很大。
4.10 位移提交
位移提交分自动提交和手动提交
开启自动提交的方法很简单。Consumer端有个参数enable.auto.commit,把它设置成true或者默认就是自动提交。还有一个参数:auto.commit.interval.ms,表示Kafka多少秒后为你自动提交。
1 |
|
手动提交要调用对应的API手动提交,最简单的是KafkaConsumer#commitSync(),该方法会提交 KafkaConsumer#poll() 返回的最新位移。从名字上来看,它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回
1 |
|
一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。
刚才说的API是同步操作,会导致客户端的阻塞,鉴于这个问题可以使用异步API:KafkaConsumer#commitAsync()
1 |
|
一般会结合同步异步一起来完成位移提交的处理。
批处理的解决方式:
1 |
|
4.11 CommitFailedException
Kafka Consumer端的CommitFailedException异常处理方法 Kafka Consumer端的CommitFailedException异常是指在提交位移时出现错误或异常,通常由于消费者实例连续两次调用poll方法的时间间隔超过了预设值而导致。
异常的处理方法包括优化消息处理逻辑、调整参数值,或者使用多线程加速消费。具体建议包括缩短单条消息处理时间、增加允许下游系统消费一批消息的最大时长、减少一次性消费的消息总数以及使用多线程来加速消费。
需要注意消费者组和独立消费者在使用前都要指定group.id。如果出现设置相同group.id值的消费者组程序和独立消费者程序,可能会导致Kafka抛出CommitFailedException异常。
4.12 多线程开发消费
谈到 Java Consumer API,最重要的当属它的入口类 KafkaConsumer 了。我们说 KafkaConsumer 是单线程的设计,严格来说这是不准确的。因为,从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。
4.12.1 方案
a. 方案一:对线程 + 多KafkaConsumer实例
1 |
|
b.方案二:单线程+单KafkaCOnsumenr+消息处理WOrker线程池
1 |
|
4.13 Java 消费者是如何管理TCP连接
TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的。再细粒度地说,在 poll 方法内部有 3 个时机可以创建 TCP 连接。
发起 FindCoordinator 请求时。
连接协调者时。
消费数据时。
消费者程序会创建 3 类 TCP 连接:
- 确定协调者和获取集群元数据。
- 连接协调者,令其执行组成员管理操作。
- 执行实际的消息获取。
4.14 消费者组消费进度监控
对于Kafka消费来说,最重要的事情就是监控他们的消费进度,或者说是监控他们消费的滞后程度。这个滞后程度有个专门的名称:消费Lag或者Consumner Lag。如果滞后太多,数据可能就不在系统的页缓存中,从磁盘中获取数据会导致性能差,更加加大了滞后性(类似于马太效应)。
消费者进度如此重要,那么我们应该怎么监控呢?
- 使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本。
- 使用 Kafka Java Consumer API 编程。
- 使用 Kafka 自带的 JMX 监控指标。
未完待续……