Kafka核心组件之控制器和协调器

作者&投稿:逯肃 (若有异议请与网页底部的电邮联系)
~

[TOC]

我们已经知道Kafka的集群由n个的broker所组成,每个broker就是一个kafka的实例或者称之为kafka的服务。其实控制器也是一个broker,控制器也叫leader broker。

他除了具有一般broker的功能外,还负责分区leader的选取,也就是负责选举partition的leader replica。

kafka每个broker启动的时候,都会实例化一个KafkaController,并将broker的id注册到zookeeper,集群在启动过程中,通过选举机制选举出其中一个broker作为leader,也就是前面所说的控制器。

包括集群启动在内,有三种情况触发控制器选举:

1、集群启动

2、控制器所在代理发生故障

3、zookeeper心跳感知,控制器与自己的session过期

按照惯例,先看图。我们根据下图来讲解集群启动时,控制器选举过程。

假设此集群有三个broker,同时启动。

(一)3个broker从zookeeper获取/controller临时节点信息。/controller存储的是选举出来的leader信息。此举是为了确认是否已经存在leader。

(二)如果还没有选举出leader,那么此节点是不存在的,返回-1。如果返回的不是-1,而是leader的json数据,那么说明已经有leader存在,选举结束。

(三)三个broker发现返回-1,了解到目前没有leader,于是均会触发向临时节点/controller写入自己的信息。最先写入的就会成为leader。

(四)假设broker 0的速度最快,他先写入了/controller节点,那么他就成为了leader。而broker1、broker2很不幸,因为晚了一步,他们在写/controller的过程中会抛出ZkNodeExistsException,也就是zk告诉他们,此节点已经存在了。

经过以上四步,broker 0成功写入/controller节点,其它broker写入失败了,所以broker 0成功当选leader。

此外zk中还有controller_epoch节点,存储了leader的变更次数,初始值为0,以后leader每变一次,该值+1。所有向控制器发起的请求,都会携带此值。如果控制器和自己内存中比较,请求值小,说明kafka集群已经发生了新的选举,此请求过期,此请求无效。如果请求值大于控制器内存的值,说明已经有新的控制器当选了,自己已经退位,请求无效。kafka通过controller_epoch保证集群控制器的唯一性及操作的一致性。

由此可见,Kafka控制器选举就是看谁先争抢到/controller节点写入自身信息。

控制器的初始化,其实是初始化控制器所用到的组件及监听器,准备元数据。

前面提到过每个broker都会实例化并启动一个KafkaController。KafkaController和他的组件关系,以及各个组件的介绍如下图:

图中箭头为组件层级关系,组件下面还会再初始化其他组件。可见控制器内部还是有些复杂的,主要有以下组件:

1、ControllerContext,此对象存储了控制器工作需要的所有上下文信息,包括存活的代理、所有主题及分区分配方案、每个分区的AR、leader、ISR等信息。

2、一系列的listener,通过对zookeeper的监听,触发相应的操作,黄色的框的均为listener

3、分区和副本状态机,管理分区和副本。

4、当前代理选举器ZookeeperLeaderElector,此选举器有上位和退位的相关回调方法。

5、分区leader选举器,PartitionLeaderSelector

6、主题删除管理器,TopicDeletetionManager

7、leader向broker批量通信的ControllerBrokerRequestBatch。缓存状态机处理后产生的request,然后统一发送出去。

8、控制器平衡操作的KafkaScheduler,仅在broker作为leader时有效。

Kafka集群的一些重要信息都记录在ZK中,比如集群的所有代理节点、主题的所有分区、分区的副本信息(副本集、主副本、同步的副本集)。每个broker都有一个控制器,为了管理整个集群Kafka选利用zk选举模式,为整个集群选举一个“中央控制器”或”主控制器“,控制器其实就是一个broker节点,除了一般broker功能外,还具有分区首领选举功能。中央控制器管理所有节点的信息,并通过向ZK注册各种监听事件来管理整个集群节点、分区的leader的选举、再平衡等问题。外部事件会更新ZK的数据,ZK中的数据一旦发生变化,控制器都要做不同的响应处理。

故障转移其实就是leader所在broker发生故障,leader转移为其他的broker。转移的过程就是重新选举leader的过程。

重新选举leader后,需要为该broker注册相应权限,调用的是ZookeeperLeaderElector的onControllerFailover()方法。在这个方法中初始化和启动了一系列的组件来完成leader的各种操作。具体如下,其实和控制器初始化有很大的相似度。

1、注册分区管理的相关监听器

2、注册主题管理的相关监听

3、注册代理变化监听器

4、重新初始化ControllerContext,

5、启动控制器和其他代理之间通信的ControllerChannelManager

6、创建用于删除主题的TopicDeletionManager对象,并启动。

7、启动分区状态机和副本状态机

8、轮询每个主题,添加监听分区变化的PartitionModificationsListener

9、如果设置了分区平衡定时操作,那么创建分区平衡的定时任务,默认300秒检查并执行。

除了这些组件的启动外,onControllerFailover方法中还做了如下操作:

1、/controller_epoch值+1,并且更新到ControllerContext

2、检查是否出发分区重分配,并做相关操作

3、检查需要将优先副本选为leader,并做相关操作

4、向kafka集群所有代理发送更新元数据的请求。

下面来看leader权限被取消时,调用的方法onControllerResignation

1、该方法中注销了控制器的权限。取消在zookeeper中对于分区、副本感知的相应监听器的监听。

2、关闭启动的各个组件

3、最后把ControllerContext中记录控制器版本的数值清零,并设置当前broker为RunnignAsBroker,变为普通的broker。

通过对控制器启动过程的学习,我们应该已经对kafka工作的原理有了了解, 核心是监听zookeeper的相关节点,节点变化时触发相应的操作

有新的broker加入集群时,称为代理上线。反之,当broker关闭,推出集群时,称为代理下线。

代理上线:

1、新代理启动时向/brokers/ids写数据

2、BrokerChangeListener监听到变化。对新上线节点调用controllerChannelManager.addBroker(),完成新上线代理网络层初始化

3、调用KafkaController.onBrokerStartup()处理

3.5恢复因新代理上线暂停的删除主题操作线程

代理下线:

1、查找下线节点集合

2、轮询下线节点,调用controllerChannelManager.removeBroker(),关闭每个下线节点网络连接。清空下线节点消息队列,关闭下线节点request请求

3、轮询下线节点,调用KafkaController.onBrokerFailure处理

4、向集群全部存活代理发送updateMetadataRequest请求

顾名思义,协调器负责协调工作。本节所讲的协调器,是用来协调消费者工作分配的。简单点说,就是消费者启动后,到可以正常消费前,这个阶段的初始化工作。消费者能够正常运转起来,全有赖于协调器。

主要的协调器有如下两个:

1、消费者协调器(ConsumerCoordinator)

2、组协调器(GroupCoordinator)

kafka引入协调器有其历史过程,原来consumer信息依赖于zookeeper存储,当代理或消费者发生变化时,引发消费者平衡,此时消费者之间是互不透明的,每个消费者和zookeeper单独通信,容易造成羊群效应和脑裂问题。

为了解决这些问题,kafka引入了协调器。服务端引入组协调器(GroupCoordinator),消费者端引入消费者协调器(ConsumerCoordinator)。每个broker启动的时候,都会创建GroupCoordinator实例,管理部分消费组(集群负载均衡)和组下每个消费者消费的偏移量(offset)。每个consumer实例化时,同时实例化一个ConsumerCoordinator对象,负责同一个消费组下各个消费者和服务端组协调器之前的通信。如下图:

消费者协调器,可以看作是消费者做操作的代理类(其实并不是),消费者很多操作通过消费者协调器进行处理。

消费者协调器主要负责如下工作:

1、更新消费者缓存的MetaData

2、向组协调器申请加入组

3、消费者加入组后的相应处理

4、请求离开消费组

5、向组协调器提交偏移量

6、通过心跳,保持组协调器的连接感知。

7、被组协调器选为leader的消费者的协调器,负责消费者分区分配。分配结果发送给组协调器。

8、非leader的消费者,通过消费者协调器和组协调器同步分配结果。

消费者协调器主要依赖的组件和说明见下图:

可以看到这些组件和消费者协调器担负的工作是可以对照上的。

组协调器负责处理消费者协调器发过来的各种请求。它主要提供如下功能:

组协调器在broker启动的时候实例化,每个组协调器负责一部分消费组的管理。它主要依赖的组件见下图:

这些组件也是和组协调器的功能能够对应上的。具体内容不在详述。

下图展示了消费者启动选取leader、入组的过程。

消费者入组的过程,很好的展示了消费者协调器和组协调器之间是如何配合工作的。leader consumer会承担分区分配的工作,这样kafka集群的压力会小很多。同组的consumer通过组协调器保持同步。消费者和分区的对应关系持久化在kafka内部主题。

消费者消费时,会在本地维护消费到的位置(offset),就是偏移量,这样下次消费才知道从哪里开始消费。如果整个环境没有变化,这样做就足够了。但一旦消费者平衡操作或者分区变化后,消费者不再对应原来的分区,而每个消费者的offset也没有同步到服务器,这样就无法接着前任的工作继续进行了。

因此只有把消费偏移量定期发送到服务器,由GroupCoordinator集中式管理,分区重分配后,各个消费者从GroupCoordinator读取自己对应分区的offset,在新的分区上继续前任的工作。

下图展示了不提交offset到服务端的问题:

开始时,consumer 0消费partition 0 和1,后来由于新的consumer 2入组,分区重新进行了分配。consumer 0不再消费partition2,而由consumer 2来消费partition 2,但由于consumer之间是不能通讯的,所有consumer2并不知道从哪里开始自己的消费。

因此consumer需要定期提交自己消费的offset到服务端,这样在重分区操作后,每个consumer都能在服务端查到分配给自己的partition所消费到的offset,继续消费。

由于kafka有高可用和横向扩展的特性,当有新的分区出现或者新的消费入组后,需要重新分配消费者对应的分区,所以如果偏移量提交的有问题,会重复消费或者丢消息。偏移量提交的时机和方式要格外注意!!

1、自动提交偏移量

设置 enable.auto.commit为true,设定好周期,默认5s。消费者每次调用轮询消息的poll() 方法时,会检查是否超过了5s没有提交偏移量,如果是,提交上一次轮询返回的偏移量。

这样做很方便,但是会带来重复消费的问题。假如最近一次偏移量提交3s后,触发了再均衡,服务器端存储的还是上次提交的偏移量,那么再均衡结束后,新的消费者会从最后一次提交的偏移量开始拉取消息,此3s内消费的消息会被重复消费。

2、手动提交偏移量

设置 enable.auto.commit为false。程序中手动调用commitSync()提交偏移量,此时提交的是poll方法返回的最新的偏移量。

commitSync()是同步提交偏移量,主程序会一直阻塞,偏移量提交成功后才往下运行。这样会限制程序的吞吐量。如果降低提交频次,又很容易发生重复消费。

这里我们可以使用commitAsync()异步提交偏移量。只管提交,而不会等待broker返回提交结果

commitSync只要没有发生不可恢复错误,会进行重试,直到成功。而commitAsync不会进行重试,失败就是失败了。commitAsync不重试,是因为重试提交时,可能已经有其它更大偏移量已经提交成功了,如果此时重试提交成功,那么更小的偏移量会覆盖大的偏移量。那么如果此时发生再均衡,新的消费者将会重复消费消息。




当kafka集群其中一台宕机后,会怎么样?
修改kafka的核心配置文件server.properties 将num.partitions参数(默认为1)修改为3,另外需要添加auto.create.topics.enable=true ,如果没有对应的topic可以主动创建topic。由于__consumer_offsets是kafka默认的主题,无法删除,我们可以删除zookeeper中的__consumer_offsets。 进入zookeeper\/bin目录执行.\/zk...

实时计算组件有哪些
实时计算的组件有很多,数据采集组件及中间件:Flume、Sqoop、Kafka、Logstash、Splunk等。大数据集群核心组件:Hadoop、Hive、Impala、HBase、Spark(Core、SQL、Streaming、MLlib)、Flink、Zookeeper等,大概如下:数据从底层的数据源开始,经过Kafka、Flume等数据组件进行收集,然后分成两条线进行计算:一条线...

大数据核心技术有哪些
1、数据采集与预处理:FlumeNG实时日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据;Zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,提供数据同步服务。2、数据存储:Hadoop作为一个开源的框架,专为离线和大规模数据分析而设计,HDFS作为其核心的存储引擎,已被广泛用于数据存储...

怎么设置kafka topic数据存储时间
1、Kafka创建topic命令很简单,一条命令足矣:bin\/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test 。2.此命令将创建一个名为test的topic,其中有三个分区,每个分区需要分配三个副本。三。topic创建主要分为两部分:命令行controller逻辑部分...

在大数据中心需要什么样的技术?
12.Kafka Kafka是一种高吞吐量的分布式发布订阅消息系统,其在大数据开发应用上的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。大数据开发需掌握Kafka架构原理及各组件的作用和是用方法及相关功能的实现!13.Scala Scala是一门多范式的编程语言,大数据开发...

Kafka相关面试题
如果有10个消费者,传统方式下,数据复制次数为4*10=40次,而使用“零拷贝技术”只需要1+10=11次,一次为从磁盘复制到页面缓存,10次表示10个消费者各自读取一次页面缓存。传统的文件拷贝通常需要从用户态去转到核心态,经过read buffer,然后再返回到用户态的应用层buffer,然后再从用户态把数据拷贝到...

消息中间件(一)MQ详解及四大MQ比较
当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。 2、消息中间件的组成 2.1 Broker 消息服务器,作为server提供消息核心服务 2.2 Producer 消息生产者,业务的发起方,负责生产消息传输给broker, 2.3 Consumer 消息消费者,业务的处理方,负责从broker获取...

kafka配置更改了需要重启吗
我觉得应该是需要的,一般来说任何的电子设备进行了一些重大的更改都是需要重启的,特别是你的这个还是更改了设备的这个样子的。下面是关于配置的扩展资料。硬件方面 1.CPU,这个主要取决于频率和二级缓存,三级缓存,核心数量。频率越高、二级缓存越大,三级缓存越大,核心越多,运行速度越快。速度越快...

kafka是否适合在docker中使用?单机集群是否有意义
kafka是否适合在docker中使用?单机集群是否有意义  我来答 1个回答 #热议# 你见过哪些因为老板...通过这种解耦,各个层次只要关注自己的核心功能点即可。保证你上层的Framework\/Application可以移植Spark是个...伴生组件。因为有了哑应用的存在,分布式系统为了能够和这些应用交互,需要有一个代理。而这个代理和被...

“根本就不需要 Kafka 这样的大型分布式系统!”
虽然不是很必要。使用这些技术栈会导致各个公司承担不必要的债务,导致他们不得不在风险投资周期中寻求更多的资金,无法迈向精益或从别人的资金中解脱出来。这种不幸的趋势只会持续下去,我们唯一能做的就是公之于众。原文:https:\/\/vicki.substack .com \/p\/you-dont-need-kafka 【END】

长武县13672861485: 该怎样更快的学好云计算?
蒯施典舒: 云计算可以分为广义的和狭义的两类狭义的云计算是指IT基础设施的交付和使用模式.是指通过网络需求、扩展的方式获得所需的资源;广义的云计算是指服务的使用和交付模式,指通过网络按需求、扩展的方式获得对应的服务.这种服务可以...

长武县13672861485: 房产证办公证还能被判产权无效吗 我买的二手房 但是怕房子之前有抵债或是产权纠纷 -
蒯施典舒: 直接办理产权变更手续.这样就算之前有过一房多卖、抵债等等情况,但产权已经归你,你在获得产权时不知道这些情况,作为善意第三人取得,法律首先会保障你的产权,其次才处理之前那些债务追偿等等纠纷,这些都是卖主和人家的事,你就没事了.

长武县13672861485: 大数据该怎么学习,自学能学会吗? -
蒯施典舒: 一、大数据学习路线 Java-Linux-Hadoop-Zookeeper-Mysql-Sqoop-Hive-Oozie-Hbase-Kafka-Spark 二、大数据自学的问题1、自学需要有一定开发经验,而且需要有极强的学习能力,和坚强的意志.学习能力这个不用解释了.意志这个问题我讲...

长武县13672861485: 大数据架构师培训 大数据架构师需要具备哪些技能 -
蒯施典舒: 想要成为合格的大数据分析师,就需要熟悉消息中间件(Kafka等),熟悉数据中间件(Mybatis等);JAVA基础扎实,有相关开发或者实习经验,熟悉IO、多线程、MQ、数据结构与设计模式等;能够基于Linux平台工作,熟练使用shell脚本. ...

长武县13672861485: 什么事计算机硬件系统?什么是计算机软件系统? -
蒯施典舒: 一个完整的计算机系统,是由硬件系统和软件系统两大部分组成的.一、电脑的硬件系统 所谓硬件,就是用手能摸得着的实物,一台电脑一般有:1、主机:主机从外观看是一个整体,但打开机箱后,会发现它的内部由多种独立的部件组合而成...

长武县13672861485: kubernetes中副本控制器和资源对象replication controller的区别 -
蒯施典舒: Kubernetes是一个开源项目,它把谷歌的集群管理工具引入到虚拟机和裸机场景中.它可以完美运行在现代的操作系统环境(比如CoreOS 和Red Hat Atomic),并提供可以被你管控的轻量级的计算节点.Kubernetes使用Golang开发,具有轻量...

长武县13672861485: Kafka架构描述正确的是(). - 上学吧技能鉴定
蒯施典舒: 丝绸之路》是一篇略读课文.这篇课文以独特的视角,生动描绘了两千多年前中国与安息古国交流的一幕,再现了西部灿烂辉煌的历史文化,说明了丝绸之路不仅是古代亚欧互通有无的商贸大道,还是促进亚欧各国和中国往来、沟通东西方文化的...

本站内容来自于网友发表,不代表本站立场,仅表示其个人看法,不对其真实性、正确性、有效性作任何的担保
相关事宜请发邮件给我们
© 星空见康网