欢迎访问合肥市大数据资产运营有限公司网站!
0551-65909059   公司OA
联系电话:
当前位置:首页>>新闻中心>>行业动态 >>这才是kafka
今天是: 2024年05月16日   【农历:四月初九】  星期四
这才是kafka

这才是kafka

文章转载自:https://www.jianshu.com/p/d3e963ff8b70

作者:郑杰文,腾讯云存储,高级后台工程师

简介

kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。

kafka对外使用topic的概念,生产者往topic里写消息,消费者从读消息。为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序。每新写一条消息,kafka就是在对应的文件append写,所以性能非常高。kafka的总体数据流是这样的:


大概用法就是,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息,然后进行业务处理。
图中有两个topic,topic 0有两个partition,topic 1有一个partition,三副本备份。可以看到consumer gourp 1中的consumer 2没有分到partition处理,这是有可能出现的,下面会讲到。关于broker、topics、partitions的一些元信息用zk来存,监控和路由啥的也都会用到zk。

生产

基本流程是这样的


创建一条记录,记录中一个要指定对应的topic和value,key和partition可选。 先序列化,然后按照topic和partition,放进对应的发送队列中。kafka produce都是批量请求,会积攒一批,然后一起发送,不是调send()就进行立刻进行网络发包。
如果partition没填,那么情况会是这样的:

1、key有填
按照key进行哈希,相同key去一个partition。(如果扩展了partition的数量那么就不能保证了)

2、key没填
round-robin来选partition

这些要发往同一个partition的请求按照配置,攒一波,然后由一个单独的线程一次性发过去。

API

有high level api,替我们把很多事情都干了,offset,路由啥都替我们干了,用以来很简单。
还有simple api,offset啥的都是要我们自己记录。

partition

当存在多副本的情况下,会尽量把多个副本,分配到不同的broker上。kafka会为partition选出一个leader,之后所有该partition的请求,实际操作的都是leader,然后再同步到其他的follower当一个broker歇菜后,所有leader在该broker上的partition都会重新选举,选出一个leader。(这里不像分布式文件存储系统那样会自动进行复制保持副本数)

然后这里就涉及两个细节:怎么分配partition,怎么选leader

关于partition的分配,还有leader的选举,总得有个执行者。在kafka中,这个执行者就叫controllerkafka使用zkbroker中选出一个controller,用于partition分配和leader选举。

partition的分配

1、将所有Broker(假设共nBroker)和待分配的Partition排序

2、将第iPartition分配到第(i mod n)个Broker上 (这个就是leader

3、将第iPartition的第jReplica分配到第((i + j) mode n)个Broker

leader容灾

controller会在Zookeeper/brokers/ids节点上注册Watch,一旦有broker宕机,它就能知道。当broker宕机后,controller就会给受到影响的partition选出新leadercontrollerzk/brokers/topics/[topic]/partitions/[partition]/state中,读取对应partitionISRin-sync replica已同步的副本)列表,选一个出来做leader
选出leader后,更新zk,然后发送LeaderAndISRRequest给受影响的broker,让它们改变知道这事。为什么这里不是使用zk通知,而是直接给broker发送rpc请求,我的理解可能是这样做zk有性能问题吧。

如果ISR列表是空,那么会根据配置,随便选一个replicaleader,或者干脆这个partition就是歇菜。如果ISR列表的有机器,但是也歇菜了,那么还可以等ISR的机器活过来。

多副本同步

这里的策略,服务端这边的处理是followerleader批量拉取数据来同步。但是具体的可靠性,是由生产者来决定的。
生产者生产消息的时候,通过request.required.acks参数来设置数据的可靠性。

acks=-1的时候,如果ISR少于min.insync.replicas指定的数目,那么就会返回不可用。

这里ISR列表中的机器是会变化的,根据配置replica.lag.time.max.ms,多久没同步,就会从ISR列表中剔除。以前还有根据落后多少条消息就踢出ISR,在1.0版本后就去掉了,因为这个值很难取,在高峰的时候很容易出现节点不断的进出ISR列表。

ISA中选出leader后,follower会从把自己日志中上一个高水位后面的记录去掉,然后去和leader拿新的数据。因为新的leader选出来后,follower上面的数据,可能比新leader多,所以要截取。这里高水位的意思,对于partitionleader,就是所有ISR中都有的最新一条记录。消费者最多只能读到高水位;

leader的角度来说高水位的更新会延迟一轮,例如写入了一条新消息,ISR中的brokerfetch到了,但是ISR中的broker只有在下一轮的fetch中才能告诉leader

也正是由于这个高水位延迟一轮,在一些情况下,kafka会出现丢数据和主备数据不一致的情况,0.11开始,使用leader epoch来代替高水位。(https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Scenario1:HighWatermarkTruncationfollowedbyImmediateLeaderElection

思考:
acks=-1

1、是follwers都来fetch就返回成功,还是等follwers第二轮fetch

2leader已经写入本地,但是ISR中有些机器失败,那么怎么处理呢?

消费

订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。因此,如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。


API

订阅topic时,可以用正则表达式,如果有新topic匹配上,那能自动订阅上。

offset的保存

一个消费组消费partition,需要保存offset记录消费到哪,以前保存在zk中,由于zk的写性能不好,以前的解决方法都是consumer每隔一分钟上报一次。这里zk的性能严重影响了消费的速度,而且很容易出现重复消费。
0.10版本后,kafka把这个offset的保存,从zk总剥离,保存在一个名叫__consumeroffsets topictopic中。写进消息的keygroupidtopicpartition组成,value是偏移量offsettopic配置的清理策略是compact。总是保留最新的key,其余删掉。一般情况下,每个keyoffset都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历partition建立缓存,然后查询返回。

确定consumer group位移信息写入__consumers_offsets的哪个partition,具体计算公式:

__consumers_offsets partition =
           Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。

思考:
如果正在跑的服务,修改了offsets.topic.num.partitions,那么offset的保存是不是就乱套了?

分配partition--reblance

生产过程中broker要分配partition消费过程这里,也要分配partition给消费者。类似broker中选了一个controller出来,消费也要从broker中选一个coordinator,用于分配partition
下面从顶向下,分别阐述一下

1、怎么选coordinator

2、交互流程。

3reblance的流程。

coordinator

1、看offset保存在那个partition

2、该partition leader所在的broker就是被选定的coordinator

这里我们可以看到,consumer groupcoordinator,和保存consumer group offsetpartition leader是同一台机器。

交互流程

coordinator选出来之后,就是要分配了
整个流程是这样的:

1consumer启动、或者coordinator宕机了,consumer会任意请求一个broker,发送ConsumerMetadataRequest请求,broker会按照上面说的方法,选出这个consumer对应coordinator的地址。

2consumer 发送heartbeat请求给coordinator,返回IllegalGeneration的话,就说明consumer的信息是旧的了,需要重新加入进来,进行reblance。返回成功,那么consumer就从上次分配的partition中继续执行。

reblance流程

1、consumercoordinator发送JoinGroupRequest请求。

2、这时其他consumerheartbeat请求过来时,coordinator会告诉他们,要reblance了。

3、其他consumer发送JoinGroupRequest请求。

4、所有记录在册的consumer都发了JoinGroupRequest请求之后,coordinator就会在这里consumer中随便选一个leader。然后回JoinGroupRespone,这会告诉consumer你是follower还是leader,对于leader,还会把follower的信息带给它,让它根据这些信息去分配partition

5consumercoordinator发送SyncGroupRequest,其中leaderSyncGroupRequest会包含分配的情况。
6
coordinator回包,把分配的情况告诉consumer,包括leader

partition或者消费者的数量发生变化时,都得进行reblance
列举一下会reblance的情况:

1、增加partition

<p style="margin-top: 0px;margin-bottom: 0.8rem;padding: 0px;list-style: none;-webkit-font-smoothing: antialiased