當前位置: 妍妍網 > 碼農

Redis 如何實作延時任務佇列

2024-05-06碼農

簡介

顧名思義,延遲佇列就是進入該佇列的訊息會被延遲消費的佇列。而一般的佇列,訊息一旦入隊了之後就會被消費者馬上消費。

延時任務和定時任務區別

  • 延時任務有別於定時任務,定時任務往往是固定周期的,有明確的觸發時間。

  • 而延時任務一般沒有固定的開始時間,它常常是由一個事件觸發的,而在這個事件觸發之後的一段時間內觸發另一個事件。

  • 任務事件生成時並不想讓消費者立即拿到,而是延遲一定時間後才接收到該事件進行消費。

  • 業務場景

  • 訂單超時,使用者下單後進入支付頁面(通常會有超時限制)超過15分鐘沒有進行操作,那麽這個訂單就需要作廢處理。

  • 如何定期檢查處於退款狀態的訂單是否已經退款成功?

  • 註冊後到現在已經一周的使用者,如何發簡訊撩動。

  • 交易資訊雙重效驗防止因系統級/套用級/使用者級等各種異常情況發生後導致的全部/部份遺失的訂單資訊。

  • 實作重復通知,預設失敗連續通知10次(通知間隔為 n*2+1/min ),直到消費方正確響應,超出推播上限次數後標記為異常狀態,可進行恢復!

  • 使用場景

    延遲佇列多用於需要延遲工作的場景。

    最常見的是以下兩種場景:

    1、延遲消費

    1. 使用者生成訂單之後,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單。

    2. 使用者註冊成功之後,需要過一段時間比如一周後校驗使用者的使用情況,如果發現使用者活躍度較低,則發送信件或者簡訊來提醒使用者使用。

    2、延遲重試

    比如消費者從佇列裏消費訊息時失敗了,但是想要延遲一段時間後自動重試。如果不使用延遲佇列,那麽我們只能透過一個輪詢掃描程式去完成。

    掃表存在的問題是

  • 掃表與資料庫長時間連線,在數量量大的情況容易出現連線異常中斷,需要更多的例外處理,對程式健壯性要求高

  • 在數據量大的情況下延時較高,規定內處理不完,影響業務,雖然可以啟動多個行程來處理,這樣會帶來額外的維護成本,不能從根本上解決。

  • 每個業務都要維護一個自己的掃表邏輯。當業務越來越多時,發現掃表部份的邏輯會重復開發,但是非常類似

  • 緩存佇列設計

    場景設計

    實際的生產場景是筆者負責的某個系統需要對接一個外部的資金方,每一筆資金下單後需要延時30分鐘推播對應的附件。

    這裏簡化為一個訂單資訊數據延遲處理的場景,就是每一筆下單記錄一條訂單訊息(暫時叫做 OrderMessage ),訂單訊息需要延遲5到15秒後進行異步處理。

    延時佇列的實作

    選用了基於 Redis 的有序集合 Sorted Set Crontab 短輪詢進行實作。

    具體方案是:

    1. 訂單建立的時候,訂單ID和當前時間戳分別作為 Sorted Set member score 添加到訂單佇列 Sorted Set 中。

    2. 訂單建立的時候,訂單ID和推播內容 JSON 字串分別作為 field value 添加到訂單佇列內容 Hash 中。

    3. 第1步和第2步操作的時候用 Lua 指令碼保證原子性。

    4. 使用一個異步執行緒透過 Sorted Set 的命令 ZREVRANGEBYSCORE 彈出指定數量的 訂單ID 對應的訂單佇列內容 Hash 中的訂單推播內容數據進行處理。

    對於第4點處理有兩種方案:

    處理方案一

    彈出訂單內容數據的同時進行數據刪除,也就是 ZREVRANGEBYSCORE ZREM HDEL 命令要在同一個 Lua 指令碼中執行,這樣的話 Lua 指令碼的編寫難度大,並且由於彈出數據已經在 Redis 中刪除,如果數據處理失敗則可能需要從資料庫重新查詢補償。

    處理方案二

    彈出訂單內容數據之後,在數據處理完成的時候再主動刪除訂單佇列 Sorted Set 和訂單佇列內容 Hash 中對應的數據,這樣的話需要控制並行,有重復執行的可能性。

    選用了方案一,也就是從 Sorted Set 彈出訂單ID並且從Hash中獲取完推播數據之後馬上刪除這兩個集合中對應的數據。

    方案的流程圖大概是這樣:

    Redis相關命令

    Sorted Set相關命令

    ZADD 命令 - 將一個或多個成員元素及其分數值加入到有序集當中。

    ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN

    ZREVRANGEBYSCORE 命令 - 返回有序集中指定分數區間內的所有的成員。有序整合員按分數值遞減(從大到小)的次序排列。

    ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]

  • max:分數區間 - 最大分數。

  • min:分數區間 - 最小分數。

  • WITHSCORES:可選參數,是否返回分數值,指定則會返回得分值。

  • LIMIT:可選參數,offset和count原理和 MySQL LIMIT offset,size 一致,如果不指定此參數則返回整個集合的數據。

  • [success] ZREM 命令 - 用於移除有序集中的一個或多個成員,不存在的成員將被忽略。

    ZREM key member [member ...]

    Hash相關命令

    HMSET 命令 - 同時將多個field-value(欄位-值)對設定到哈希表中。

    HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN

    HDEL 命令 - 刪除哈希表key中的一個或多個指定欄位,不存在的欄位將被忽略。

    HDEL KEY_NAME FIELD1.. FIELDN

    Lua 語法

  • 載入 Lua 指令碼並且返回指令碼的 SHA-1 字串: SCRIPT LOAD script

  • 執行已經載入的 Lua 指令碼: EVALSHA sha1 numkeys key [key ...] arg [arg ...]

  • unpack 函式可以把 table 型別的參數轉化為可變參數,不過需要註意的是 unpack 函式必須使用在非變量定義的函式呼叫的最後一個參數,否則會失效,詳細見 Stackoverflow 的提問table.unpack() only returns the first element。

  • 如果不熟悉Lua語言,建議系統學習一下,因為想用好Redis,一定離不開Lua。

    Lua 指令碼

    入隊 enqueue.lua

    local zset_key = KEYS[1]
    local hash_key = KEYS[2]
    local zset_value = ARGV[1]
    local zset_score = ARGV[2]
    local hash_field = ARGV[3]
    local hash_value = ARGV[4]
    redis.call('ZADD', zset_key, zset_score, zset_value)
    redis.call('HSET', hash_key, hash_field, hash_value)
    returnnil

    將任務的執行時間作為score,要執行的任務數據作為value,存放在zset中

    出隊 dequeue.lua

    local zset_key = KEYS[1]
    local hash_key = KEYS[2]
    local min_score = ARGV[1]
    local max_score = ARGV[2]
    local offset = ARGV[3]
    local limit = ARGV[4]
    -- TYPE命令的返回結果是{'ok':'zset'}這樣子,這裏利用next做一輪叠代
    localstatustype = next(redis.call('TYPE', zset_key))
    ifstatus ~= nilandstatus == 'ok'then
    iftype == 'zset'then
    local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
    if list ~= niland #list > 0then
    -- unpack函式能把table轉化為可變參數
    redis.call('ZREM', zset_key, unpack(list))
    local result = redis.call('HMGET', hash_key, unpack(list))
    redis.call('HDEL', hash_key, unpack(list))
    return result
    end
    end
    end
    returnnil

    如果最小的分數小於等於當前時間戳,就將該任務取出來執行,否則休眠一段時間後再查詢。

    註意:這裏其實有一個效能隱患,命令 ZREVRANGEBYSCORE 的時間復雜度可以視為為O(N),N是集合的元素個數,由於這裏把所有的訂單資訊都放進了同一個Sorted Set(ORDER_QUEUE)中,所以在一直有新增數據的時候, dequeue 指令碼的時間復雜度一直比較高,後續訂單量升高之後會此處一定會成為效能瓶頸,後面會給出解決的方案

    這裏的出隊使用 Crontab 作為輪訓去查詢消費

    業務核心程式碼

    延遲佇列類 RedisDelayQueue.php

    <?php
    /**
     * @desc Redis 延遲任務佇列
     * @author Tinywan(ShaoBo Wan)
     * @date 2024/05/02 11:36
     */

    declare(strict_types=1);
    namespaceredis;
    classRedisDelayQueue
    {
    // 生產者 指令碼sha值
    const DELAY_QUEUE_PRODUCER_SCRIPT_SHA = 'DELAY:QUEUE:PRODUCER:SCRIPT:SHA';
    // 消費者 指令碼sha值
    const DELAY_QUEUE_CONSUMER_SCRIPT_SHA = 'DELAY:QUEUE:CONSUMER:SCRIPT:SHA';
    // 訂單關閉
    const DELAY_QUEUE_ORDER_CLOSE = 'DELAY:QUEUE:ORDER:CLOSE';
    // 訂單關閉詳情哈希
    const DELAY_QUEUE_ORDER_CLOSE_HASH = 'DELAY:QUEUE:ORDER:CLOSE:HASH';
    /**
    * Redis 靜態例項
    @return \Redis
    */

    privatestaticfunction_redis()
    {
    $redis = \redis\BaseRedis::server();
    $redis->select(3);
    return $redis;
    }
    /**
    @desc: 延遲佇列 生產者
    @param string $keys1
    @param string $keys2
    @param string $member
    @param int $score
    @param array $message
    @return mixed
    */

    publicstaticfunctionproducer(string $keys1, string $keys2, string $member, int $score, array $message)
    {
    $redis = self::_redis();
    $scriptSha = $redis->get(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA);
    if (!$scriptSha) {
    $script = <<<luascript
    redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2])
    redis.call('HSET', KEYS[2], ARGV[2], ARGV[3])
    return 1
    luascript;

    $scriptSha = $redis->script('load', $script);
    $redis->set(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA, $scriptSha);
    }
    $hashValue = json_encode($message, JSON_UNESCAPED_UNICODE);
    return $redis->evalSha($scriptSha, [$keys1, $keys2, $score, $member, $hashValue], 2);
    }
    /**
    @desc: 延遲佇列 消費者
    @param string $keys1
    @param string $keys2
    @param int $maxScore
    @return mixed
    */

    publicstaticfunctionconsumer(string $keys1, string $keys2, int $maxScore)
    {
    $redis = self::_redis();
    $scriptSha = $redis->get(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA);
    if (!$scriptSha) {
    $script = <<<luascript
    local status, type = next(redis.call('TYPE', KEYS[1]))
    if status ~= nil and status == 'ok' then
    if type == 'zset' then
    local list = redis.call('ZREVRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', ARGV[3], ARGV[4])
    if list ~= nil and #list > 0 then
    redis.call('ZREM', KEYS[1], unpack(list))
    local result = redis.call('HMGET', KEYS[2], unpack(list))
    redis.call('HDEL', KEYS[2], unpack(list))
    return result
    end
    end
    end
    luascript;

    $scriptSha = $redis->script('load', $script);
    $redis->set(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA, $scriptSha);
    }
    return $redis->evalSha($scriptSha, [$keys1, $keys2, $maxScore, 0, 010], 2);
    }
    }






    用redis來實作可以依賴於redis自身的持久化來實作持久化,redis的集群來支持高並行和高可用。因此開發成本很小,可以做到很即時。

    指令碼命令列

    生產者訊息

    privatefunctiondelayQueueOrderClose()
    {
    $orderId = time();
    $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
    $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
    $score = time() + 60// 延遲60秒執行
    $message = [
    'event' => RedisDelayQueue::EVENT_ORDER_CLOSE,
    'order_id' => $orderId,
    'create_time' => time()
    ];
    $res = RedisDelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message);
    var_dump($res);
    }

    如果是ThinkPHP6 框架,執行該命令則可以生產訊息, php think crontab delay-queue-order-producer

    迴圈

    privatefunctiondelayOrderProducer()
    {
    $keys1 = DelayQueue::KEY_ORDER_CLOSE;
    $keys2 = DelayQueue::KEY_ORDER_CLOSE_HASH;
    for ($i = 1; $i <= 10; $i++) {
    $orderId = 'S' . $i;
    $score = time(); // 延遲60秒執行
    $message = [
    'event' => DelayQueue::EVENT_ORDER_CLOSE,
    'order_id' => $orderId,
    'create_time' => time()
    ];
    $res = DelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message);
    var_dump($res);
    }
    }

    消費者訊息

    1、透過Crontab 輪詢執行

    privatefunctiondelayQueueOrderConsumer()
    {
    $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
    $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
    $maxScore = time();
    $queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore);
    if (false === $queueList) {
    echo' [x] Message List is Empty, Try Again '"\n";
    return;
    }
    var_dump($queueList);
    }

    說明:如果最小的分數小於等於當前時間戳,就將該任務取出來執行,否則休眠一段時間後再查詢

    2、阻塞執行

    privatefunctiondelayQueueOrderConsumerWhile()
    {
    $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
    $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
    while (true) {
    $maxScore = time();
    $queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore);
    if (false === $queueList) {
    echo' [x] Message List is Empty, Try Again '"\n";
    sleep(1);
    continue;
    }
    // 處理業務
    foreach ($queueList as $queue) {
    $messageArray = json_decode($queue, true);
    }
    }
    }

    數據刪除為處理問題

    方案一:彈出訂單內容數據的同時進行數據刪除,也就是 ZREVRANGEBYSCORE ZREM HDEL 命令要在同一個 Lua 指令碼中執行,這樣的話 Lua 指令碼的編寫難度大,並且由於彈出數據已經在Redis中刪除,如果數據處理失敗則可能需要從資料庫重新查詢補償。

    針對以上的解決方案就是: 訊息進入到延遲佇列後,保證至少被消費一次。

  • 消費延遲佇列訊息後(zset結構中掃描到期的訊息),不及時消費

  • 把讀取的訊息放入一個 redis stream 佇列,同時加入消費組

  • 透過消費組消費 redis stream 消費,處理業務邏輯

  • Redis Stream 消費組,讀取訊息處理並且 ACK(將訊息標記為"已處理")

  • 如果訊息讀取但是沒處理,則進入XPENDING 列表,進行二次消費並且 ACK(將訊息標記為"已處理")