簡介
顧名思義,延遲佇列就是進入該佇列的訊息會被延遲消費的佇列。而一般的佇列,訊息一旦入隊了之後就會被消費者馬上消費。
延時任務和定時任務區別
延時任務有別於定時任務,定時任務往往是固定周期的,有明確的觸發時間。
而延時任務一般沒有固定的開始時間,它常常是由一個事件觸發的,而在這個事件觸發之後的一段時間內觸發另一個事件。
任務事件生成時並不想讓消費者立即拿到,而是延遲一定時間後才接收到該事件進行消費。
業務場景
訂單超時,使用者下單後進入支付頁面(通常會有超時限制)超過15分鐘沒有進行操作,那麽這個訂單就需要作廢處理。
如何定期檢查處於退款狀態的訂單是否已經退款成功?
註冊後到現在已經一周的使用者,如何發簡訊撩動。
交易資訊雙重效驗防止因系統級/套用級/使用者級等各種異常情況發生後導致的全部/部份遺失的訂單資訊。
實作重復通知,預設失敗連續通知10次(通知間隔為
n*2+1/min
),直到消費方正確響應,超出推播上限次數後標記為異常狀態,可進行恢復!
使用場景
延遲佇列多用於需要延遲工作的場景。
最常見的是以下兩種場景:
1、延遲消費
使用者生成訂單之後,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單。
使用者註冊成功之後,需要過一段時間比如一周後校驗使用者的使用情況,如果發現使用者活躍度較低,則發送信件或者簡訊來提醒使用者使用。
2、延遲重試
比如消費者從佇列裏消費訊息時失敗了,但是想要延遲一段時間後自動重試。如果不使用延遲佇列,那麽我們只能透過一個輪詢掃描程式去完成。
掃表存在的問題是
掃表與資料庫長時間連線,在數量量大的情況容易出現連線異常中斷,需要更多的例外處理,對程式健壯性要求高
在數據量大的情況下延時較高,規定內處理不完,影響業務,雖然可以啟動多個行程來處理,這樣會帶來額外的維護成本,不能從根本上解決。
每個業務都要維護一個自己的掃表邏輯。當業務越來越多時,發現掃表部份的邏輯會重復開發,但是非常類似
緩存佇列設計
場景設計
實際的生產場景是筆者負責的某個系統需要對接一個外部的資金方,每一筆資金下單後需要延時30分鐘推播對應的附件。
這裏簡化為一個訂單資訊數據延遲處理的場景,就是每一筆下單記錄一條訂單訊息(暫時叫做
OrderMessage
),訂單訊息需要延遲5到15秒後進行異步處理。
延時佇列的實作
選用了基於
Redis
的有序集合
Sorted Set
和
Crontab
短輪詢進行實作。
具體方案是:
訂單建立的時候,訂單ID和當前時間戳分別作為
Sorted Set
的member
和score
添加到訂單佇列Sorted Set
中。訂單建立的時候,訂單ID和推播內容
JSON
字串分別作為field
和value
添加到訂單佇列內容Hash
中。第1步和第2步操作的時候用
Lua
指令碼保證原子性。使用一個異步執行緒透過
Sorted Set
的命令ZREVRANGEBYSCORE
彈出指定數量的訂單ID
對應的訂單佇列內容Hash
中的訂單推播內容數據進行處理。
對於第4點處理有兩種方案:
處理方案一
彈出訂單內容數據的同時進行數據刪除,也就是
ZREVRANGEBYSCORE
、
ZREM
和
HDEL
命令要在同一個
Lua
指令碼中執行,這樣的話
Lua
指令碼的編寫難度大,並且由於彈出數據已經在
Redis
中刪除,如果數據處理失敗則可能需要從資料庫重新查詢補償。
處理方案二
彈出訂單內容數據之後,在數據處理完成的時候再主動刪除訂單佇列
Sorted Set
和訂單佇列內容
Hash
中對應的數據,這樣的話需要控制並行,有重復執行的可能性。
選用了方案一,也就是從
Sorted Set
彈出訂單ID並且從Hash中獲取完推播數據之後馬上刪除這兩個集合中對應的數據。
方案的流程圖大概是這樣:
Redis相關命令
Sorted Set相關命令
ZADD
命令 - 將一個或多個成員元素及其分數值加入到有序集當中。
ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN
ZREVRANGEBYSCORE
命令 - 返回有序集中指定分數區間內的所有的成員。有序整合員按分數值遞減(從大到小)的次序排列。
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
max:分數區間 - 最大分數。
min:分數區間 - 最小分數。
WITHSCORES:可選參數,是否返回分數值,指定則會返回得分值。
LIMIT:可選參數,offset和count原理和
MySQL
的
LIMIT offset,size
一致,如果不指定此參數則返回整個集合的數據。
[success]
ZREM
命令 - 用於移除有序集中的一個或多個成員,不存在的成員將被忽略。
ZREM key member [member ...]
Hash相關命令
HMSET
命令 - 同時將多個field-value(欄位-值)對設定到哈希表中。
HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN
HDEL
命令 - 刪除哈希表key中的一個或多個指定欄位,不存在的欄位將被忽略。
HDEL KEY_NAME FIELD1.. FIELDN
Lua 語法
載入
Lua
指令碼並且返回指令碼的
SHA-1
字串:
SCRIPT LOAD script
。
執行已經載入的
Lua
指令碼:
EVALSHA sha1 numkeys key [key ...] arg [arg ...]
。
unpack
函式可以把
table
型別的參數轉化為可變參數,不過需要註意的是
unpack
函式必須使用在非變量定義的函式呼叫的最後一個參數,否則會失效,詳細見
Stackoverflow
的提問table.unpack() only returns the first element。
如果不熟悉Lua語言,建議系統學習一下,因為想用好Redis,一定離不開Lua。
Lua 指令碼
入隊
enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
returnnil
將任務的執行時間作為score,要執行的任務數據作為value,存放在zset中
出隊
dequeue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回結果是{'ok':'zset'}這樣子,這裏利用next做一輪叠代
localstatus, type = next(redis.call('TYPE', zset_key))
ifstatus ~= nilandstatus == 'ok'then
iftype == 'zset'then
local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
if list ~= niland #list > 0then
-- unpack函式能把table轉化為可變參數
redis.call('ZREM', zset_key, unpack(list))
local result = redis.call('HMGET', hash_key, unpack(list))
redis.call('HDEL', hash_key, unpack(list))
return result
end
end
end
returnnil
如果最小的分數小於等於當前時間戳,就將該任務取出來執行,否則休眠一段時間後再查詢。
註意:這裏其實有一個效能隱患,命令
ZREVRANGEBYSCORE
的時間復雜度可以視為為O(N),N是集合的元素個數,由於這裏把所有的訂單資訊都放進了同一個Sorted Set(ORDER_QUEUE)中,所以在一直有新增數據的時候,
dequeue
指令碼的時間復雜度一直比較高,後續訂單量升高之後會此處一定會成為效能瓶頸,後面會給出解決的方案
這裏的出隊使用
Crontab
作為輪訓去查詢消費
業務核心程式碼
延遲佇列類 RedisDelayQueue.php
<?php
/**
* @desc Redis 延遲任務佇列
* @author Tinywan(ShaoBo Wan)
* @date 2024/05/02 11:36
*/
declare(strict_types=1);
namespaceredis;
classRedisDelayQueue
{
// 生產者 指令碼sha值
const DELAY_QUEUE_PRODUCER_SCRIPT_SHA = 'DELAY:QUEUE:PRODUCER:SCRIPT:SHA';
// 消費者 指令碼sha值
const DELAY_QUEUE_CONSUMER_SCRIPT_SHA = 'DELAY:QUEUE:CONSUMER:SCRIPT:SHA';
// 訂單關閉
const DELAY_QUEUE_ORDER_CLOSE = 'DELAY:QUEUE:ORDER:CLOSE';
// 訂單關閉詳情哈希
const DELAY_QUEUE_ORDER_CLOSE_HASH = 'DELAY:QUEUE:ORDER:CLOSE:HASH';
/**
* Redis 靜態例項
* @return \Redis
*/
privatestaticfunction_redis()
{
$redis = \redis\BaseRedis::server();
$redis->select(3);
return $redis;
}
/**
* @desc: 延遲佇列 生產者
* @param string $keys1
* @param string $keys2
* @param string $member
* @param int $score
* @param array $message
* @return mixed
*/
publicstaticfunctionproducer(string $keys1, string $keys2, string $member, int $score, array $message)
{
$redis = self::_redis();
$scriptSha = $redis->get(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA);
if (!$scriptSha) {
$script = <<<luascript
redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2])
redis.call('HSET', KEYS[2], ARGV[2], ARGV[3])
return 1
luascript;
$scriptSha = $redis->script('load', $script);
$redis->set(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA, $scriptSha);
}
$hashValue = json_encode($message, JSON_UNESCAPED_UNICODE);
return $redis->evalSha($scriptSha, [$keys1, $keys2, $score, $member, $hashValue], 2);
}
/**
* @desc: 延遲佇列 消費者
* @param string $keys1
* @param string $keys2
* @param int $maxScore
* @return mixed
*/
publicstaticfunctionconsumer(string $keys1, string $keys2, int $maxScore)
{
$redis = self::_redis();
$scriptSha = $redis->get(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA);
if (!$scriptSha) {
$script = <<<luascript
local status, type = next(redis.call('TYPE', KEYS[1]))
if status ~= nil and status == 'ok' then
if type == 'zset' then
local list = redis.call('ZREVRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', ARGV[3], ARGV[4])
if list ~= nil and #list > 0 then
redis.call('ZREM', KEYS[1], unpack(list))
local result = redis.call('HMGET', KEYS[2], unpack(list))
redis.call('HDEL', KEYS[2], unpack(list))
return result
end
end
end
luascript;
$scriptSha = $redis->script('load', $script);
$redis->set(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA, $scriptSha);
}
return $redis->evalSha($scriptSha, [$keys1, $keys2, $maxScore, 0, 0, 10], 2);
}
}
用redis來實作可以依賴於redis自身的持久化來實作持久化,redis的集群來支持高並行和高可用。因此開發成本很小,可以做到很即時。
指令碼命令列
生產者訊息
privatefunctiondelayQueueOrderClose()
{
$orderId = time();
$keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
$keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
$score = time() + 60; // 延遲60秒執行
$message = [
'event' => RedisDelayQueue::EVENT_ORDER_CLOSE,
'order_id' => $orderId,
'create_time' => time()
];
$res = RedisDelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message);
var_dump($res);
}
如果是ThinkPHP6 框架,執行該命令則可以生產訊息,
php think crontab delay-queue-order-producer
迴圈
privatefunctiondelayOrderProducer()
{
$keys1 = DelayQueue::KEY_ORDER_CLOSE;
$keys2 = DelayQueue::KEY_ORDER_CLOSE_HASH;
for ($i = 1; $i <= 10; $i++) {
$orderId = 'S' . $i;
$score = time(); // 延遲60秒執行
$message = [
'event' => DelayQueue::EVENT_ORDER_CLOSE,
'order_id' => $orderId,
'create_time' => time()
];
$res = DelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message);
var_dump($res);
}
}
消費者訊息
1、透過Crontab 輪詢執行
privatefunctiondelayQueueOrderConsumer()
{
$keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
$keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
$maxScore = time();
$queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore);
if (false === $queueList) {
echo' [x] Message List is Empty, Try Again ', "\n";
return;
}
var_dump($queueList);
}
說明:如果最小的分數小於等於當前時間戳,就將該任務取出來執行,否則休眠一段時間後再查詢
2、阻塞執行
privatefunctiondelayQueueOrderConsumerWhile()
{
$keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
$keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
while (true) {
$maxScore = time();
$queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore);
if (false === $queueList) {
echo' [x] Message List is Empty, Try Again ', "\n";
sleep(1);
continue;
}
// 處理業務
foreach ($queueList as $queue) {
$messageArray = json_decode($queue, true);
}
}
}
數據刪除為處理問題
方案一:彈出訂單內容數據的同時進行數據刪除,也就是
ZREVRANGEBYSCORE
、
ZREM
和
HDEL
命令要在同一個
Lua
指令碼中執行,這樣的話
Lua
指令碼的編寫難度大,並且由於彈出數據已經在Redis中刪除,如果數據處理失敗則可能需要從資料庫重新查詢補償。
針對以上的解決方案就是: 訊息進入到延遲佇列後,保證至少被消費一次。
消費延遲佇列訊息後(zset結構中掃描到期的訊息),不及時消費
把讀取的訊息放入一個 redis stream 佇列,同時加入消費組
透過消費組消費 redis stream 消費,處理業務邏輯
Redis Stream 消費組,讀取訊息處理並且
ACK(將訊息標記為"已處理")
如果訊息讀取但是沒處理,則進入XPENDING 列表,進行二次消費並且
ACK(將訊息標記為"已處理")