Kafka-基本概念

Kafka简介

Kafka是用scala语言编写,最初由Linkedin公司开发,后贡献给了Apache基金会并成为顶级开源项目。是一个分布式、支持分区的(partition)、多副本的(replication),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等。

Kafka是一个类JMS消息队列,结合了JMS中的两种模式,可以有多个消费者主动拉取数据。虽然它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现,在JMS中只有点对点模式才有消费者主动拉取数据。

1.Kafka的特性

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒。
  • 可扩展性:kafka集群支持热扩展。
  • 持久性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
  • 容错性:支持数据备份,防止了某台服务器宕机后数据丢失。
  • 高并发:支持数千个客户端同时读写。
  • 顺序性:可以通过分区算法,保证消息的一定的顺序性。

2.Kafka的使用场景

Kafka可以应用于非常多的场景,比如:

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等。
  • 消息系统:解耦生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如 Spark Streaming 和 Storm。

Kafka的几个重要概念

官方文档:Kafka官网文档
Kafka的运行架构如下图,各组件之间通过TCP协议通信:

1.Broker

kafka 集群由多个 kafka 实例组成,每个实例称为 broker,在集群中每个 broker 都有一个唯一 brokerid
无论是 kafka 集群,还是 producer 和 consumer 都依赖于 zookeeper 来保证系统可用性,为集群保存一些 metadata (元数据)信息。

2.Topic And Partition

主题(topic)是对 Kafka 消息的分类,或者说是一系列消息的名义上的名字。Kafka 的 topic 是支持多用户订阅的,就是说,一个 topic 可以有零个、一个或多个消费者订阅写入的数据。

每一个 topic 的所有消息都会被保存到一个或多个分区(Partition),比如有3个分区,则这个 topic 下的消息就会分成3份来存储。对于一条消息它会被分到哪个分区上,默认是随机选择的,但更多的是根据一些复杂的语义来做分区算法,比如基于消息记录中的键值,我们可以计算出键值的 hashcode,然后对总分区数进行求模运算:partitionId = Math.abs(key.hashCode) % partitionCount。

我们还可以设置一个 topic 的 partition 的副本数,配置项:replication-factor。每个副本都会保存一份日志文件,这样就让 kafka 具有了容错能力。每个 partition 都有一个“leader”,零个或若干个“followers”,leader负责处理消息的读和写,与此同时,follower会被动的去复制leader上的数据。
如果leader发生故障,则会从 followers 中选举一个成为新的leader。每台服务器可以作为一些分区的leader,同时也可以作为其他分区的follower,这样集群就会据有较好的负载均衡。注意,和mysql中主从有区别,mysql做主从是为了读写分离,在kafka中读写操作都是leader

Log

Kafka 是通过日志(Log)文件的方式来持久化数据的。对于每一个 topic,它的每一个 partition 都会有一个日志文件。如下图:

每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。分区中的每个消息都有一个连续的序列号来作为在这个分区中的唯一标识。

Kafka可以配置这些消息保留的时间,在这个时间段内不管有没有消费,数据都不会丢失。比如,如果消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是可以被消费的,之后它将被丢弃以释放空间。

Kafka的性能是和数据量无关的常量级,所以保留太多的数据并不是问题(只要有足够的存储空间)。

Producer

Producer是指发布消息至 Kafka 的客户端。

Consumer

Consumer是指去消费 Kafka 消息的客户端。每个 Consumer 进程都需要指定一个 Consumer Group(配置项:group.id),Consumer Group 相当于是对 Consumer 的分组。同一个 Consumer Group 中的 Consumer 可以在不同的程序中,也可以在不同的机器上。
一个 Consumer Group 可以订阅一个或多个 topic,一个 topic 也可以被多个 Consumer Group 所订阅,但是某个 topic 中的一个条消息只会被 Consumer Group 中的一个 Consumer 消费。

上图显示,一个 kafka 集群中的某个 topic 有4个分区(P0-P3)和2个 consumer 组。A组有2个 consumer,B组有4个 consumer。

其实上面所说的订阅关系还不够明确,其实 topic 中的 partition 被分配到某个consumer上,也就是某个 consumer 订阅了某个 partition。Consumer Group 订阅的是 topic,但是 consumer 订阅的是 partition

offset

一个 consumer 在消费一个 partition 上的数据时,其实就是在顺序地读取 partition 上日志文件。读取到了哪一条消息,kafka 中用 offset 来标识。consumer 消费消息之后,就会向集群提交一个 offset。一般情况下随着 consumer 不断的读取消息,这 offset 的值不断线性增加,但其实 consumer 可以在任何它想要的位置读取记录,比如它可以将offset设置成为一个旧的值来重读之前的消息。

上图中,consumer A、B分属于不用的Consumer Group。consumer B读取到offset =11,consumer A读取到offset=9 。这个值表示Consumer Group中的某个consumer 在下次读取该 partition 时会从哪个 offset 的 message 开始读取,即 Consumer Group A 中的 consumer 下次会从 offset = 9 的 message 读取, Consumer Group B 中的 consumer 下次会从 offset = 11 的 message 读取。

这里并没有说是 Consumer A 下次会从 offset = 9 的 message 读取,原因是 consumer A 可能会退出 Group ,然后进行 rebalance,即重新分配分区。rebalance 的概念下面会介绍。

这个 offset 的数据在 kakfa 0.9 版本之前是保存在 Zookeeper 上,但因为 Zookeeper 并不适合频繁写数据,所以在新版本中 offset 的值是保存的 kafka 内置的一个 topic 里,topic 的名称为 __consumer_offsets。

Consumer 与 Consumer Group 的动态维护

当一个 consumer 进程挂掉时,该 consumer 所订阅的 partition 会被重新分配到该 group 内的其它的 consumer 上。当一个 consumer 加入到一个Consumer Group中时,同样会从其它的 consumer 中分配出一个或者多个 partition 给这个新加入的 consumer。这个过程,被称为 Rebalance。

这个动态维护是通过 Coordinator 来管理的,consumer 会周期性的发送 heartbeat 到 coordinator,当时间超过session.timeout.ms而 coordinator 没有收到某个 consumer 的 heartbeat,则该 consumer 会被认为已宕机,它所订阅的 partition 会分配到同一 group 内的其它的 consumer 上。

如果一个 consumer 进程一直在周期性的发送 heartbeat,但是它没有消费消息,这种状态称为livelock状态。

Coordinator

Coordinator (协调者)是 kafka 中的一个重要组件。它专门为 Consumer Group 服务,负责Group Rebalance以及提供 offset 管理和 Consumer Group 的成员管理等。在早期版本,以 zookeeper 作为协调者,后期版本则以某个 broker 作为协调者。

consumer 端在提交 offset 时,其实是向 coordinator 所在的 Broker 提交 offset。同样地,当 consumer 启动时,也是向 coordinator 所在的 broker 发送各种请求,然后由 coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

如何确定Consumer Group的Coordinator

consumer group 如何确定自己的 coordinator 是谁呢? 简单来说分为两步:

  1. 确定consumer group的 offset 信息写入__consumers_offsets这个 topic 的哪个分区。具体计算公式:

    __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount
    

    注意:groupMetadataTopicPartitionCount 由offsets.topic.num.partitions指定,默认是50个分区。

  2. 该分区 leader 所在的 broker 就是被选定的 coordinator。

Controller

控制器(Controller)是 Kafka 中另一个核心组件。它的主要作用是在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群,例如管理 topic 的各个 partition分配到哪几个节点,选举某个 partition 的 leader 等工作。

集群中任意一台 Broker 都能充当 controller 的角色,但是,在运行过程中,只能有一个 Broker 成为 controller,行使其管理和协调的职责。换句话说,每个正常运转的 Kafka 集群,在任意时刻都有且只有一个 controller。

Controller的作用

  1. topic管理(创建、删除、增加分区)
    这里的 topic 管理,就是指 controller 帮助我们完成对 Kafka 主题的创建、删除以及分区增加的操作。换句话说,当我们执行 kafka-topics 脚本时,大部分的后台工作都是控制器来完成的。
  2. 分区重分配
    分区重分配主要是指,kafka-reassign-partitions 脚本,提供的对已有 topic 分区进行细粒度的分配功能。这部分功能也是 controller 实现的。比如某个 kafka 的节点比较慢,就可以通过这个脚本把那个节点上 partition 转移到其它节点上去。
  3. preferred replica election
    在创建一个 topic 时,kafka尽量将 partition 均分在所有的 brokers 上,并且将 replicas 也均分在不同的broker上,默认使用 replicas 队列里的第一个副本作为 leader。
    但随着时间推移,broker可能会停机,会导致 leader 迁移,导致机群的负载不均衡,即不再是第一个副本作为 leader。所以我们会期望对topic的leader进行重新负载均衡。
    preferred replica election当前有两种触发方式:自动触发(auto.leader.rebalance.enable=true)和kafka-preferred-replica-election脚本触发。两者“玩法”相同,向 Zookeeper 的/admin/preferred_replica_election写数据,controller 提取数据执行 leader 分配。
  4. 集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机)
    包括自动检测新增 Broker、Broker 主动关闭及被动宕机。这种自动检测是依赖于 Zookeeper 的 Watch 功能和临时节点组合实现的。
    比如,controller 组件会利用 Watch 机制检查 ZooKeeper 的/brokers/ids节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在/brokers/ids下创建专属的 znode 临时节点。一旦创建完毕,ZooKeeper 会通过 Watch 机制将消息通知推送给 controller,这样,controller就能自动地感知到这个变化,进而开启后续的新增 Broker 作业。当 Broker 宕机或主动关闭后,该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行“善后”。
  5. 数据服务
    就是向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。

Kafka的特点

1.Kafka作为消息系统

如何将Kafka的流的概念和传统的企业信息系统作比较?

消息处理模型历来有两种:队列和发布-订阅。在队列模型中,一组消费者可以从服务器读取记录,每个记录都会被其中一个消费者处理; 在发布-订阅模式里,记录被广播到所有的消费者。这两种模式都具有一定的优点和弱点。队列的优点是它可以让你把数据分配到多个消费者去处理,它可以让您扩展你的处理能力。不幸的是,队列不支持多个订阅者,一旦一个进程读取了数据,这个数据就会消失。发布-订阅模式可以让你广播数据到多个进程,但是因为每一个消息发送到每个订阅者,没办法对订阅者处理能力进行扩展。

Kafka的消费群的推广了这两个概念。消费群可以像队列一样让消息被一组进程处理(消费群的成员),与发布 – 订阅模式一样,Kafka可以让你发送广播消息到多个消费群。

Kafka的模型的优点是,每个主题都具有这两个属性,它可以扩展处理能力,也可以实现多个订阅者,没有必要二选一。

Kafka比传统的消息系统具有更强的消息顺序保证的能力。

传统的消息队列的消息在队列中是有序的,多个消费者从队列中消费消息,服务器按照存储的顺序派发消息。然而,尽管服务器是按照顺序派发消息,但是这些消息记录被异步传递给消费者,消费者接收到的消息也许已经是乱序的了。这实际上意味着消息的排序在并行消费中都将丢失。消息系统通常靠 “排他性消费”( exclusive consumer)来解决这个问题,只允许一个进程从队列中消费,当然,这意味着没有并行处理的能力。

Kafka做的更好。通过一个概念:并行性-分区-主题实现主题内的并行处理,Kafka是能够通过一组消费者的进程同时提供排序保证和负载均衡。每个主题的分区指定给每个消费群中的一个消费者,使每个分区只由该组中的一个消费者所消费。通过这样做,我们确保消费者是一个分区唯一的读者,从而顺序的消费数据。因为有许多的分区,所以负载还能够均衡的分配到很多的消费者实例上去。但是请注意,一个消费群的消费者实例不能比分区数量多。

2.Kafka作为存储系统

任何消息队列都能够解耦消息的生产和消费,还能够有效地存储正在传送的消息。Kafka与众不同的是,它是一个非常好的存储系统。

Kafka把消息数据写到磁盘和备份分区。Kafka允许生产者等待返回确认,直到副本复制和持久化全部完成才认为成功,否则则认为写入服务器失败。

Kafka使用的磁盘结构很好扩展,Kafka将执行相同的策略不管你是有50 KB或50TB的持久化数据。

由于存储的重要性,并允许客户控制自己的读取位置,你可以把Kafka认为是一种特殊用途的分布式文件系统,致力于高性能,低延迟的有保障的日志存储,能够备份和自我复制。

Kafka如何保证消息的有序性

传统的队列在服务器上保存有序的消息,如果多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,但是消息是被异步的分发到各consumer上,所以当消息到达时可能已经失去了原来的顺序,这意味着并发消费将导致顺序错乱。为了避免故障,这样的消息系统通常使用“专用consumer”的概念,其实就是只允许一个消费者消费消息,当然这就意味着失去了并发性。

在这方面Kafka做的更好,通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。

Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,这样一个 Consumer Group 消费时也是顺序的。

------ 本文完 ------