Kafka-ZooKeeper在Kafka中的作用是什么

Kafka集群使用ZooKeeper主要用来保存包括broker、topic的一些元数据,此外还有controller、consumer的信息等。

broker

broker和topic的元数据都保存在Zookeeper的/brokers这个节点下,如下图所示:

左侧这棵树保存的是Kafka的Broker信息,/brokers/ids/[0…N],每个临时节点对应着一个在线的Broker,Broker启动后会创建一个临时节点,代表Broker已经加入集群可以提供服务了,节点名称就是BrokerID,节点内保存了包括Broker的地址、版本号、启动事件等等一些Broker的基本信息。如果Broker宕机或者与ZooKeeper集群失联了,这个临时节点也会随之消失。

使用zkClient.sh连接Zookeeper可以查看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[zk: localhost:2181(CONNECTED) 0] ls /brokers    
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 2] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.126.158:9092"],"jmx_port":-1,"host":"192.168.
126.158","timestamp":"1568845320351","port":9092,"version":4}
cZxid = 0x600000097
ctime = Thu Sep 19 06:22:00 CST 2019
mZxid = 0x600000097
mtime = Thu Sep 19 06:22:00 CST 2019
pZxid = 0x600000097
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x272020001
dataLength = 200
numChildren = 0

topic

上图中右侧部分的这棵树保存的就是 topic 和 partition 的信息。/brokers/topics/节点下面的每个子节点都对应一个 topic,节点的名称就是 topic 名称,这个节点上保存了 topic 的分区信息。

1
2
3
4
5
6
7
8
9
[zk: localhost:2181(CONNECTED) 1] get /brokers/topics/topic-demo
{"version":1,"partitions":{"0":[0,1],"1":[0,2]}}
cZxid = 0x1400099cb3
ctime = Tue May 21 21:38:32 CST 2019
mZxid = 0x1400099cb3
ctime = Tue Sep 21 09:38:32 CST 2019
pZxid = 0x1400099cfd
cversion = 1
dataVersion = 0

上面显示topic-demo这个topic有两个partition,第一个partition的编号为0,保存在brokerId为0和1的两个kafka节点,第二个partition的编号为1,保存在brokerId为0和2的两个kafka节点。

此外,Zookeeper的每个 topic 节点下面都包含一个固定的 partitions 节点,partitions节点的子节点就是主题下的所有分区,节点名称就是分区编号。
每个分区节点下面是一个名为state的持久节点,节点中保存着分区当前的leader和所有的ISR的BrokerID。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[zk: localhost:2181(CONNECTED) 0] ls /brokers/topics
[__transaction_state, __consumer_offsets, topic-demo]
[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics/topic-demo
[partitions]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics/topic-demo/partitions
[0, 1, 2, 3]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/topic-demo/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 4] get /brokers/topics/topic-demo/partitions/0/state
{"controller_epoch":27,"leader":1,"version":1,"leader_epoch":43,"isr":[0,1]}
cZxid = 0x10000004c
ctime = Tue May 21 21:38:32 CST 2019
mZxid = 0x60000013c
mtime = Thu Sep 19 06:26:12 CST 2019
pZxid = 0x10000004c
cversion = 0
dataVersion = 85
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 78
numChildren = 0

controller

在Zookeeper上,除了/brokers节点,还有两个和 Controller 相关的节点。分别是/controller/controller_epoch

/controller节点上保存的是 kafka 集群中 Controller 角色的brokerId,这是一个临时节点,当 Kafka 集群启动时,集群中的 broker 就会尝试去在Zookeeper新建这个节点,创建成功的 broker 将成为集群中的 Controller。当 Controller 对应的 broker 宕机之后,这个临时节点将消除,因为 Zookeeper 的 watch 机制,其它还存活的节点就又会去新建 /controller节点,创建成功的 broker 会成为新的 Contorller。

/controller_epoch节点是一个持久节点,用于记录 Controller 发生变更的次数,即记录当前的控制器是第几代控制器,初始值是1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1。
每个和控制器交互的请求都会携带上controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。
如果请求的controller_epoch值大于内存中的controller_epoch值,那么则说明已经有新的控制器当选了。
由此可见,Kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[zk: localhost:2181(CONNECTED) 28] get /controller
{"version":1,"brokerid":0,"timestamp":"1640589912082"}
cZxid = 0x1a0c8d3b74
ctime = Mon Dec 27 15:25:12 CST 2021
mZxid = 0x1a0c8d3b74
mtime = Mon Dec 27 15:25:12 CST 2021
pZxid = 0x1a0c8d3b74
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x47b53e7c02b1b26
dataLength = 54
numChildren = 0
[zk: localhost:2181(CONNECTED) 29] get /controller_epoch
46
cZxid = 0x10000001b
ctime = Tue Oct 08 11:17:23 CST 2019
mZxid = 0x1a0c8d3b76
mtime = Mon Dec 27 15:25:12 CST 2021
pZxid = 0x10000001b
cversion = 0
dataVersion = 45
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
------ 本文完 ------