简介
顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。
延时任务和定时任务区别
延时任务有别于定时任务,定时任务往往是固定周期的,有明确的触发时间。
而延时任务一般没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件。
任务事件生成时并不想让消费者立即拿到,而是延迟一定时间后才接收到该事件进行消费。
业务场景
订单超时,用户下单后进入支付页面(通常会有超时限制)超过15分钟没有进行操作,那么这个订单就需要作废处理。
如何定期检查处于退款状态的订单是否已经退款成功?
注册后到现在已经一周的用户,如何发短信撩动。
交易信息双重效验防止因系统级/应用级/用户级等各种异常情况发生后导致的全部/部分丢失的订单信息。
实现重复通知,默认失败连续通知10次(通知间隔为
n*2+1/min
),直到消费方正确响应,超出推送上限次数后标记为异常状态,可进行恢复!
使用场景
延迟队列多用于需要延迟工作的场景。
最常见的是以下两种场景:
1、延迟消费
用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
2、延迟重试
比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。
扫表存在的问题是
扫表与数据库长时间连接,在数量量大的情况容易出现连接异常中断,需要更多的异常处理,对程序健壮性要求高
在数据量大的情况下延时较高,规定内处理不完,影响业务,虽然可以启动多个进程来处理,这样会带来额外的维护成本,不能从根本上解决。
每个业务都要维护一个自己的扫表逻辑。当业务越来越多时,发现扫表部分的逻辑会重复开发,但是非常类似
缓存队列设计
场景设计
实际的生产场景是笔者负责的某个系统需要对接一个外部的资金方,每一笔资金下单后需要延时30分钟推送对应的附件。
这里简化为一个订单信息数据延迟处理的场景,就是每一笔下单记录一条订单消息(暂时叫做
OrderMessage
),订单消息需要延迟5到15秒后进行异步处理。
延时队列的实现
选用了基于
Redis
的有序集合
Sorted Set
和
Crontab
短轮询进行实现。
具体方案是:
订单创建的时候,订单ID和当前时间戳分别作为
Sorted Set
的member
和score
添加到订单队列Sorted Set
中。订单创建的时候,订单ID和推送内容
JSON
字符串分别作为field
和value
添加到订单队列内容Hash
中。第1步和第2步操作的时候用
Lua
脚本保证原子性。使用一个异步线程通过
Sorted Set
的命令ZREVRANGEBYSCORE
弹出指定数量的订单ID
对应的订单队列内容Hash
中的订单推送内容数据进行处理。
对于第4点处理有两种方案:
处理方案一
弹出订单内容数据的同时进行数据删除,也就是
ZREVRANGEBYSCORE
、
ZREM
和
HDEL
命令要在同一个
Lua
脚本中执行,这样的话
Lua
脚本的编写难度大,并且由于弹出数据已经在
Redis
中删除,如果数据处理失败则可能需要从数据库重新查询补偿。
处理方案二
弹出订单内容数据之后,在数据处理完成的时候再主动删除订单队列
Sorted Set
和订单队列内容
Hash
中对应的数据,这样的话需要控制并发,有重复执行的可能性。
选用了方案一,也就是从
Sorted Set
弹出订单ID并且从Hash中获取完推送数据之后马上删除这两个集合中对应的数据。
方案的流程图大概是这样:
Redis相关命令
Sorted Set相关命令
ZADD
命令 - 将一个或多个成员元素及其分数值加入到有序集当中。
ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN
ZREVRANGEBYSCORE
命令 - 返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
max:分数区间 - 最大分数。
min:分数区间 - 最小分数。
WITHSCORES:可选参数,是否返回分数值,指定则会返回得分值。
LIMIT:可选参数,offset和count原理和
MySQL
的
LIMIT offset,size
一致,如果不指定此参数则返回整个集合的数据。
[success]
ZREM
命令 - 用于移除有序集中的一个或多个成员,不存在的成员将被忽略。
ZREM key member [member ...]
Hash相关命令
HMSET
命令 - 同时将多个field-value(字段-值)对设置到哈希表中。
HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN
HDEL
命令 - 删除哈希表key中的一个或多个指定字段,不存在的字段将被忽略。
HDEL KEY_NAME FIELD1.. FIELDN
Lua 语法
加载
Lua
脚本并且返回脚本的
SHA-1
字符串:
SCRIPT LOAD script
。
执行已经加载的
Lua
脚本:
EVALSHA sha1 numkeys key [key ...] arg [arg ...]
。
unpack
函数可以把
table
类型的参数转化为可变参数,不过需要注意的是
unpack
函数必须使用在非变量定义的函数调用的最后一个参数,否则会失效,详细见
Stackoverflow
的提问table.unpack() only returns the first element。
如果不熟悉Lua语言,建议系统学习一下,因为想用好Redis,一定离不开Lua。
Lua 脚本
入队
enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
returnnil
将任务的执行时间作为score,要执行的任务数据作为value,存放在zset中
出队
dequeue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代
localstatus, type = next(redis.call('TYPE', zset_key))
ifstatus ~= nilandstatus == 'ok'then
iftype == 'zset'then
local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
if list ~= niland #list > 0then
-- unpack函数能把table转化为可变参数
redis.call('ZREM', zset_key, unpack(list))
local result = redis.call('HMGET', hash_key, unpack(list))
redis.call('HDEL', hash_key, unpack(list))
return result
end
end
end
returnnil
如果最小的分数小于等于当前时间戳,就将该任务取出来执行,否则休眠一段时间后再查询。
注意:这里其实有一个性能隐患,命令
ZREVRANGEBYSCORE
的时间复杂度可以视为为O(N),N是集合的元素个数,由于这里把所有的订单信息都放进了同一个Sorted Set(ORDER_QUEUE)中,所以在一直有新增数据的时候,
dequeue
脚本的时间复杂度一直比较高,后续订单量升高之后会此处一定会成为性能瓶颈,后面会给出解决的方案
这里的出队使用
Crontab
作为轮训去查询消费
业务核心代码
延迟队列类 RedisDelayQueue.php
<?php
/**
* @desc Redis 延迟任务队列
* @author Tinywan(ShaoBo Wan)
* @date 2024/05/02 11:36
*/
declare(strict_types=1);
namespaceredis;
classRedisDelayQueue
{
// 生产者 脚本sha值
const DELAY_QUEUE_PRODUCER_SCRIPT_SHA = 'DELAY:QUEUE:PRODUCER:SCRIPT:SHA';
// 消费者 脚本sha值
const DELAY_QUEUE_CONSUMER_SCRIPT_SHA = 'DELAY:QUEUE:CONSUMER:SCRIPT:SHA';
// 订单关闭
const DELAY_QUEUE_ORDER_CLOSE = 'DELAY:QUEUE:ORDER:CLOSE';
// 订单关闭详情哈希
const DELAY_QUEUE_ORDER_CLOSE_HASH = 'DELAY:QUEUE:ORDER:CLOSE:HASH';
/**
* Redis 静态实例
* @return \Redis
*/
privatestaticfunction_redis()
{
$redis = \redis\BaseRedis::server();
$redis->select(3);
return $redis;
}
/**
* @desc: 延迟队列 生产者
* @param string $keys1
* @param string $keys2
* @param string $member
* @param int $score
* @param array $message
* @return mixed
*/
publicstaticfunctionproducer(string $keys1, string $keys2, string $member, int $score, array $message)
{
$redis = self::_redis();
$scriptSha = $redis->get(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA);
if (!$scriptSha) {
$script = <<<luascript
redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2])
redis.call('HSET', KEYS[2], ARGV[2], ARGV[3])
return 1
luascript;
$scriptSha = $redis->script('load', $script);
$redis->set(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA, $scriptSha);
}
$hashValue = json_encode($message, JSON_UNESCAPED_UNICODE);
return $redis->evalSha($scriptSha, [$keys1, $keys2, $score, $member, $hashValue], 2);
}
/**
* @desc: 延迟队列 消费者
* @param string $keys1
* @param string $keys2
* @param int $maxScore
* @return mixed
*/
publicstaticfunctionconsumer(string $keys1, string $keys2, int $maxScore)
{
$redis = self::_redis();
$scriptSha = $redis->get(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA);
if (!$scriptSha) {
$script = <<<luascript
local status, type = next(redis.call('TYPE', KEYS[1]))
if status ~= nil and status == 'ok' then
if type == 'zset' then
local list = redis.call('ZREVRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', ARGV[3], ARGV[4])
if list ~= nil and #list > 0 then
redis.call('ZREM', KEYS[1], unpack(list))
local result = redis.call('HMGET', KEYS[2], unpack(list))
redis.call('HDEL', KEYS[2], unpack(list))
return result
end
end
end
luascript;
$scriptSha = $redis->script('load', $script);
$redis->set(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA, $scriptSha);
}
return $redis->evalSha($scriptSha, [$keys1, $keys2, $maxScore, 0, 0, 10], 2);
}
}
用redis来实现可以依赖于redis自身的持久化来实现持久化,redis的集群来支持高并发和高可用。因此开发成本很小,可以做到很实时。
脚本命令行
生产者消息
privatefunctiondelayQueueOrderClose()
{
$orderId = time();
$keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
$keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
$score = time() + 60; // 延迟60秒执行
$message = [
'event' => RedisDelayQueue::EVENT_ORDER_CLOSE,
'order_id' => $orderId,
'create_time' => time()
];
$res = RedisDelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message);
var_dump($res);
}
如果是ThinkPHP6 框架,执行该命令则可以生产消息,
php think crontab delay-queue-order-producer
循环
privatefunctiondelayOrderProducer()
{
$keys1 = DelayQueue::KEY_ORDER_CLOSE;
$keys2 = DelayQueue::KEY_ORDER_CLOSE_HASH;
for ($i = 1; $i <= 10; $i++) {
$orderId = 'S' . $i;
$score = time(); // 延迟60秒执行
$message = [
'event' => DelayQueue::EVENT_ORDER_CLOSE,
'order_id' => $orderId,
'create_time' => time()
];
$res = DelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message);
var_dump($res);
}
}
消费者消息
1、通过Crontab 轮询执行
privatefunctiondelayQueueOrderConsumer()
{
$keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
$keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
$maxScore = time();
$queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore);
if (false === $queueList) {
echo' [x] Message List is Empty, Try Again ', "\n";
return;
}
var_dump($queueList);
}
说明:如果最小的分数小于等于当前时间戳,就将该任务取出来执行,否则休眠一段时间后再查询
2、阻塞执行
privatefunctiondelayQueueOrderConsumerWhile()
{
$keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
$keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
while (true) {
$maxScore = time();
$queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore);
if (false === $queueList) {
echo' [x] Message List is Empty, Try Again ', "\n";
sleep(1);
continue;
}
// 处理业务
foreach ($queueList as $queue) {
$messageArray = json_decode($queue, true);
}
}
}
数据删除为处理问题
方案一:弹出订单内容数据的同时进行数据删除,也就是
ZREVRANGEBYSCORE
、
ZREM
和
HDEL
命令要在同一个
Lua
脚本中执行,这样的话
Lua
脚本的编写难度大,并且由于弹出数据已经在Redis中删除,如果数据处理失败则可能需要从数据库重新查询补偿。
针对以上的解决方案就是: 消息进入到延迟队列后,保证至少被消费一次。
消费延迟队列消息后(zset结构中扫描到期的消息),不及时消费
把读取的消息放入一个 redis stream 队列,同时加入消费组
通过消费组消费 redis stream 消费,处理业务逻辑
Redis Stream 消费组,读取消息处理并且
ACK(将消息标记为"已处理")
如果消息读取但是没处理,则进入XPENDING 列表,进行二次消费并且
ACK(将消息标记为"已处理")