kafka存储结构以及Log清理机制

作者&投稿:爱新觉罗罚 (若有异议请与网页底部的电邮联系)
~ 本文主要聚焦 kafka 的日志存储以及日志清理相关。

首先我们来看一张 kafak 的存储结构图。

如上图所示、kafka 中消息是以主题 topic 为基本单位进行归类的,这里的 topic 是逻辑上的概念,实际上在磁盘存储是根据分区存储的,每个主题可以分为多个分区、分区的数量可以在主题创建的时候进行指定。例如下面 kafka 命令创建了一个 topic 为 test 的主题、该主题下有 4 个分区、每个分区有两个副本保证高可用。

分区的修改除了在创建的时候指定。还可以动态的修改。如下将 kafka 的 test 主题分区数修改为 12 个

分区内每条消息都会被分配一个唯一的消息 id,也就是我们通常所说的 offset, 因此 kafak 只能保证每一个分区内部有序性,不能保证全局有序性。

如果分区设置的合理,那么所有的消息都可以均匀的分布到不同的分区中去,这样可以实现水平扩展。不考虑多副本的情况下,一个分区对应一个 log 日志、如上图所示。为了防止 log 日志过大,kafka 又引入了日志分段(LogSegment)的概念,将 log 切分为多个 LogSegement,相当于一个巨型文件被平均分配为相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和 LogSegement 也不是纯粹物理意义上的概念,Log 在物理上只是以文件夹的形式存储,而每个 LogSegement 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以".txindex"为后缀的事务索引文件)。

kafak 中的 Log 对应了一个命名为<topic>-<partition> 的文件夹。举个例子、假如有一个 test 主题,此主题下游 3 个分区,那么在实际物理上的存储就是 "test-0","test-1","test-2" 这三个文件夹。

向 Log 中写入消息是顺序写入的。只有最后一个 LogSegement 才能执行写入操作,在此之前的所有 LogSegement 都不能执行写入操作。为了方便描述,我们将最后一个 LogSegement 成为"ActiveSegement",即表示当前活跃的日志分段。随着消息的不断写入,当 ActiveSegement 满足一定的条件时,就需要创建新的 activeSegement,之后在追加的消息写入新的 activeSegement。

为了便于消息的检索,每个 LogSegement 中的日志文件(以".log" 为文件后缀)都有对应的两个文件索引:偏移量索引文件(以".index" 为文件后缀)和时间戳索引文件(以".timeindex"为文件后缀)。每个 LogSegement 都有一个“基准偏移量” baseOffset,用来标识当前 LogSegement 中第一条消息的 offset。偏移量是一个 64 位的长整形。日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为 20 位数字,没有达到的位数则用 0 填充。比如第一个 LogSegment 的基准偏移量为 0,对应的日志文件为 00000000000000000000.log

示例中第 2 个 LogSegment 对应的基准位移是 256,也说明了该 LogSegment 中的第一条消息的偏移量为 256,同时可以反映出第一个 LogSegment 中共有 256 条消息(偏移量从 0 至 254 的消息)。

由于 kafak 是把消息存储 在磁盘上,为了控制消息的不断增加我们就必须对消息做一定的清理和压缩。kakfa 中的每一个分区副本都对应的一个 log 日志文件。而 Log 又分为多个 LogSegement 日志分段。这样也便于日志清理。kafka 内部提供了两种日志清理策略。

按照一定的保留策略直接删除不符合条件的日志分段。

我们可以通过 broker 端参数 log.cleanup.policy 来设置日志清理策略,此参数的默认值为“delete”,即采用日志删除的清理策略。如果要采用日志压缩的清理策略,就需要将 log.cleanup.policy 设置为“compact”,并且还需要将 log.cleaner.enable(默认值为 true)设定为 true。通过将 log.cleanup.policy 参数设置为“delete,compact”,还可以同时支持日志删除和日志压缩两种策略。日志清理的粒度可以控制到主题级别,比如与 log.cleanup.policy 对应的主题级别的参数为 cleanup.policy,为了简化说明,本文只采用 broker 端参数做陈述。

日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retentionMs)来寻找可删除的日志分段文件集合(deletableSegments),如图下图所示。retentionMs 可以通过 broker 端参数 log.retention.hours、log.retention.minutes 和 log.retention.ms 来配置,其中 log.retention.ms 的优先级最高,log.retention.minutes 次之,log.retention.hours 最低。默认情况下只配置了 log.retention.hours 参数,其值为 168,故默认情况下日志分段文件的保留时间为 7 天。

查找过期的日志分段文件,并不是简单地根据日志分段的最近修改时间 lastModifiedTime 来计算的,而是根据日志分段中最大的时间戳 largestTimeStamp 来计算的。因为日志分段的 lastModifiedTime 可以被有意或无意地修改,比如执行了 touch 操作,或者分区副本进行了重新分配,lastModifiedTime 并不能真实地反映出日志分段在磁盘的保留时间。要获取日志分段中的最大时间戳 largestTimeStamp 的值,首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于 0,则取其值,否则才设置为最近修改时间 lastModifiedTime.

若待删除的日志分段的总数等于该日志文件中所有的日志分段的数量,那么说明所有的日志分段都已过期,但该日志文件中还要有一个日志分段用于接收消息的写入,即必须要保证有一个活跃的日志分段 activeSegment,在此种情况下,会先切分出一个新的日志分段作为 activeSegment,然后执行删除操作。

删除日志分段时,首先会从 Log 对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。然后将日志分段所对应的所有文件添加上“.deleted”的后缀(当然也包括对应的索引文件)。最后交由一个以“delete-file”命名的延迟任务来删除这些以“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过 file.delete.delay.ms 参数来调配,此参数的默认值为 60000,即 1 分钟。

日志删除任务会检查当前日志的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments),如下图所示。retentionSize 可以通过 broker 端参数 log.retention.bytes 来配置,默认值为-1,表示无穷大。注意 log.retention.bytes 配置的是 Log 中所有日志文件的总大小,而不是单个日志分段(确切地说应该为.log 日志文件)的大小。单个日志分段的大小由 broker 端参数 log.segment.bytes 来限制,默认值为 1073741824,即 1GB。

基于日志大小的保留策略与基于时间的保留策略类似,首先计算日志文件的总大小 size 和 retentionSize 的差值 diff,即计算需要删除的日志总大小,然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段的文件集合 deletableSegments。查找出 deletableSegments 之后就执行删除操作,这个删除操作和基于时间的保留策略的删除操作相同


Kafka在大数据环境中如何应用呢?
数据中心的数据需要共享时,kafka的producer先从数据中心读取数据,然后传入kafka缓存并加入待消费队列。各分支结构作为数据消费者,启动消费动作,从kafka队列读取数据,并对获取的数据进行处理。消息生产者根据需求,灵活定义produceInfoProcess()方法,对相关数据进行处理。并依据数据发布到kafka的情况,处理回调...

消息队列之zeroMQ、rabbitMQ、kafka
首先消息是网络通讯的载体,队列可以理解是一种先进先出的数据结构,消息队列是存放消息的容器,是分布式系统中的重要组件。消息队列的优势在于:解耦、异步、削峰,把相关性不 强的模块独立分开视为解耦,异步就是非必要逻辑异步方式处理,加快响应速度,削峰是避免短期高并发导致系统问题进行缓冲队列处理。消息队列的缺点在于...

架构师经典总结:为什么零拷贝可以提升Kafka性能?
Kafka使用零拷贝(Zero-Copy)技术来提供它的性能,所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手,减少了内核和用户模式之间的上下文切换,零拷贝技术通过DMA技术实现。直接存储器存取方式(Direct Memory Access, DMA) DMA控制方式是以存储器为中心,在主存和I\/O设备之间...

Kafka Consumer Offset解析
看一下源码里这些类的结构 case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,expireTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) { case class ...

kafka系列之(3)——Coordinator与offset管理和Consumer Rebalance...
3. 由于要保存很多consumer的offset信息,必然引入复杂的数据结构,造成资源浪费。 而Kafka选择了不同的方式:每个consumer group管理自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入checkpoint机制定期持久化,简化了应答机制的实现。 (2)、Kafka默认是定期帮你自动提交位移的(...

kafka问题求助
Broker:Kafka集群包含一个或多个服务器,这种服务器被称为Broker。Kafka的整体架构如下图所示,典型的Kafka集群包含一组发布消息的Producer,一组管理Topic的Broker,和一组订阅消息的Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Producer可以按照一定的策略将消息划分给指定的分区,如简单的轮询各个分区或者...

消息队列怎么能通俗点解释?
RocketMQ与Kafka的对比,就像两种不同的食堂运营策略。RocketMQ通过轻量级的Namesrv集群取代Zookeeper,保证高可用性和吞吐量,而Pulsar的分层架构则提供了更精细的控制。在存储结构上,RocketMQ以顺序写入提高性能,而Pulsar则采用了更先进的分片设计和动态扩容策略。总的来说,选择适合的消息队列就像在不同的...

大数据有哪些框架
Storm是一个分布式实时计算系统,它可以处理实时数据流。Storm的核心组件是拓扑结构(Topology),它可以将拓扑结构中的每个节点分配给不同的计算节点进行并行处理。Storm还提供了可扩展的API,可以方便地与其他框架集成。Kafka:Kafka是一个分布式流处理平台,它可以用于实时数据流的处理和存储。Kafka的核心组件...

kafka 单机\/集群压力测试
1.3 单机版测试kafka性能 因为测试的次数比较多,也没有去找kafka中数据存储设置,所以就使用docker部署单机版的kafka (因为测试的数据比较多,也就多次的删除了容器,重新启动镜像) 新建目录: mkdir \/usr\/local\/kafka_test dockerfile run.sh sources.list 目录结构如下:生成镜像 docker...

五种大数据处理架构
直接写入Kafka还可避免回压(Backpressure)问题。回压是指当负载峰值导致数据流入速度超过组件实时处理能力的情况,这种情况可能导致处理工作停顿并可能丢失数据。按照设计,Kafka可以将数据保存很长时间,这意味着组件可以在方便的时候继续进行处理,并可直接重启动而无需担心造成任何后果。Samza可以使用以本地键值存储方式实现的...

溆浦县19244657353: 如何在windows下查看kafka源码 -
咎浩宁诺: Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.(Kafka是一个分布式的、可分区的(partitioned)、基于备份的(replicated)和commit-log存储的服务..它...

溆浦县19244657353: kafka 怎么判断log 可以删除 -
咎浩宁诺: 1. 删除logs下的相关记录;2. 到zookeeper下的brokers/topics删除相关节点.--------------不好用!!先逻辑删除(./bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic dnsTopic --zookeeper 10.0.1.44:2181,10.0.1.45:2181,10.0.1.46:...

溆浦县19244657353: 考虑一个由 8 个页面、每页 1024 字节组成的存储空间,把它映射到容量为 32 个物理块 的存储器中 -
咎浩宁诺: 8是2的3次方,32是2的5次方;1024=2的10次方,偏移地址是10位.所以逻辑地址等于3+10=13位.物理地址等于5+10=15位啊

溆浦县19244657353: mysql数据库查询好慢怎么解决 -
咎浩宁诺: 28万条数据量不是很大,字段稍微有点多,如果不加WHERE 条件的话,数据库判定是查询所有数据库,而加了WHERE 条件时,数据库判定要去详细的查找某个数据,所以速度自然会慢,建立索引可以解决您的问题; CREATE INDEX 索引名 ON 表名 (WHERE 条件用到的列名,如有多个就以逗号分隔); 这次在去WHERE 的时候就会快很多

溆浦县19244657353: kafka中offset值存储在哪 -
咎浩宁诺: 解决的方法是:分别从kafka中获得某个topic当前每个partition的offset,再从zookeeper中获得某个consumer消费当前topic中每个partition的offset,最后再这两个根据项目情况进行合并,就可以了. 一、具体实现 1、程序实现,如下: public class ...

溆浦县19244657353: 急急急,设计一个算法,将线性表中重复结点删除,线性表用顺序存储结构存储,谢谢 -
咎浩宁诺: 30分只给思路,不写具体算法了.这个事可以有两种方式完成: 1、先对表排序,之后只一次遍历表、其间将重复(与前个表项值相同的)元素剔除; 2、不排序,直接对每个元素都遍历一次全表,剔除重复元素. 显然方式1较好.按方式1进...

溆浦县19244657353: 哥们在吗? 很忙 刚第一个写错了是 数据存储结构的基本类型和主要特点 类型是顺序 和 链式 那么特点呢? -
咎浩宁诺: 顺序存储结构特点:所有的记录依次存储到一个数组中 链表的结构特点是:把每个记录当做一个结点,然后结点依次连接.

溆浦县19244657353: 链式存储结构和顺序存储结构的区别 -
咎浩宁诺: 区别如下: 1、链表存储结构的内存地址不一定是连续的,但顺序存储结构的内存地址一定是连续的. 2、链式存储适用于在较频繁地插入、删除、更新元素是,而顺序存储结构适用于频繁查询时使用. 3、顺序比链式节约空间,是因为链式结...

溆浦县19244657353: kafka为什么使用硬盘存储消息反而性能还很好 -
咎浩宁诺: Jafka/KafkaKafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版.具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台.

本站内容来自于网友发表,不代表本站立场,仅表示其个人看法,不对其真实性、正确性、有效性作任何的担保
相关事宜请发邮件给我们
© 星空见康网