Kafka学习了一段时间了,把相关琐碎的笔记整理成一篇文章,大家一起学习趴( •̀ ω •́ )✧

Kafka简要背景

互联网行业ABC – AI人工智能,BigData大数据,Cloud云计算云平台
· kafka是一个消息引擎,消息引擎则是一组规范,用于在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传输
· kafka如何传递消息格式:纯二进制的字节序列传输协议:点对点模型,发布/订阅模型(kafka两种传输模型都支持)
· 为什么系统AB之间不直接发送消息,而要经过一个kafka消息引擎呢?
削峰填谷!缓冲上下游瞬时突发流量,使其更平滑;
松耦合,简化应用的开发,减少了系统间不必要的交互

相关术语

发布订阅的对象是消息,而主题(Topic)是承载消息的逻辑容器,生产者向主题发布消息,消费者订阅主题消息,生产者和消费者都称为客户端。
Kafka的服务器由Broker的服务进程构成,一个Kafka集群由多个Broker组成
· 备份机制(不丢失消息):Leader Replica(对外提供服务) 和 Follower Replica(异步消息拉取)。
· 伸缩性(单机无法容纳大量数据):分区,将数据分开存储。每个主题划分成多个分区。分区中可以配置多个副本,用位移(offset)来表示。
· 数据持久化:磁盘追加写消息日志(Log)。为了避免磁盘空间消耗殆尽,使用日志段(Log Segment)机制,将日志进一步细分为多个日志段,Kafka在后台定期检查老的日志段能否被删除,从而回收磁盘空间。
· 消费者组(提升消费者端的吞吐量): 多个消费者实例共同组成一个组来消费一组主题,该组主题中的每个分区都只会被组内的一个消费者实例消费。消费者消费到分区的消息位置称为消费者位移(consumer offset)
· 重平衡(高可用的重要手段): 消费者组中的消费者还能彼此协助。若某个消费者实例挂了,Kafka能自动检测并将Failed实例所负责的分区转移给其他或者的消费者。不过Rebalance也会引起一些消费者问题,许多Bug连社区都无力解决。
一图总结:注意观察分区与broker的关系
kafka结构.webp

思考
1,如何判断老的日志段能够被删除呢?
2,为什么Kafka不像MySQL那样允许follower副本对外提供读服务呢?(主写从读为了缓解leader压力从而负载均衡,但是Kafka的分区相对均匀地分散到各个broker上,也可以达到负载均衡的效果,kafka再去做主写读从反而增加系统复杂度)
3,Kafka是按照什么规则将消息划分到topic的各个分区呢?(如果producer指定了要发送的目标分区,消息自然是去到那个分区;否则就按照producer端参数partitioner.class指定的分区策略来定;如果你没有指定过partitioner.class,那么默认的规则是:看消息是否有key,如果有则计算key的murmur2哈希值%topic分区数;如果没有key,按照轮询的方式确定分区)

Apache Kafka是消息引擎系统,也是一个分布式流处理平台(Distributed Streaming Platform)。
Kafka社区于0.10.0.0版本正式推出了流处理组件 Kafka Streams,成为分布式流处理平台(类似的还有Apache Storm, Apache Spark和Apache Flink),关于Kafka流处理平台可以看这个视频(https://www.youtube.com/watch?v=I32hmY4diFY)。
此外,Kafka还可以作为分布式存储引擎。。
一些其他了解:阿里成了冤大头??1亿美元收购的开源项目,核心团队出走造竞品,转头又卖了1个亿 | 量子位 (qbitai.com),阿里2019年以9000万欧元收购flink,然后2022年其创世团队集体出走创立新公司Immerok,23年又被硅谷巨头Confluent收购,它同时掌握了Apache Flink和Apache Kafka,成为最大赢家。
关于Kafka和RocketMQ的对比,这篇文章写的比较详细,后续学习可以对比着来学,并思考不同设计的原因。(Kafka和RocketMQ实现原理对比 - 腾讯云开发者社区-腾讯云 (tencent.com)

线上环境要如何部署Kafka?

· 操作系统:将Kafka部署到Linux会更好。原因如下:
(1)I/O模型的使用:Kafka客户端使用Java的selector,selector在Linux中实现机制是epoll,而在Windows上实现机制是select。epoll能够获得比select更高效的I/O性能。
(2)数据网络传输效率:消息保存在磁盘,而生产消费消息需要网络进行传输,所以Kafka需要在磁盘和网络间进行大量数据传输,Linux中的零拷贝(Zero Copy)可以减少数据拷贝的过程从而实现快速数据传输。
(3)社区支持度:社区对Windows平台上发现的Kafka Bug不做任何承诺。。
· 磁盘:使用普通机械磁盘顺序读写,也不用使用RAID(冗余和负载均衡其实Kafka的备份和分区都提供到了)
· 磁盘容量:规划磁盘容量需要考虑这几个因素(新增消息数、消息留存时间、平均消息大小、备份数、是否启用压缩),实际使用中建议预留20%~30%
· 带宽:根据带宽,来规划所需Kafka服务器的数量。对于千兆网络(1Gbps),按占用带宽70%来计算,避免大流量下的丢包。

Kafka配置参数

其中的配置不仅指Kafka服务器的配置,还有Broker端参数,主题级别的参数,JVM端参数和操作系统级别参数。
Broker端参数:接近200个
1️⃣ 配置存储信息:log.dirs 配置多块磁盘保留broker日志信息等
2️⃣ ZooKeeper: zookeeper.connect 配置kafka集群和zookeeper集群等,2181为zk默认端口
3️⃣ broker连接:listener监听器 <协议,主机名,端口号>等,其中主机名尽量不要用IP地址,而是用主机名称
4️⃣ Topic管理:是自动创建Topic,是开启Unclean Leader选举,是会更换Leader等
5️⃣ 数据留存:消息保留时间,Broker为消息保存的总磁盘大小等
Topic级别参数:25个左右,Topic级别参数会覆盖全局Broker参数
1️⃣ 消息保存:全局Broker参数可以作为所有业务留存时间的最大值,而每个具体Topic级别参数去把全局参数覆盖。retention.ms 消息保存时长(默认7天),retention.bytes 预留磁盘空间(默认-1,表示无限使用),max.message.bytes Topic最大消息大小。
Topic级别参数可以在 创建Topic时 或者 修改Topic时 进行设置,推荐只采用第二种方式,使用 kafka-configs 来修改Topic级别参数。
JVM参数:Kafka服务端用Scala编写,但终归还是编译成Class文件在JVM上运行,因此JVM参数对Kafka集群也是很重要。建议至少运行在Java 8以上。
1️⃣ 堆大小:通用建议将JVM堆大小设置为6GB,是业界比较公认的一个合理值
2️⃣ 垃圾回收机制:使用Java 8以上可以使用G1收集器,相比CMS的优点在于更少的Full GC和更少需要调整的参数。
在Kafka中配置JVM参数可以通过设置环境变量来做,
$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC
$> bin/kafka-server-start.sh config/server.properties
操作系统参数:包括文件描述符限制、文件系统类型、Swappiness以及提交时间等
1️⃣ 文件描述符限制:实际上,文件描述符系统资源并没有那么昂贵,可以调大一点,ulimit -n 1000000.
2️⃣ 文件系统类型:对于ext3, ext4或XFS这样的日志型文件系统,根据官网测试报告XFS性能要强于ext4
3️⃣ swap的调优:网上很多文章将其设置为0,表示禁止Kafka进程使用swap空间。这样当物理内存耗尽,操作系统出发OOM killer随机选择一个进程kill掉,不给用户任何预警。但如果设置一个较小的值,当开始使用swap空间时,至少能够观测到Broker性能开始急剧下降,从而给到一个调优和诊断问题的时间。建议设置为一个接近0但不为0的值,比如1.
4️⃣ Flush落盘时间:向Kafka发送数据并不是真要等数据被写入磁盘才会认为成功,而只是写入OS的page cache中就可以了,随后OS根据LRU算法定期将page cache中的“脏”数据落盘。这个定期由提交时间确定,默认为5秒。一般情况下认为这个时间太频繁了。考虑到断电丢失以及Kafka提供了多副本的冗余,稍微拉大提交间隔换取性能是合理的。

Kafka生产者

生产者消息分区机制原理剖析

使用Apache Kafka生产和消费消息的时候,希望能够将数据均匀地分配到所有服务器上。
· 消息组织方式:三级结构,主题 - 分区 - 消息
· 分区策略——决定生产者将消息发送到哪个分区。若自定义分区策略,需要显式配置生产者端的参数 partitioner.class. 可以写一个具体类实现 org.apache.kafka.clients.producer.Partitioner 接口,定义了partition() 和 close() 两个方法。
(1)轮询策略:优秀的负载均衡,默认情况下最合理,也最常使用
(2)随机策略:实际表现差于轮询
(3)按消息键保序策略:Kafka允许为每条消息定义消息键,key(可以是有明确业务含义的字符串:客户代码、部门编号、业务ID等)。可以保证同一key的所有消息都进入相同的分区
🎈总的来说,分区是实现负载均衡以及高吞吐的关键。在生产者一端需要仔细盘算合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,这样极易引发下游数据消费的性能下降。
Tips: 有的公司为了保证消息在全局有序,仅使用一个分区,大大牺牲了性能。其实key+多分区也能够实现,保证同一批因果依赖的消息分到一个分区即可。

生产者压缩算法

压缩(compression)希望以较小的CPU开销带来更少的磁盘占用或更少的网络I/O传输
Kafka消息格式:分为两层(消息集合 以及 消息,消息集合包括若干日志项,日志项真正封装消息,写入在消息集合层面上进行)
· 消息版本(V1, V2 - 0.11.0.0引入 - 改进CRC校验和压缩算法),V2更省磁盘空间,即使在都不开启压缩的情况下。V1压缩是把多条消息压缩后保存到外层消息的消息体字段;V2压缩是对整个消息集合进行压缩。
何时压缩? —— 可能发生在生产者端和Broker端
· 生产者端:配置compression.type参数,表示启用指定类型的压缩算法
· Broker端:一般来说,Broker从Producer接收到消息后仅是原封不动地保存而不会进行其他修改。除了两种情况:1️⃣Broker端指定了和Producer端不同的压缩算法,就需要解压缩然后用另外的算法压缩;2️⃣Broker端发生了消息格式转换,消息V1,V2版本,如果消息格式不同也需要解压缩和重新压缩。:消息格式转换对性能的影响是很大的,使Kafka丧失了引以为豪的Zero Copy特性。

【思考】为什么消息格式转换就丧失了Zero Copy的特性呢?
相关参考:Kafka生产者压缩 - 掘金 (juejin.cn) — 待学习

一般来说,Kafka消息在producer中压缩,broker原样保存,由consumer自行解压缩
各种消息压缩算法对比
性能指标:(1)压缩比 [原大小/压缩后大小,约高越好];(2)压缩/解压缩吞吐量 [每秒能压缩/解压缩多少MB的数据]
Kafka2.1.0之前支持的3种压缩算法:GZIP、Snappy和LZ4,之后正式支持Zstandard(简写zstd),是Facebook开源的一个压缩算法,能够提供超高的压缩比。
对于Kafka,吞吐量方面:LZ4 > Snappy > zstd和GZIP;压缩比方面:zstd > LZ4 > GZIP > Snappy。具体到物理资源,Snappy算法占用网络带宽最多,zstd最少。在CPU使用率方面,各个算法表现差不多。
工程最佳实践
启用压缩的一个条件是Producer程序运行的机器上CPU资源要充足。而且如果你的环境中带宽资源有限,那么也是建议开启压缩。如果客户端机器CPU资源富余,强烈建议开启zstd压缩,这样能够极大地节省网络资源消耗。
一个bugfix
京东小伙向社区建议去掉因为broker做消息校验而引入的解压缩,据说去掉解压缩之后,broker端的CPU使用率至少降低了50%。(https://issues.apache.org/jira/browse/KAFKA-8106)

无消息丢失配置怎么实现?

首先,要明确Kafka在什么情况下能保证消息不丢失,而不要混淆责任边界。一句话概括:Kafka只对“已提交”的消息做有限度的持久化保证。这句话中的两个核心要素为:
(1) 已提交的消息:当Kafka的若干个Broker成功接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。其中“若干”取决于对“已提交”的定义,只要一个broker保存就算提交,还是所有broker都成功保存才算。也即ack参数的配置。
(2) 有限度的持久化保证:N个Broker中至少有1个存活,Kafka就能保证已提交的消息永远不会丢失。
💣“消息丢失”的案例——冤枉Kafka而已
1,生产者程序丢失数据
目前Kafka Producer是异步发送消息,如果调用的是 producer.send(msg) 这个API,那么它通常会立即返回,但此时你不能认为消息发送已经成功。这种发送方式有一个有趣的名字 “fire and forget”。而导致消息没有发送成功的原因可能有:网络抖动,消息没有到达Broker;消息不合格(例如太大)导致Broker拒绝接收。
==> Producer永远要使用带有回调通知的发送API,即 producer.send(msg, callback)
2,消费者程序丢失数据
Consumer端丢失数据主要体现在要消费的消息不见了。对于消费者端有个“位移”的概念,表示当前消费到Topic分区的位置。“位移”相当于“书签”,标记当前阅读了多少页,下次翻书直接跳到书签页继续阅读。正确使用书签有两个步骤:(1)读书,(2)更新书签页。如果这两步顺序颠倒就会出现消息丢失,例如当前书签90页,先将其放到100页,然后看书,看到95页临时有事中止阅读,那么下次直接到100页阅读,丢失了96~99的内容。Kafka中Consumer端的消息丢失就是这么回事。
==> 维持先消费消息(阅读),再更新位移(书签)的顺序 即可最大限度保证消息不丢失。但是这样可能导致重复消费的问题(后面再讨论如何处理)
3,多线程异步消费消息,consumer自动提交位移
==> 如果是多线程异步处理消费消息,consumer程序不要开启自动提交位移,而是应用程序手动提交位移。
最佳实践

  1. producer使用带有回调通知的send方法:producer.send(msg, callback)
  2. 设置acks = all. [Producer参数] 表明对“已提交”消息的定义,即所有副本Broker都要接收到消息,该消息才算“已提交”,不过对吞吐影响较大;
  3. 设置 retries 为一个较大的值。[Producer参数] 当网络瞬时抖动导致消息发送失败时,producer可以自动重试;
  4. 设置 unclean.leader.election.enable = false. [Broker参数] 如果一个Broker落后原来Leader太多,它一旦成为新的Leader必然导致消息的丢失;
  5. 设置 replication.factor >= 3 ,[Broker参数] 消息冗余机制,多存几份没坏处;
  6. 设置 min.insync.replicas > 1. [Broker参数] 控制消息至少要被写入到多少个副本才算是“已提交”,设置大于1可以提升消息持久性。实际环境中千万不要用默认值1;
  7. 确保 replication.factor > min.insync.replicas. 如果两者相等, 那么一个副本挂了,整个分区就没法工作了。这样可用性比较低。推荐 replication.factor = min.insync.replicas + 1;
  8. 确保消息消费完成再提交。enable.auto.commit最好设置为false。[Consumer参数] 手动提交位移对于单Consumer多线程处理的场景是至关重要的;

客户端不常见的高级功能

Kafka拦截器 : 允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。(修改/包装消息、创建新的消息、甚至是丢弃消息)
可以分为生产者拦截器消费者拦截器。生产者拦截器允许在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。这两种拦截器都支持链的方式,通过参数设定,名字为 interceptor.classes. 用List<string>装载,指定拦截器类时要指定它们的全限定类名。
· 生产者拦截器实现类都要实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口,有两个核心方法 onSend 和 onAcknowledgement。注意这两个方法不在同一个线程中被调用,有共享资源要保证线程安全。
· 消费者拦截器实现类都要继实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,同样两个核心方法 onConsume 和 onCommit。
==> 拦截器的使用场景:Kafka拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。

Kafka生产者如何管理TCP连接

· 创建TCP连接的时机:其实在创建生产者KafkaProducer实例时,生产者应用会在后台创建并启动一个名为Sender的线程,该Sender线程开始运行时首先会创建与Broker的连接。会连接 bootstrap.servers 参数指定的所有Broker.
因此在实际使用中,并不建议把集群中所有Broker都配置到 bootstrap.servers 中,通常指定3~4台就足够了(Producer一旦连接到任一台Broker,就能拿到整个集群的Broker信息)。
· Producer端关闭TCP连接的时机:两种情景,①用户主动关闭,调用producer.close() 或者 kill -9 杀掉producer应用;②Kafka自动关闭,指定connections.max.idle.ms 默认情况下参数值为9分钟,即9min内没有任何请求“流过”某个TCP连接,就会自动关闭;该参数设置为-1表示长连接TCP。

幂等生产者和事务生产者

生产者幂等性 以及 事务型消息 可以保证“精确一次”的语义:即不丢失消息,也不重复消费。

Kafka消息交付可靠性保障
即Kafka对Producer和Consumer要处理的消息提供什么样的承诺,常见承诺包括:
①最多一次 (at most once):消息可能会丢失,但绝不会被重复发送;
②至少一次 (at least once)【默认】:消息不会丢失,但有可能被重复发送;
③精确一次 (exactly once):消息不会丢失,也不会被重复发送。
❔那么问题来了,Kafka是怎么做到精确一次的呢… 简单来说是通过两种机制:幂等性(Idempotence)和事务(Transaction)。
“幂等性,其优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。”
幂等性Producer (0.11.0.0引入的新功能),producer可以使能幂等性,Broker会多保留一些字段自动去重消息。能够保证某个主题的一个分区上不出现重复消息,但无法实现多个分区的幂等性。
事务型Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。利用两阶段提交(2PC)机制,实现多分区上的消息无重复。不过因为事务的开销,性能比幂等性Producer更差。(PS. 事务更多用在Kafka Streams中,如果要实现流处理中的精确一次语义,事务是不可少的)

Kafka消费者

消费者组

Consumer Group是Kafka提供的可扩展且具有容错性的消费者机制。①组内可有多个消费者实例共享同一个Group ID(一个字符串标识),组内所有消费者协调在一起来消费订阅主题的所有分区。②当然,单个分区只能由同一个消费者组内的一个Consumer实例来消费(该分区当然也可以被其他Group消费)。

Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。
理想情况下,Consumer实例的数量 应该等于 Group订阅主题的分区总数,这样能够最大限度地实现高伸缩性。
如果实例数少了,那么有的消费者就要负责多个分区;如果实例数多了,那么将有消费者不会被分配任何分区,造成资源浪费。

消费者组为传统的消息引擎模型提供了*伸缩性***:点对点模型中的消息只能被下游的一个Consumer消费;发布/订阅模型中,每个消费者必须订阅主题的所有分区。而Consumer Group订阅了多个主题后,组内的每个实例不要求订阅主题的所有分区,可以只消费部分分区中的消息。

消费者端位移

对于Consumer Group来说,位移(Offset)是一个KV对,简化地表示为:Map<TopPartition, Long>.
老版本的Consumer Group将位移保存在Zookeeper中,将服务器节点作为无状态的,但是zookeeper这类框架并不适合频繁的写更新,恰好消费者组的位移更新就是一个非常频繁的操作,所以新版的Consumer Group重新设计了位移的管理方式。
即,将位移保存在一个特殊的主题——__consumer_offsets中。

位移主题 __consumer_offsets

位移的提交要求高持久性,以及支持高频的写操作。Kafka的主题设计天然就满足这两个条件,于是新版本的消费者组位移管理放到主题中。
位移主题是一个普通的Kafka主题,但是它的消息格式是Kafka定义的,用户不能修改。
(1)位移主题的组成
Key: <ConsumerGroup ID,主题名,分区号>
Value: 位移值(其实还有一些其他元数据,e.g. 时间戳、用户自定义的数据等)
:exclamation:注意:独立消费者(Standalone Consumer)运行机制与Consumer Group不同,但是位移管理机制相同,它也有自己的Group ID来标识自己。
(2)位移主题的创建
当Kafka集群中第一个Consumer程序启动时,Kafka就会自动创建位移主题,分区数由Broker端参数 offsets.topic.num.partitions 确定,默认值为50。
所以kafka日志路径下就会冒出很多 __consumer_offsets-xxx 这样的目录,位移主题若由kafka自动创建,那么该主题的分区数是50,副本数是3.

消费者组重平衡

Rebalance(重平衡)本质上是一种协议规定了一个Consumer Group下所有的Consumer如何达成一致,来分配订阅Topic的每个分区
:family_man_girl_boy: 比如某个Group有20个消费者实例,它订阅了具有100个分区的Topic,正常情况kafk会为每个消费者平均分配。
触发Rebalance的条件
① 组成员数目发生变更;
② 订阅主题数发生变更(消费者组以正则表达式订阅主题);
③ 订阅主题的分区数发生变更(kafka当前只允许增加分区数);
Relabance的过程对Consumer Group的消费有极大的影响,重平衡是 stop the world(STW)的方式,所有应用线程都会停止工作。
Rebalance很慢。。曾经一个国外用户的Group内有几百个consumer实例,成功rebalance一次要几个小时。。
拓展:消费者组的弊端又可能会有哪些呢?
(1) 限制了并发度,Group中有效消费者个数至多只能等同于订阅topic的分区数;
(2) 如果各个分区中的数据不均衡,那么就需要等待个别消费者慢慢消费;
(3) 多个消费者组就要保存多个消费者组的位移信息。

P.S. 如果消费者使用assign方法订阅TopicPartition,那么表明该consumer是独立消费者,不属于任何消费者组。