当前位置: 欣欣网 > 码农

Redis 如何实现延时任务队列

2024-05-06码农

简介

顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。

延时任务和定时任务区别

  • 延时任务有别于定时任务,定时任务往往是固定周期的,有明确的触发时间。

  • 而延时任务一般没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件。

  • 任务事件生成时并不想让消费者立即拿到,而是延迟一定时间后才接收到该事件进行消费。

  • 业务场景

  • 订单超时,用户下单后进入支付页面(通常会有超时限制)超过15分钟没有进行操作,那么这个订单就需要作废处理。

  • 如何定期检查处于退款状态的订单是否已经退款成功?

  • 注册后到现在已经一周的用户,如何发短信撩动。

  • 交易信息双重效验防止因系统级/应用级/用户级等各种异常情况发生后导致的全部/部分丢失的订单信息。

  • 实现重复通知,默认失败连续通知10次(通知间隔为 n*2+1/min ),直到消费方正确响应,超出推送上限次数后标记为异常状态,可进行恢复!

  • 使用场景

    延迟队列多用于需要延迟工作的场景。

    最常见的是以下两种场景:

    1、延迟消费

    1. 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。

    2. 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。

    2、延迟重试

    比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。

    扫表存在的问题是

  • 扫表与数据库长时间连接,在数量量大的情况容易出现连接异常中断,需要更多的异常处理,对程序健壮性要求高

  • 在数据量大的情况下延时较高,规定内处理不完,影响业务,虽然可以启动多个进程来处理,这样会带来额外的维护成本,不能从根本上解决。

  • 每个业务都要维护一个自己的扫表逻辑。当业务越来越多时,发现扫表部分的逻辑会重复开发,但是非常类似

  • 缓存队列设计

    场景设计

    实际的生产场景是笔者负责的某个系统需要对接一个外部的资金方,每一笔资金下单后需要延时30分钟推送对应的附件。

    这里简化为一个订单信息数据延迟处理的场景,就是每一笔下单记录一条订单消息(暂时叫做 OrderMessage ),订单消息需要延迟5到15秒后进行异步处理。

    延时队列的实现

    选用了基于 Redis 的有序集合 Sorted Set Crontab 短轮询进行实现。

    具体方案是:

    1. 订单创建的时候,订单ID和当前时间戳分别作为 Sorted Set member score 添加到订单队列 Sorted Set 中。

    2. 订单创建的时候,订单ID和推送内容 JSON 字符串分别作为 field value 添加到订单队列内容 Hash 中。

    3. 第1步和第2步操作的时候用 Lua 脚本保证原子性。

    4. 使用一个异步线程通过 Sorted Set 的命令 ZREVRANGEBYSCORE 弹出指定数量的 订单ID 对应的订单队列内容 Hash 中的订单推送内容数据进行处理。

    对于第4点处理有两种方案:

    处理方案一

    弹出订单内容数据的同时进行数据删除,也就是 ZREVRANGEBYSCORE ZREM HDEL 命令要在同一个 Lua 脚本中执行,这样的话 Lua 脚本的编写难度大,并且由于弹出数据已经在 Redis 中删除,如果数据处理失败则可能需要从数据库重新查询补偿。

    处理方案二

    弹出订单内容数据之后,在数据处理完成的时候再主动删除订单队列 Sorted Set 和订单队列内容 Hash 中对应的数据,这样的话需要控制并发,有重复执行的可能性。

    选用了方案一,也就是从 Sorted Set 弹出订单ID并且从Hash中获取完推送数据之后马上删除这两个集合中对应的数据。

    方案的流程图大概是这样:

    Redis相关命令

    Sorted Set相关命令

    ZADD 命令 - 将一个或多个成员元素及其分数值加入到有序集当中。

    ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN

    ZREVRANGEBYSCORE 命令 - 返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。

    ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]

  • max:分数区间 - 最大分数。

  • min:分数区间 - 最小分数。

  • WITHSCORES:可选参数,是否返回分数值,指定则会返回得分值。

  • LIMIT:可选参数,offset和count原理和 MySQL LIMIT offset,size 一致,如果不指定此参数则返回整个集合的数据。

  • [success] ZREM 命令 - 用于移除有序集中的一个或多个成员,不存在的成员将被忽略。

    ZREM key member [member ...]

    Hash相关命令

    HMSET 命令 - 同时将多个field-value(字段-值)对设置到哈希表中。

    HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN

    HDEL 命令 - 删除哈希表key中的一个或多个指定字段,不存在的字段将被忽略。

    HDEL KEY_NAME FIELD1.. FIELDN

    Lua 语法

  • 加载 Lua 脚本并且返回脚本的 SHA-1 字符串: SCRIPT LOAD script

  • 执行已经加载的 Lua 脚本: EVALSHA sha1 numkeys key [key ...] arg [arg ...]

  • unpack 函数可以把 table 类型的参数转化为可变参数,不过需要注意的是 unpack 函数必须使用在非变量定义的函数调用的最后一个参数,否则会失效,详细见 Stackoverflow 的提问table.unpack() only returns the first element。

  • 如果不熟悉Lua语言,建议系统学习一下,因为想用好Redis,一定离不开Lua。

    Lua 脚本

    入队 enqueue.lua

    local zset_key = KEYS[1]
    local hash_key = KEYS[2]
    local zset_value = ARGV[1]
    local zset_score = ARGV[2]
    local hash_field = ARGV[3]
    local hash_value = ARGV[4]
    redis.call('ZADD', zset_key, zset_score, zset_value)
    redis.call('HSET', hash_key, hash_field, hash_value)
    returnnil

    将任务的执行时间作为score,要执行的任务数据作为value,存放在zset中

    出队 dequeue.lua

    local zset_key = KEYS[1]
    local hash_key = KEYS[2]
    local min_score = ARGV[1]
    local max_score = ARGV[2]
    local offset = ARGV[3]
    local limit = ARGV[4]
    -- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代
    localstatustype = next(redis.call('TYPE', zset_key))
    ifstatus ~= nilandstatus == 'ok'then
    iftype == 'zset'then
    local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
    if list ~= niland #list > 0then
    -- unpack函数能把table转化为可变参数
    redis.call('ZREM', zset_key, unpack(list))
    local result = redis.call('HMGET', hash_key, unpack(list))
    redis.call('HDEL', hash_key, unpack(list))
    return result
    end
    end
    end
    returnnil

    如果最小的分数小于等于当前时间戳,就将该任务取出来执行,否则休眠一段时间后再查询。

    注意:这里其实有一个性能隐患,命令 ZREVRANGEBYSCORE 的时间复杂度可以视为为O(N),N是集合的元素个数,由于这里把所有的订单信息都放进了同一个Sorted Set(ORDER_QUEUE)中,所以在一直有新增数据的时候, dequeue 脚本的时间复杂度一直比较高,后续订单量升高之后会此处一定会成为性能瓶颈,后面会给出解决的方案

    这里的出队使用 Crontab 作为轮训去查询消费

    业务核心代码

    延迟队列类 RedisDelayQueue.php

    <?php
    /**
     * @desc Redis 延迟任务队列
     * @author Tinywan(ShaoBo Wan)
     * @date 2024/05/02 11:36
     */

    declare(strict_types=1);
    namespaceredis;
    classRedisDelayQueue
    {
    // 生产者 脚本sha值
    const DELAY_QUEUE_PRODUCER_SCRIPT_SHA = 'DELAY:QUEUE:PRODUCER:SCRIPT:SHA';
    // 消费者 脚本sha值
    const DELAY_QUEUE_CONSUMER_SCRIPT_SHA = 'DELAY:QUEUE:CONSUMER:SCRIPT:SHA';
    // 订单关闭
    const DELAY_QUEUE_ORDER_CLOSE = 'DELAY:QUEUE:ORDER:CLOSE';
    // 订单关闭详情哈希
    const DELAY_QUEUE_ORDER_CLOSE_HASH = 'DELAY:QUEUE:ORDER:CLOSE:HASH';
    /**
    * Redis 静态实例
    @return \Redis
    */

    privatestaticfunction_redis()
    {
    $redis = \redis\BaseRedis::server();
    $redis->select(3);
    return $redis;
    }
    /**
    @desc: 延迟队列 生产者
    @param string $keys1
    @param string $keys2
    @param string $member
    @param int $score
    @param array $message
    @return mixed
    */

    publicstaticfunctionproducer(string $keys1, string $keys2, string $member, int $score, array $message)
    {
    $redis = self::_redis();
    $scriptSha = $redis->get(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA);
    if (!$scriptSha) {
    $script = <<<luascript
    redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2])
    redis.call('HSET', KEYS[2], ARGV[2], ARGV[3])
    return 1
    luascript;

    $scriptSha = $redis->script('load', $script);
    $redis->set(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA, $scriptSha);
    }
    $hashValue = json_encode($message, JSON_UNESCAPED_UNICODE);
    return $redis->evalSha($scriptSha, [$keys1, $keys2, $score, $member, $hashValue], 2);
    }
    /**
    @desc: 延迟队列 消费者
    @param string $keys1
    @param string $keys2
    @param int $maxScore
    @return mixed
    */

    publicstaticfunctionconsumer(string $keys1, string $keys2, int $maxScore)
    {
    $redis = self::_redis();
    $scriptSha = $redis->get(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA);
    if (!$scriptSha) {
    $script = <<<luascript
    local status, type = next(redis.call('TYPE', KEYS[1]))
    if status ~= nil and status == 'ok' then
    if type == 'zset' then
    local list = redis.call('ZREVRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', ARGV[3], ARGV[4])
    if list ~= nil and #list > 0 then
    redis.call('ZREM', KEYS[1], unpack(list))
    local result = redis.call('HMGET', KEYS[2], unpack(list))
    redis.call('HDEL', KEYS[2], unpack(list))
    return result
    end
    end
    end
    luascript;

    $scriptSha = $redis->script('load', $script);
    $redis->set(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA, $scriptSha);
    }
    return $redis->evalSha($scriptSha, [$keys1, $keys2, $maxScore, 0, 010], 2);
    }
    }






    用redis来实现可以依赖于redis自身的持久化来实现持久化,redis的集群来支持高并发和高可用。因此开发成本很小,可以做到很实时。

    脚本命令行

    生产者消息

    privatefunctiondelayQueueOrderClose()
    {
    $orderId = time();
    $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
    $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
    $score = time() + 60// 延迟60秒执行
    $message = [
    'event' => RedisDelayQueue::EVENT_ORDER_CLOSE,
    'order_id' => $orderId,
    'create_time' => time()
    ];
    $res = RedisDelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message);
    var_dump($res);
    }

    如果是ThinkPHP6 框架,执行该命令则可以生产消息, php think crontab delay-queue-order-producer

    循环

    privatefunctiondelayOrderProducer()
    {
    $keys1 = DelayQueue::KEY_ORDER_CLOSE;
    $keys2 = DelayQueue::KEY_ORDER_CLOSE_HASH;
    for ($i = 1; $i <= 10; $i++) {
    $orderId = 'S' . $i;
    $score = time(); // 延迟60秒执行
    $message = [
    'event' => DelayQueue::EVENT_ORDER_CLOSE,
    'order_id' => $orderId,
    'create_time' => time()
    ];
    $res = DelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message);
    var_dump($res);
    }
    }

    消费者消息

    1、通过Crontab 轮询执行

    privatefunctiondelayQueueOrderConsumer()
    {
    $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
    $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
    $maxScore = time();
    $queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore);
    if (false === $queueList) {
    echo' [x] Message List is Empty, Try Again '"\n";
    return;
    }
    var_dump($queueList);
    }

    说明:如果最小的分数小于等于当前时间戳,就将该任务取出来执行,否则休眠一段时间后再查询

    2、阻塞执行

    privatefunctiondelayQueueOrderConsumerWhile()
    {
    $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
    $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
    while (true) {
    $maxScore = time();
    $queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore);
    if (false === $queueList) {
    echo' [x] Message List is Empty, Try Again '"\n";
    sleep(1);
    continue;
    }
    // 处理业务
    foreach ($queueList as $queue) {
    $messageArray = json_decode($queue, true);
    }
    }
    }

    数据删除为处理问题

    方案一:弹出订单内容数据的同时进行数据删除,也就是 ZREVRANGEBYSCORE ZREM HDEL 命令要在同一个 Lua 脚本中执行,这样的话 Lua 脚本的编写难度大,并且由于弹出数据已经在Redis中删除,如果数据处理失败则可能需要从数据库重新查询补偿。

    针对以上的解决方案就是: 消息进入到延迟队列后,保证至少被消费一次。

  • 消费延迟队列消息后(zset结构中扫描到期的消息),不及时消费

  • 把读取的消息放入一个 redis stream 队列,同时加入消费组

  • 通过消费组消费 redis stream 消费,处理业务逻辑

  • Redis Stream 消费组,读取消息处理并且 ACK(将消息标记为"已处理")

  • 如果消息读取但是没处理,则进入XPENDING 列表,进行二次消费并且 ACK(将消息标记为"已处理")