Kafka源码学习(一)-构建Kafka工程和源码阅读环境

1. 前置说明

看技术文章 -> 思考 -> 输出为blog,这是一个有效的正向循环,而看代码 -> 思考 -> 实操也是如此。这能更加深入理解,并且比单看有趣的多。所以源码学习是必不可少的,这是是Kafka 学习的姊妹篇,深入理解kafka-核心技术与实战篇【干的要命系列】 的后续源码学习篇。

2. 环境搭建

在开始之前需要做一些准备工作,软件的安装,比如:Java 环境、Gradle、Scala、IDE、Git 等等。

  1. Java 版本:目前最新的需要 17 或者 23
  2. Gradle:使用gradlew 命令来具体安装
  3. Scala 需要 2.13 是唯一支持的版本
  4. IED 的话就用 IDEA、Git 的话也不用说了

3. 构建Kafka 工程

使用命令行从 github 上拉取分支,默认Trunk:

1
$ git clone https://github.com/apache/kafka.git

然后执行命令, MAC OS 上:

1
$ ./gradlew build

下面,用张图展示下 Kafka 工程的各个目录以及文件:

这里简单介绍一下主要的组件目录:

  • bin 目录:保存 Kafka 工具行脚本,我们熟知的 kafka-server-start 和 kafka-console-producer 等脚本都存放在这里。

  • clients 目录:保存 Kafka 客户端代码,比如生产者和消费者的代码都在该目录下。

  • config 目录:保存 Kafka 的配置文件,其中比较重要的配置文件是 server.properties。

  • connect 目录:保存 Connect 组件的源代码。Kafka Connect 组件是用来实现 Kafka 与外部系统之间的实时数据传输的。

  • core 目录:保存 Broker 端代码。Kafka 服务器端代码全部保存在该目录下。

  • streams 目录:保存 Streams 组件的源代码。Kafka Streams 是实现 Kafka 实时流处理的组件。

Kafka 可以支持单元测试:

1
2
3
4
$ ./gradlew core:test
$ ./gradlew clients:test
$ ./gradlew connect:[submodule]:test
$ ./gradlew streams:test

我们主要聚焦Kafka Broker 端源代码,因此Kafka core 包的内容需要重点关注一下:

  • controller 包:保存了 Kafka 控制器(Controller)代码,而控制器组件是 Kafka 的核心组件,后面我们会针对这个包的代码进行详细分析。

  • coordinator 包:保存了消费者端的 GroupCoordinator 代码和用于事务的 TransactionCoordinator 代码。对 coordinator 包进行分析,特别是对消费者端的 GroupCoordinator 代码进行分析,是弄明白 Broker 端协调者组件设计原理的关键。

  • log 包:保存了 Kafka 最核心的日志结构代码,包括日志、日志段、索引文件等,后续会有详细介绍。另外,该包下还封装了 Log Compaction 的实现机制,是非常重要的源码包。

  • network 包:封装了 Kafka 服务器端网络层的代码,特别是 SocketServer.scala 这个文件,是 Kafka 实现 Reactor 模式的具体操作类。

  • server 包:顾名思义,它是 Kafka 的服务器端主代码,里面的类非常多,很多关键的 Kafka 组件都存放在这里,比如状态机、Purgatory 延时机制等。

目前 kafka 是 java 和 scala 混合开发的,很多核心代码都是 Scala,需要有 Scala 的语言基础。

4.Scala 基础

4.1 定义变量和函数

Scala 有两类变量:val 和 var。

val 等同于 Java 中的 final 变量,一旦被初始化,就不能再被重新赋值了。相反地,var 是非 final 变量,可以重复被赋值。

1
2
3
4
5
6
7
8
9
10
11
12
scala> val msg = "hello, world"
msg: String = hello, world

scala> msg = "another string"
<console>:12: error: reassignment to val
msg = "another string"

scala> var a:Long = 1L
a: Long = 1

scala> a = 2
a: Long = 2

函数定义:

1
2
3
4
def max(x: Int, y: Int): Int = {
if (x > y) x
else y
}

def 关键字表示这是一个函数。max 是函数名,括号中的 x 和 y 是函数输入参数,它们都是 Int 类型的值。结尾的“Int =”组合表示 max 函数返回一个整数。

在 Scala 中,函数体具体代码块最后一行的值将被作为函数结果返回。

4.2 定义元组(Tuple)

元组是承载数据的容器,一旦被创建,就不能再被更改了。元组中的数据可以是不同数据类型的。定义和访问元组的方法很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scala> val a = (1, 2.3, "hello", List(1,2,3)) // 定义一个由4个元素构成的元组,每个元素允许是不同的类型
a: (Int, Double, String, List[Int]) = (1,2.3,hello,List(1, 2, 3))

scala> a._1 // 访问元组的第一个元素
res0: Int = 1

scala> a._2 // 访问元组的第二个元素
res1: Double = 2.3

scala> a._3 // 访问元组的第三个元素
res2: String = hello

scala> a._4 // 访问元组的第四个元素
res3: List[Int] = List(1, 2, 3)

4.3 循环写法

我们常见的循环有两种写法:命令式编程方式和函数式编程方式。熟悉的是第一种,比如下面的 for 循环代码:

1
2
3
4
5
6
7
8
9
scala> val list = List(1, 2, 3, 4, 5)
list: List[Int] = List(1, 2, 3, 4, 5)

scala> for (element <- list) println(element)
1
2
3
4
5

第二种写法,会让代码写得异常简洁:

1
2
// dataPlaneAcceptors:ConcurrentHashMap<Endpoint, Acceptor>对象
dataPlaneAcceptors.asScala.values.foreach(_.initiateShutdown())

这一行代码首先调用 asScala 方法,将 Java 的 ConcurrentHashMap 转换成 Scala 语言中的 concurrent.Map 对象;然后获取它保存的所有 Acceptor 线程,通过 foreach 循环,调用每个 Acceptor 对象的 initiateShutdown 方法。

4.4 case 类

在 Scala 中,case 类与普通类是类似的,只是它具有一些非常重要的不同点。Case 类非常适合用来表示不可变数据。同时,它最有用的一个特点是,case 类自动地为所有类字段定义 Getter 方法,这样能省去很多样本代码

要编写一个类表示平面上的一个点,Java 代码大概长这个样子:

1
2
3
4
5
6
7
8
9
10
public final class Point {
private int x;
private int y;
public Point(int x, int y) {
this.x = x;
this.y = y;
}
// setter methods......
// getter methods......
}

但如果用 Scala 的 case 类,只需要写一行代码就可以了:

1
2
case class Point(x:Int, y: Int) // 默认写法。不能修改x和y
case class Point(var x: Int, var y: Int) // 支持修改x和y

4.5 模式匹配

和 Java 中 switch 仅仅只能比较数值和字符串相比,Scala 中的 match 要强大得多:

1
2
3
4
5
6
7
8
9
def describe(x: Any) = x match {
case 1 => "one"
case false => "False"
case "hi" => "hello, world!"
case Nil => "the empty list"
case e: IOException => "this is an IOException"
case s: String if s.length > 10 => "a long string"
case _ => "something else"
}

4.6 Option 对象

Option 表示一个容器对象,里面可能装了值,也可能没有装任何值。由于是容器,因此一般都是这样的写法:Option[Any]。中括号里面的 Any 就是上面说到的 Any 类型,它能是任何类型。如果值存在的话,就可以使用 Some(x) 来获取值或给值赋值,否则就使用 None 来表示:

1
2
3
4
5
6
7
8
scala> val keywords = Map("scala" -> "option", "java" -> "optional") // 创建一个Map对象
keywords: scala.collection.immutable.Map[String,String] = Map(scala -> option, java -> optional)

scala> keywords.get("java") // 获取key值为java的value值。由于值存在故返回Some(optional)
res24: Option[String] = Some(optional)

scala> keywords.get("C") // 获取key值为C的value值。由于不存在故返回None
res23: Option[String] = None

Option 对象还经常与模式匹配语法一起使用,以实现不同情况下的处理逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
def display(game: Option[String]) = game match {
case Some(s) => s
case None => "unknown"
}

scala> display(Some("Heroes 3"))
res26: String = Heroes 3

scala> display(Some("StarCraft"))
res27: String = StarCraft

scala> display(None)
res28: String = unknown

Kafka源码学习(一)-构建Kafka工程和源码阅读环境
https://liu620.github.io/2025/07/06/Kafka源码学习(一)-构建Kafka工程和源码阅读环境/
作者
alen
发布于
2025年7月6日
许可协议