前言
今年接觸了一個策略類手遊相關的計畫,後端本身計劃是使用
skynet
進行開發的,後來結合計畫的時間緊急程度和客戶端開發組討論後決定使用PHP進行快速開發,後期再使用其他語言框架進行拆分業務;綜合考慮最後選用了webman作為主要開發框架。
整體計畫分為
配置服務
、
HTTP-API
服務、
websocket
服務三大部份,其中配置管理主要是相容客戶端生成的配置數據進行匯入匯出轉換載入,底層使用
MySQL
進行儲存,多服務間使用
Redis
進行一級緩存,服務行程間使用了基於
APCu
的共享緩存,後期我將該共享緩存元件化也貢獻給了社群。
【workbunny】共享快取 https://www.workerman.net/plugin/133
Redis
在遊戲開發界實際上使用Redis的情況還是比較多的,我們使用Redis主要還是為了將一些數據緩存共享給各個伺服器例項:
┌─────┐ ┌─────┐
| A | ────────────> service <──────────── | B |
└─────┘ └─────┘
/ | \ / | \
┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐
| a | | b | | c | ───────> instance <─────── | a | | b | | c |
└───┘ └───┘ └───┘ └───┘ └───┘ └───┘
| | | | | |
1|21|21|2 ────────> process <──────── 1|21|21|2
3|43|43|43|43|43|4
如圖所示,我們分為
A/B
區服,每個區服下可能存在abc不同的伺服器例項,他們需要共享相同的區服配置;每個區服各自管理自己的資料庫數據區域/資料庫例項;每個區服下的伺服器例項對於資料庫數據的要求是強需求,且為變動較為頻繁的數據內容,與web的微服務有區別,所以我們沒有使用類似Nacos或者其他配置中心進行處理,從而用更適配當前場景的Redis作為緩存服務。
同時Redis也可以作為使用者登入鑒權相關中的一環,也可以為營運相關功能提供一些輔助,比如使用
Redis-Stream
作為訊息佇列,處理一些事件通知等。
共享記憶體
在遊戲開發中,許多業務都是在記憶體中進行的計算處理,而我們上述的模式是多行程模式,行程間通訊是一個比較頻繁出現的點;一開始解決這個問題是粗暴的將一些固定業務固定在對應的行程上執行,盡可能避免行程間的通訊問題。
後來隨著業務逐步的擴大,單純限制業務是沒辦法完全實作的,這時候有考慮過使用
webman的外掛程式
channel
;但實際上
channel
基於
socket
涉及系統內核態使用者態的拷貝等問題,同時受網路影響受限,在一些業務的計算處理上會帶來比較高的延遲,包括Redis也同樣是這樣的問題,我們需要實作數據的零拷貝。
後續我們的目標釘選在了共享記憶體上,因為共享記憶體可以輕易的在行程間進行通訊交換,而且不存在深拷貝和網路等問題,效率、效能非常的高,整體微秒級別的響應滿足我們的需求;於是我基於PHP的拓展APCu封裝了適合我們業務場景的外掛程式包進行使用。
webman-shared-cache
我們的基礎套用實作了定時器來從MySQL資料庫讀取配置資訊,定時器的處理器也在讀取數據刷入Redis的同時觸發共享記憶體的更新事件,上層業務透過更新事件的回呼出發會將Redis的數據刷入共享記憶體中,以便當前區服例項的各個行程能夠使用。
我們使用緩存的場景很多都是MAP數據,所以我在實作外掛程式的時候特別實作了類似Redis-Hash相關的功能:
HSet/HGet/HDel/HKeys/HExists
。
由於我們需要一些自增自減的運算,所以也實作了以下功能點:
HIncr/HDecr
支持浮點運算。由於APCu的特性所以儲存的數據也是支持儲存物件數據的;
webman-shared-cache為何使用鎖?
APCu(Alternative PHP Cache User Cache)是一個開放原始碼的PHP緩存擴充套件,它提供了一種在PHP應用程式中儲存和檢索數據的快速方法。它是APC(Alternative PHP Cache)的繼任者,專註於使用者數據的緩存,而不是opcode緩存。
之前我有和社群的同學們聊過,他們不是很理解為什麽我在實作外掛程式的時候自己使用了鎖,這是因為APCu本身的自行實作了對它自身函式的原子性操作,但我們使用它的時候是在多行程的環境下,每一個行程記憶體在多次APCu的操作,為了業務的原子性,我們希望這多次的操作要在一個原子性內完成,所以需要一個鎖來進行隔離,以免在多行程的環境下被其他行程的操作汙染,整體是類似
MySQl
的事務的:
protectedstaticfunction_HIncr(string $key, string|int $hashKey, int|float $hashValue = 1): bool|int|float
{
$func = __FUNCTION__;
$result = false;
$params = func_get_args();
self::_Atomic($key, function()use(
$key, $hashKey, $hashValue, $func, $params, &$result
){
$hash = self::_Get($key, []);
if (is_numeric($v = ($hash[$hashKey] ?? 0))) {
$hash[$hashKey] = $result = $v + $hashValue;
self::_Set($key, $hash);
}
return [
'timestamp' => microtime(true),
'method' => $func,
'params' => $params,
'result' => null
];
}, true);
return $result;
}
比如上述程式碼,就是一個Hash key的自增操作,我們需要在讀取Hash後在寫入,讀取和寫入應為一體的;
原子性執行函式
Atomic
的實作如下:
/**
* 原子操作
* - 無法對鎖本身進行原子性操作
* - 只保證handler是否被原子性觸發,對其邏輯是否丟擲異常不負責
* - handler盡可能避免超長阻塞
* - lockKey會被自動設定特殊字首#lock#,可以透過Cache::LockInfo進行查詢
*
* @param string $lockKey
* @param Closure $handler
* @param bool $blocking
* @return bool
*/
protectedstaticfunction_Atomic(string $lockKey, Closure $handler, bool $blocking = false): bool
{
$func = __FUNCTION__;
$result = false;
if ($blocking) {
$startTime = time();
while ($blocking) {
// 阻塞保險
if (time() >= $startTime + self::$fuse) {returnfalse;}
// 建立鎖
apcu_entry($lock = self::GetLockKey($lockKey), function()use(
$lockKey, $handler, $func, &$result, &$blocking
){
$res = call_user_func($handler);
$result = true;
$blocking = false;
return [
'timestamp' => microtime(true),
'method' => $func,
'params' => [$lockKey, '\Closure'],
'result' => $res
];
});
}
} else {
// 建立鎖
apcu_entry($lock = self::GetLockKey($lockKey), function()use(
$lockKey, $handler, $func, &$result
){
$res = call_user_func($handler);
$result = true;
return [
'timestamp' => microtime(true),
'method' => $func,
'params' => [$lockKey, '\Closure'],
'result' => $res
];
});
}
if ($result) {
apcu_delete($lock);
}
return $result;
}
當使用阻塞模式的時候,我們會在當前行程內使用一個while迴圈來進行阻塞搶占,為了不將當前行程阻塞死,我們還加入了一個保險,由
self::$fuse
提供;
註意
這裏在實踐過程中需要註意的是,Atomic在傳入回呼函式時切勿再使用匿名函式作為參數值或者是透過use傳入一個匿名函式,如:
$fuc = function() {
// do something
}
Cache::Atomic('test', function () use ($fuc) {
// do anything
})
APCu底層會對函式參數值或參照參數進行序列化儲存,但匿名函式不可以被序列化,所以會丟擲一個異常;但你可以透過當前物件的內容值或者靜態內容來保存一個匿名函式,然後在Atomic的回呼內呼叫使用。
0.4.x版本
由於目前我使用Webman基於SQLite和共享記憶體在自行實作一個具備RAFT的輕排程服務外掛程式和服務註冊與發現外掛程式,所以特此為其完善增加了Channel特性;
Channel可以輔助實作類似
Redis-List、Redis-stream、Redis-Pub/Sub
的功能。
Channel
Channel是個特殊的數據格式,他的格式是固定如下的:
[
'--default--' => [
'futureId' => null,
'value' => []
],
workerId_1 => [
'futureId' => 1,
'value' => []
],
workerId_2 => [
'futureId' => 1,
'value' => []
],
......
]
它在共享記憶體中的鍵預設以**#Channel#**開頭。
--default--
是預設儲存空間,
workerId_1/workerId_2
等是子通道儲存空間,命名是由使用者程式碼傳入的,這裏
建議使用workerman內建的workerId
即可。
預設儲存空間和子通道儲存空間是互斥的,也就是說當存在子通道儲存空間時,是不存在--default--的,反之亦然;子通道儲存空間是當當前通道存在監聽器時生成的,而在監聽器產生前,訊息會暫存在--default--空間,當監聽器建立時,--default--的數據value會被同步到子通道儲存空間內, 加入value的隊頭 。
每一個子通道儲存空間的value都是拷貝的,存在相同的數據,各自監聽器監聽各自的子通道儲存空間;訊息的釋出支持向所有子通道釋出,也可以指定子通道進行釋出。
監聽器的底層使用了workerman的定時器,區別與workerman的timer,在event驅動下定時器的間隔是0,也就是一個future,而其他的事件驅動是0.001s為間隔。
實作一個List
由於監聽器建立消費是基於workerId的,我們可以透過不同行程建立相同的workerId的監聽器來對同一個子通道進行監聽:
A行程使用list作為workerId:
Cache::ChCreateListener('test', 'list', function(string $channelKey, string|int $workerId, mixed $message) {
// TODO 你的業務邏輯
});
B行程也同樣建立list的workerId監聽器:
Cache::ChCreateListener('test', 'list', function(string $channelKey, string|int $workerId, mixed $message) {
// TODO 你的業務邏輯
});
此時Channel test的數據如下:
[
'list' => [
'futureId' => 1,
'value' => []
],
......
]
註意
:共享記憶體中儲存的futureId為最後一個監聽器建立的futureId;當當前行程需要對監聽器進行移除時,請勿使用該數據,對應行程內可以透過
Cache::ChCreateListener()
的返回值獲取到當前行程建立的futureId用於移除監聽器,不使用共享記憶體中儲存的futureId即可
這時任意行程透過
Cache::ChPublish('test', '這是一個測試訊息', true);
發送訊息,或者指定workerIdCache::ChPublish('test', '這是一個測試訊息', true, 'list');
。
實作一個
Pub/Sub
A行程使用workerman的workerId作為workerId:
Cache::ChCreateListener('test', $worker->id, function(string $channelKey, string|int $workerId, mixed $message) {
// TODO 你的業務邏輯
});
B行程使用workerman的workerId作為workerId:
Cache::ChCreateListener('test', $worker->id, function(string $channelKey, string|int $workerId, mixed $message) {
// TODO 你的業務邏輯
});
此時Channel test的數據可能如下:
[
1 => [
'futureId' => 1,
'value' => []
],
2 => [
'futureId' => 1,
'value' => []
]
]
這時,任意行程透過
Cache::ChPublish('test', '這是一個測試訊息', false);
發送訊息即可。
註 :發送訊息第三個參數使用false時,如發送時還未建立監聽器,訊息則不會儲存至Channel,即監聽後才可存在訊息
實作類似
Redis-stream
與Pub/Sub相同,只不過釋出訊息使用
Cache::ChPublish('test', '這是一個測試訊息', true);
, 當釋出訊息指定workerId時,可以實作類似Redis-Stream Group的功能。
註 :這裏更復雜的功能可能需要對workerId進行變通,不能簡單使用workerman內建的workerId,只需要自行規劃好即可