「中间件」kafka学习笔记(一)

kafka概述以及kafka架构解析

Posted by odin on April 9, 2020

1.什么是kafka

kafka是一种分布式的基于发布/订阅模式的消息中间件,主要应用于大数据实时分析领域。Kafka是最初由Linkedin公司开发,由scala和java编写,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。 主要应用场景:日志收集系统和消息系统。

2.kafka主要设计特性

  1. 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  2. “零”复制机制。减少用户太的数据复制,只保留内核态的数据复制,从而达到快速IO。
  3. topic分区
  4. 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  5. 同时支持离线数据处理和实时数据处理。
  6. 支持分区扩展。topic分区(partition)可以增加,但是不能减少。
  7. 局部有序性。在一个分区内是有序的。

3.kafka总体架构

kafka总体架构图-来自网络图片

从总体架构上也能看出,kafka是基于zookeeper的,依赖zookeeper存储offset(偏移量,在kafka 0.9版本以前,0.9版本之后offset存储在kafka自身默认的consemuer topic中),以及依赖zookeeper进行controller抢占注册。

kafka controller,作为kafka的控制器作用后续会讲,先提一下就是全局唯一,负责管理整个集群中所有分区和副本的状态,例如为分区xuanquu新的leader。抢占注册机制可以理解成抢锁机制,先到先得。

kafka总体架构分为:

  • produce:生产者。
  • broker:kafka集群中的服务节点。
  • consumer:消费者/订阅者,consumer有组的概念,consumergroup。一个consumergroup组内多个consumer,kafka通过这种方式提高吞吐量。
  • topic:消息主题。每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。
  • partition:topic分区,是物理上的概念,每个topic包含一个或者多个partition。一个partition只能被一个consumer消费,若是对应consumergroup,那么也是只能对应xonsumergroup中的一个consumer,同一个partition不能被多个consumer消费。kafka通过在每个broker上分配多个partition实现可扩展性。

    为什么不能减少分区?
    kafka的分区减少会面临partition和consumer的重新划分,又要保证partition中保留的数据不丢失和重复,实现过程很复杂,所以官方没有实现这个规则策略,规定只能增加不能减少。

  • replica:副本,保证kafka集群中某个节点发生故障,该节点上的数据不丢失,kafka提供副本机制。topic的每个分区都有一个leader, 若干个follower,和replica。kafka通过在不同broker上分配多个replica实现高可用性。

基于partition只能对应一个consumer的特性,在kafka集群架构设计上我们需要注意的是,consumer的节点不要多于partition,否则多出来的consumer不会进行消费。若partiton多于consumer,kafka会通过分区策略(roundrobin和xrange)进行规划,后续详细讲解。

4.kafka创建tpoic流程

没有深入了解,这里简单概述一下kafka创建topic流程,详细过程建议阅读kafka源码或者google。

kafka 0.9之前的版本和0.9之后的版本有区别,这里以0.9之后的版本为例说明 kafka创建topic流程:

  1. 首先在zookeeper上创建相应topic名称的节点。
  2. kafka组件controller监控到zookeeper上节点变化,并创建topic缓存在kafka中。

5.kafka集群节点分布

kafka集群中维护着相关的状态信息,不同的状态信息具有不同的作用。包含同步选举使用的节点信息和副本所在节点信息。可以通过客户端--describe --topic name命令指定topic查看该topic的集群节点信息查看。 查看结果看出,名为first的topic有3个partition,以partition:0来说,
replicas:1,2,代表一个副本在节点1上另外一个副本在节点2上,那么必然其中一个是leader,另外一个是follower。
leader:1说明leader是broker=1。

在kafka总体架构图中也可以看到,topicA在每个broker上都有2个分区,有两个副本分别落在broker1和broker2上。其中partition:0的leader落在broker1上,follower落在broker2上。由于replica是在不同broker节点上,可以保证当一个broker节点挂掉后,有另外一个replica可以正常工作。

最后有一个描述Isr,这个是一个重要的内容。记录的是与leader保持同步的follower的集合。作为kafka重新选举的候选节点池,当leader挂掉后kafka会从Isr集合中的节点选举新的leader。

当follower节点与leader节点断开连接超时,follower节点会从Isr列表中移除,follower节点恢复并从leader节点同步完数据后重新加入Isr集合列表中。

6.kafka工作流程概述

大致概括一下kafka的工作流程,忽略各个模块内部的细节处理流程机制。

  1. porduce向topic发送消息数据。数据发送模式是批量发送,目的是减少建立连接,这是kafka高效的原因之一。当然这个批量发送的大小是可以通过produce的配置调整。
  2. leader接收到消息数据并写入partition,partition在物理存储上是连续追加的,这也是kafka高效的原因之一。
  3. leader向replica同步接收到的消息数据。
  4. consumer消费数据。(消费数据时kafka省略了用户层的copy过程,这也是kafka高效的原因之一。)

7.kafka文件存储

7.1)关于offset

既然kafka是文件形式顺序追加写入,那么如何知道当前consumer消费到什么位置呢? 首先明白kafka中的三个概念:offset,HW (high watermark),LEO 如上图所示,假设向topic中写入数据,partition0为leader。那么首先写入的是partition。offset是写入的偏移量。 leader写入完成后需要向follower同步数据,由于网络影响多个follower同步数据一定是有快有慢的,会造成follower之间的同步的数据量不统一。每个replica都会有一个自己存储的数据最大偏移量,这个就是LEO。 既然LEO存在差异,那么consumner消费时为了保持数据一致性,必然不能消费到LEO,只能消费到所有节点数据同步最少的那个位置(木桶漏水原理),这个位置的偏移量就是HW。

每个分区都要自己存储当前消费到的offset位置,存储在kafka内置的topic中__consumer_offset

0.9版本之前的kafka将当前消费到的offset存储在zookeeper对应的topic节点上,这个方式存在一个问题,当有很多topic消费时,会造成zookeeper过载问题,kafka在0.9版本之后改为将当前消费offset存储在本身内置的topic中。

7.2)kafka文件存储机制

向kafka写入消息,由于kafka topic拥有多个分区,分区小的消息又是追加写入的,那么存储的日志文件势必会很大,会造成很多问题,例如日志清理,重载日志文件,目标寻址。因为查找某个offset的Message(调用FileMessageSet的searchFor方法)是顺序查找的。因此,如果数据文件很大的话,查找的效率就低。kafka为了避免这些问题的文件存储机制采用了分段和索引两种方式结合:

  • segment 分段:kafka在partition下存储的日志文件根据大小分为多个segment,解决了单日志文件过大的问题。每个segment又包含.index.log两个日志文件。 segment文件名称以文件内第一条消息的offset命名,这样方便数据清理时可以通过offset直接删除整个文件而不用寻址。

  • .log log文件定义了严格的存储格式,以便快速查询消息。

  • .index 为文件创建索引:index文件是二进制存储的,里面的每条索引都记录了消息的相对offset和在文件中的物理位置。这里的相对offset和log文件里的绝对offset不同,相对offset是每个segment都从0开始的,而绝对offset在整个partition中都是唯一的。 索引文件中包含若干个索引条目,每个条目表示数据文件中一条message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。
  • 相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
  • position:表示该条message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的message了。

.index和.log文件的对应关系: segment的索引文件中存储着大量的元数据,数据文件中存储着大量消息,索引文件中的元数据指向对应数据文件中的message的物理偏移地址。以索引文件中的3,497为例,在数据文件中表示第3个message(在全局partition表示第368772个message),以及该消息的物理偏移地址为497。 Partition中的每条message由offset来表示它在这个partition中的偏移量,这个offset并不是该Message在partition中实际存储位置,而是逻辑上的一个值(如上面的3),但它却唯一确定了partition中的一条Message(可以认为offset是partition中Message的id)。

通过给定的offset定位查找log
首先通过offset确定segment,再在.index中找到对应在.log文件中的位置。

Kafka高效文件存储设计特点:

  1. Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
  2. 通过索引信息可以快速定位message和确定response的最大大小。
  3. 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
  4. 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

8.kafka分区策略

8.1)kafka使用分区的好处

方便水平扩展,当kafka节点不够用时可以设置partition的数量增加分区。当然partition是不能减少的。 原因之前讲过,这里简单提一下:减少分区那么分区中的消费数据需要重新分配,实现逻辑比较复杂,所以kafka官方没有提供减少的方案,规定只能增加不能减少。

提高并发,以partition为单位进行读写。

8.2)kafka producer 消息分区的原则
1
2
3
4
// 截取生产者发送消费消息的几种方法
public ProducerRecord(String topic, String value) {}
public ProducerRecord(String topic, String key, String value) {}
public ProducerRecord(String topic, Integer partition, String key, String value) {}

生产者发送消费消息时将被发送消息封装成ProducerRecord Object,kafka api提供了几种方法,可以指明topic、value、key、partition。

  • 生产者指定pratition时,按照指定的值作为pratition。
  • 生产者没有指定pratition但是有key时,将key进行hash运算,将得到的hash值与topic的partition数进行取余得到partition的值。
  • 生产者既没有指明partition也没有指定key时,第一次调用时随机生成一个整数,后面每次调用在这个整数上自增。将这个值与topic可用的partition总数取余得到partition的值。(roundrobin算法)
8.3)kafka consumer 分区策略

kafka采用的是发布/订阅的消费方式不是点对点的消费方式,并且consumer有group的概念。consumergroup的分区策略有两种,roundrobin和range。

  • roundrobin:常规roundrobin算法,partition/consumer取余。

    kafka默认的consumer分区策略,需要注意的是以partition为单位分配分区,所以不会出现分配不均匀的情况。

  • ange:RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个topic,RangeAssignor策略会将消费组内所有订阅这个topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。
    例如现在某个topic有7个分区,3个消费者cousumer_id分别为0,1,2。那么安装range分区策略consumer_id=0的节点将分配到partition0~2一共3个分区,consumer_id为1和2的节点将分别分配到partition3~4,5~6的两个节点。

    range的缺陷是:range是以topic为单位分配的,当有多个topic时,每个topic分区都将按照这样的策略分配,可想而知consumer_id=0的消费节点分配的分区数为3xn,造成分区不均匀。

参考文章

https://kfly.top/2020/01/07/it/kafka%E7%9F%A5%E8%AF%86%E6%A2%B3%E7%90%86/ https://blog.csdn.net/qq_35641192/article/details/80956244 https://matt33.com/2016/03/08/kafka-store/