延时队列常用实现详解

作者&投稿:展贱 (若有异议请与网页底部的电邮联系)
~

队列是一种线性表,内部的元素是有序的,具有先进先出的特性。
延时队列,顾名思义,它是一个队列,但更重要的是具有延时的特性,与普通队列的先进先出不同,延时队列可以指定队列中的消息在某个时间点被消费。

DelayQueue是无界的延时阻塞队列,内部是使用优先级队列PriorityQueue实现的,其是按时间来定优先级的延时阻塞队列,只有在延迟期满时才能从队列中提取元素,先过期的元素会在队首,每次从队列里取出来都是最先要过期的元素, 当执行队列take操作元素未过期时会阻塞当前线程到元素过期为止 ;PriorityQueue是通过二叉小顶堆实现, 其任意一个非叶子节点的权值,都不大于其左右子节点的权值。

示例
队列中的元素必须实现Delayed接口

redis key的过期事件是通过redis 2.8.0之后版本提供的订阅发布功能(pub/sub)下发的,当key过期后系统自动Pub,应用程序只需订阅(sub)该事件即可。

实现步骤

示例

存在的问题

key的失效通知无法保证时效性。redis过期策略有一下三种:

默认情况下,Redis 使用的是 惰性删除 + 定期删除 的策略;每隔一段时间(可通过hz参数设置每秒执行的次数),Redis 会分别从各个库随机选取部分测试设置了过期时间的 Key,判断它们是否过期,过期则删除;如果 key 已过期,但没有被定期删除,由于惰性删除策略,在下次请求获取该数据时会将该数据删除。

可通过如下方式提高时效性

redis zset 结构是一个有序集合,每个元素都会关联一个 double 类型的分数,通过分数来为集合中的成员进行从小到大的排序;有序集合的成员是唯一的,但分数(score)却可以重复。

实现思路

将任务id作为member,到期时间作为score存入到zset中,然后不断轮询获取第一个元素,判断其是否过期,过期后删除并执行任务即可。

也可以通过lua脚本将 zrangebyscore 和 zrem 操作变成原子操作,避免了多线程时同一个me mber多次zrem。

存在的问题

RabbitMQ本身没有直接支持延迟队列功能,但是可以通过ttl及dlx(Dead Letter Exchanges)特性模拟出延迟队列的功能。

绑定在死信交换机上的队列。RabbitMQ的Queue(队列)可以配置两个参数x-dead-letter-exchange(死信交换机)和x-dead-letter-routing-key(指定routing-key发送,可选),当消息在一个队列中变成死信 (dead message) 之后,按照这两个参数可以将消息重新路由到另一个DLX Exchange(死信交换机),让消息重新被消费。

队列出现Dead Letter的情况有:

RabbitMQ可以对消息和队列设置TTL,为队列设置时,队列中所有消息都有相同的过期时间;对消息进行单独设置,每条消息过期时间可以不同;如果同时设置了队列的ttl和消息的ttl以两者之间TTL较小的那个数值为准。消息超过设置的ttl值未被消费,将会变为死信,消费者将无法再收到该消息。

ttl消息按照入发送顺序排列在队列中,且rabbitMQ只会判断队列头消息是否失效,失效后才会加入到死信队列中,如果发送多个过期时间不一致的消息,有可能后面的消息已经过期了,但队列头消息没有过期,导致其他消息不能及时加入到死信队列被消费。

针对上述的问题,可以使用 rabbitmq_delayed_message_exchang 插件来解决。

安装该插件后会生成新的Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在 mnesia (一个分布式数据系统)表中,检测消息延迟时间(通过消息头的x-delay指定),如达到可投递时间时并将其通过 x-delayed-type 类型标记的交换机类型投递至目标队列。

插件的安装

使用示例

插件的局限

时间轮的应用广泛,包括linux内核的调度、zookeeper、netty、kafka、xxl-job、quartz等均有使用时间轮。

图中的圆盘可以看作是钟表的刻度。比如一圈round长度为24秒,刻度数为8,那么每一个刻度表示3秒。那么时间精度就是3秒。每个刻度为一个bucket(实际上就是TimerTaskList),TimerTaskList是环形双向链表,在其中链表项TimeTaskEntry封装了真正的定时任务TimerTask。TimerTaskList使用expiration字段记录了整个TimerTaskList的超时时间。TimeTaskEntry中的expirationMs字段记录了超时时间戳,timerTask字段指向了对应的TimerTask任务;根据每个TimerTaskEntry的过期时间和当前时间轮的时间,选择一个合适的bucket,把这个TimerTaskEntry对象放进去;对于延迟超过时间轮所能表示的范围有两种处理方式,一是通过增加一个字段-轮数,Netty 就是这样实现的;二是多层时间轮,Kakfa 是这样实现的。

下面介绍下kafka的多层时间轮,层数越高时间跨度越大。

每个使用到的TimerTaskList都会加入到DelayQueue中,DelayQueue会根据TimerTaskList对应的超时时间expiration来排序,最短expiration的TimerTaskList会被排在DelayQueue的队头,通过一个线程获取到DelayQueue中的超时的任务列表TimerTaskList之后,既可以根据TimerTaskList的expiration来推进时间轮的时间,也可以就获取到的TimerTaskList执行相应的操作,TimerTaskEntry该执行过期操作的就执行过期操作,该降级时间轮的就降级时间轮。

假设现在有一个任务在445ms后执行,默认情况下,各个层级的时间轮的时间格个数为20,第一层时间轮每一个时间格跨度为1ms,整个时间轮跨度为20ms,跨度不够。第二层时间轮每一个时间格跨度为20ms,整个时间轮跨度为400ms,跨度依然不够,第三层时间轮每一个时间格跨度为400ms,整个时间轮跨度为8000ms,现在跨度够了,此任务就放在第三层时间轮的第一个时间格对应的TimerTaskList,等待被执行,此TimerTaskList到期时间是400ms,随着时间的流逝,当此TimerTaskList到期时,距离该任务到期时间还有45ms,不能执行该任务,将重新提交到时间轮,此时第一层时间轮跨度依然不够,不能执行任务,第二层时间轮时间格跨度为20,整个世间轮跨度为400,跨度足够,放在第三个时间格等待执行,如此往复几次,高层时间轮最终会慢慢移动到低层时间轮上,最终任务到期执行。




嵌入式实时操作系统FreeRTOS的复习---队列管理
FreeRTOS 中队列管理详解 FreeRTOS 提供了一套基于队列的通信与同步机制。队列作为内核对象,可由多个任务共同使用,实现数据交换与同步。队列特性:1. 数据存储:队列能保存固定数量的数据单元(通常为结构体),其最大存储量称为队列深度。2. 多任务存取:队列独立于任何任务,所有任务均可对其进行读取与...

数据结构与算法-队列
而链队列不存在这个问题,尽管它需要一个指针域,会产生一些空间上的开销,但也可以接受。所以在空间上,链队列更加灵活。总的来说,在可以确定队列长度最大值的情况下,建议用循环队列,如果你无法预估队列的长度时,则用链队列。栈和队列也都可以通过链式存储结构来实现,实现原则上与线性表基本相同如图...

redis怎么做消息队列?
基于Redis消息队列-实现短信服务化 1.Redis实现消息队列原理,常用的消息队列有RabbitMQ,ActiveMQ,个人觉得这种消息队列太大太重,本文介绍下基于Redis的轻量级消息队列服务。 一般来说,消息队列有两种模式,一种是发布者订阅模式,另外一种是生产者和消费者模式。Redis的消息队列,也是基于这2种原理的实现...

顺序队列在进行入队操作时,首先要判断队列是否为
在进行入队操作时,通常需要使用一个循环数组来实现队列。循环数组是一种可以循环利用空间的数组,当队列的长度超过了容量时,可以通过循环利用空间来继续添加元素。在实现循环数组时,需要使用取余操作来判断队列的头部或尾部是否到达了数组的末尾,同时需要使用加法操作来实现队列的循环利用空间。顺序队列在...

消息中间件(一)MQ详解及四大MQ比较
7 常见消息中间件MQ介绍 7.1 RocketMQ 阿里系下开源的一款分布式、队列模型的消息中间件,原名Metaq,3.0版本名称改为RocketMQ,是阿里参照kafka设计思想使用java实现的一套mq。同时将阿里系内部多款mq产品(Notify、metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述...

循环队列通常用什么来实现队列的头尾相接?
循环队列其实就是个数组,是靠队头、队尾、下标来实现头尾相接,如队列A有5个位置,当到达A【4】时,判断到达队尾了,下标变道队头0,即可回到A【0】——队列头部。循环队列应该注意判断队列是否为空,是否满。将队列存储空间的最后一个位置绕到第一个位置,形成逻辑上的环状空间,供队列循环使用。

基于Redisson实现延迟队列
我们只要从 RBlockingQueue 队列中取数据即可。好像还是不够深入,我们接着看。我们知道 Redisson 是基于redis来实现的那么我们看看里面到底做了什么事 打开redis客户端,执行monitor命令,看下在执行上面订单操作时redis到底执行了哪些命令 monitor命令可以看到操作redis时执行了什么命令 这里参考: https:\/\/...

向量队列初始化时元素的初始值分别为多少?
与初始为空矛盾.所以rear=(0-1)%n=n-1.2、循环队列为充分利用向量空间,克服"假溢出"现象的方法是:将向量空间想象为一个首尾相接的圆环,同时我们称这种向量为循环向量。3、存储在其中的队列称为循环队列(Circular Queue)。这种循环队列可以用单链表的形式来在实际编程应用中来实现。

消息队列原理及选型
Consumer收到消息时需要显式的向rabbit broker发送basic。ack消息或者consumer订阅消息时设置auto_ack参数为true。 在通信过程中,队列对ACK的处理有以下几种情况: 即消息的Ackownledge确认机制,为了保证消息不丢失,消息队列提供了消息Acknowledge机制,即ACK机制,当Consumer确认消息已经被消费处理,发送一个ACK给消息队列,此时...

DPDK 无锁环形队列(Ring)详解--段子解法
为便于理解,我尝试以幽默的段子形式来解析DPDK中的环形队列。首先,环形队列在DPDK中常用于队列管理,它具有固定大小,不同于链表的动态性。与链表队列相比,环形队列的优点包括高效性和无锁操作,但同时也存在空间固定和并发访问时可能出现的环形溢出问题。环形队列的应用场景包括数据传输和多线程协作。在...

淮安市19163051227: 普通的列队是怎么实现延迟的
王邵苦双: 但这是同步,想异步,我们需要用setTimeout, var el = document.getElementById("test"); var q = new Queue(); q.queue(function(){ var self = this; el.innerHTML = 1 setTimeout(function(){ self.dequeue() },1000); }).queue(function(){ var self = this; el....

淮安市19163051227: JavaScript中的Timer是怎么工作的 -
王邵苦双: 通常它们的表现行为并不是那么地直观,而这是因为它们都处在一个单一线程中.让我们先来看一看三个用来创建以及操作timer的函数.var id = setTimeout(fn, delay);- 初始化一个单一的timer,这个timer将会在一定延时后去调用指定的函数....

淮安市19163051227: 怎么让redis队列延迟10s -
王邵苦双: 方案:dt=[{'id':2,'name':'zhangsan','phone':138888888888},{'id':3,'name':'lisi','phone':13888888888},{'id':4,'name':'shenfl','phone':13888888888}] sql='insert into py_test values(:id,:name,:phone)' for xin dt:

淮安市19163051227: 希望得到一道汇编程序的延时时间的解释(题目和答案都有,只是不了解) -
王邵苦双: 是这样的:分支转移指令的指令(例如你程序中的LOOP和JNZ等等)的周期不是固定的一个数,而是与走哪个分支相关的,那个17 OR 5就是这个意思,以...

淮安市19163051227: jquery怎么设置动作延迟时间 -
王邵苦双: dylay()方法的格式如下: delay(duration,[queueName])这个方法的功能是设置一个延时值来推迟后续队列中动画的执行,其中参数duration为延时的时间值,单位是毫秒,可选参数[queueName]表示队列名词,即动画队列.

淮安市19163051227: linux计划任务如何实现延时执行脚本,求简洁实例 -
王邵苦双: 两个方法: 1)crontab中的时间设定最小可以到分钟,如果你的延时是分钟级别的,直接放大crontab中的分钟设定即可. 2)修改脚本,在脚本开头(当然要在shell声明语句如#!/bin/bash之后)加usleep(微秒级)或sleep命令(秒级),也可以实现延时.

淮安市19163051227: php怎么使用队列来处理批量采集 -
王邵苦双: 首先,队列的作用不是批量处理,而是延时处理,也叫异步处理 要做批量采集的话,首先你要划分好区间,可以用php的多进程,也可以用php的cli模式做,只要数据不窜就行

淮安市19163051227: 操作系统老师让我们,用最简单的C语言知识,实现操作系统中多级反馈队列调度算法,大侠们帮帮忙吧,谢谢了 -
王邵苦双: 多级(假设为N级)反馈队列调度算法可以如下原理: 1.. &gt、设有N个队列(Q1; Priority(QN),Q2.,按照时间片轮转法调度.比如Q1队列的时间片为N,位于队列Q2中有N个作业,它们的运行时间是通过Q2这个队列所设定的时间片来确定的(...

淮安市19163051227: linux定时器和延时工作队列的区别 -
王邵苦双: 工作队列中是即将要调度到的任务队列,等待队列是暂时被挂起的任务队列,或者有些任务无事可做休眠状态的任务,它们会在某些条件触发时恢复换入工作队列并进入执行状态,同样在工作队列中的任务在某个时刻也可以被换入到等待队列中

淮安市19163051227: 写出几种常见的数据结构,并简单介绍其内部实现原理?
王邵苦双: 数据元素相互之间的关系称为结构. 有四类基本结构:集合、线性结构、树形结构、图状结构. 集合结构:除了同属于一种类型外,别无其它关系. 线性结构:元素之间存在一对一关系常见类型有:?数组,链表,队列,栈,它们之间在操作上有所区别. 例如:链表可在任意位置插入或删除元素,而队列在队尾插入元素,队头删除元素, 栈只能在栈顶进行插入,删除操作. 树形结构:元素之间存在一对多关系,常见类型有:树(有许多特例:二叉树、平衡二叉树、查找树等). 图形结构:元素之间存在多对多关系,图形结构中每个结点的前驱结点数和后续结点多个数可以任意.

本站内容来自于网友发表,不代表本站立场,仅表示其个人看法,不对其真实性、正确性、有效性作任何的担保
相关事宜请发邮件给我们
© 星空见康网