概述
分享主題:使用workerman實作基於UDP的異步SIP伺服器,伺服器端可主動發送UDP數據給客戶端
基於Workerman實作基於UDP的異步SIP伺服器是一個涉及網路編程和協定實作的復雜任務。Workerman是一個高效能的PHP socket伺服器框架,它支持TCP、UDP、UnixSocket等多種協定,非常適合用於開發需要長連線或高並行的網路套用。下面將詳細介紹如何使用Workerman來實作一個基於UDP的異步SIP伺服器。
理解SIP協定
SIP(Session Initiation Protocol)是一個信令協定,用於在Internet Protocol(IP)網路中啟動、管理和終止即時會話,這些會話可能包括語音、視訊、訊息傳遞以及其他多媒體通訊。SIP協定是基於請求/響應模型的,類似於HTTP協定。
業務需求
自從使用workerman實作物聯網終端接入以來,我工作中的所有網路場景(
TCP\UDP\HTTP
)等均使用
workerman+channel
以微服務方式實作,開發速度快,效能超級高。(幾十萬台裝置同時接入都輕輕松松承受住)
之前多次關註過workerman的UDP伺服器,但一沒有實作我想要的結果,由於近期的業務需求,外加HTTP3 QUIC協定的廣泛使用,workerman作為一個廣泛使用的高效能PHP網路開發框架,支持持久化的UDP通訊是很有必要的。
一直以來想透過workerman編寫個基於UDP的SIP伺服器和實作
GB28181
的國標協定,搭配
SRS
、
ZLMediaKit
或者
monibuca
,滿足網路攝影機、硬碟錄像機裝置的接入,也可配合FreeSwitch實作基於SIP的語音通話或視訊會議系統。
workerman 主動發送udp數據:https://www.workerman.net/q/2688
UDP伺服器主動向客戶端發送訊息:https://www.workerman.net/q/4284
直到今天終於使用workerman 實作單行程或多行程方式監聽某個UDP埠,主動從平台向客戶端發送數據 並且所有功能均使用workerman的loop功能,能夠發揮平台最大化效能。
當行程只有一個時使用
socket
函式實作埠監聽,當行程大於1個時使用
stream_socket
實作埠監聽(各有利弊,請酌情使用,大部份場景,推薦將行程數保持與CPU數量一致,自動使用 stream_socket )
0x02 初步測試
當使用stream_socket時,伺服器首次收到客戶端發送的數據後,能夠穩定的向客戶端發送約5分鐘的資料包,直到該通訊會話被Linux內核丟棄,因此使用UDP進行通訊,建議至少60秒進行一次雙向心跳互動保活。
當使用socket時,伺服器首次收到客戶端發送的數據後,能夠穩定的向客戶端長期發送資料包(如果網路中的防火墻或NAT路由器沒有將會話過期,應該可以一直使用)
0x03 程式碼
<?php
chdir(dirname($_SERVER['SCRIPT_FILENAME']));
include_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
use Workerman\Lib\Timer;
$worker = new Worker();
$worker->count = 1; //開啟行程數量
$processName = "sip_server_udp5060";
$worker->transport = "udp";
$worker->name = $processName;
$worker->reusePort = true; //開啟均衡負載模式
$date = date("Y-m-d");
Worker::$pidFile = "var/{$processName}.pid";
Worker::$logFile = "var/{$processName}_logFile.log";
Worker::$stdoutFile = "var/{$processName}_stdout.log";
$socket = null;
$workerId = null;
define("UNAUTHORIZED_KICKOFF_SECOND" , 1);
define("UNAUTHORIZED_ALLOW_TRYTIME" , 10);
define("UDP_SOCKET_TYPE_IS_STREAM" , $worker->count > 1 ? 1 : 0 );
$worker->onWorkerStart = function() {
global $worker , $workerId , $socket , $processName;
//定期與資料庫握手,避免被斷掉,該動作每個行程都得執行
$workerId = $worker->id ;
$worker->connections = [];
$worker->connections_ur = [];
//根據daemon順序延時,這是確保系統正常執行的關鍵
usleep(1000 * 10 * ($worker->id+1) );
echo date("Y-m-d H:i:s")." 服務行程{$worker->id}已經啟動!\n";
if( $workerId >= 0 ){
//計劃監聽的UDP埠
if(UDP_SOCKET_TYPE_IS_STREAM == 0){
//這種模式只能執行一個行程
$socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
socket_bind($socket , "0.0.0.0" , 5060);
}else{
//這種模式可以多個行程共同監聽同一埠
$context = stream_context_create();
$socket = stream_socket_server("udp://0.0.0.0:5060" , $error_code , $error_message , STREAM_SERVER_BIND , $context);
}
Worker::$globalEvent->add($socket, 1 , "acceptUdpConnection"); //加入全域事件loop
/*
//如果需要從伺服器主動發數據給客戶端,可以透過channel方式呼叫,具體的業務實作可以參照官方channel使用介紹
Channel\Client::connect(CHANNEL_SIP_IP , CHANNEL_SIP_PORT);
$event_name = $processName; //UDP的數據回復跟TCP不一樣,只要知道對方埠即可,理論上任意一個行程均可從伺服器端發送數據給客戶端
//到公網環境查詢本機公網IP,便於生成日誌
Channel\Client::on($event_name, function($event_data)use($worker , $event_name) {
});
*/
//每隔一段時間對長時間未進行通訊的臨時會話進行清理
//不宜太大,太大容易遭受DDOS攻擊,也不宜太小,太小的話有可能還沒完成業務邏輯就被踢掉
Timer::add( UNAUTHORIZED_KICKOFF_SECOND , function (){
global $worker , $socket;
$dateTime = date("Y-m-d H:i:s");
$timeNow = time();
foreach ($worker->connections_ur as $remote_address => $remote_arr){
if( $remote_arr['lastMsgTime'] < $timeNow - UNAUTHORIZED_KICKOFF_SECOND ){
//非法接入,直接踢掉
unset($worker->connections_ur[$remote_address]);
//print("{$dateTime} {$remote_address} 踢掉非法接入\n");
//sendto($remote_address ,"illegal connection!\n");
}
//平台主動發送數據給終端,
sendto($remote_address ,"{$dateTime} test send data!\n");
}
});
//每隔一段時間對已經認證過的會話進行檢查,對於長時間未通訊的需要進行清理
Timer::add( 60 , function (){
global $worker , $socket;
$dateTime = date("Y-m-d H:i:s");
$timeNow = time();
foreach ($worker->connections as $remote_address => $remote_arr){
if( $remote_arr['lastMsgTime'] < $timeNow - 300 ){
//超時未通訊,直接踢掉
unset($worker->connections[$remote_address]);
//這裏編寫遠端會話離線的內容
//print("{$dateTime} {$remote_address} 會話超時掉線\n");
//sendto($remote_address ,"connection timeout!\n");
}
}
});
}
};
function sendto($remote_address , $send_buffer ){
global $socket;
if(UDP_SOCKET_TYPE_IS_STREAM == 0){
list($host, $port) = explode(":" , $remote_address );
socket_sendto($socket , $send_buffer , strlen($send_buffer), 0, $host, $port);
}else{
stream_socket_sendto( $socket, $send_buffer, 0, $remote_address);
}
}
function acceptUdpConnection($socket){
global $worker;
$dateTime = date("Y-m-d H:i:s");
$timeNow = time();
set_error_handler(function(){});
if(UDP_SOCKET_TYPE_IS_STREAM == 0){
socket_recvfrom($socket, $recv_buffer, 65535, 0, $from, $port);
$remote_address = "{$from}:{$port}";
}else{
$recv_buffer = stream_socket_recvfrom($socket, 65535 , 0, $remote_address);
}
restore_error_handler();
if (false === $recv_buffer || empty($remote_address)) {
returnfalse;
}
if(
isset($worker->connections[$remote_address]['lastMsgTime'])
&& $worker->connections[$remote_address]['lastMsgTime'] >= $timeNow - 300
){
//已註冊成功
$worker->connections[$remote_address]['lastMsgTime'] = $timeNow;
//處理業務邏輯 針對已經認證的合法連線
//針對已經認證過的連線,建議將收到的數據透過channel釋出到其他伺服端進行輪詢處理,以最大化提升系統處理效能,此時,本程式僅僅充當gateway功能
}else{
//未註冊成功 未認證的可以進行幾次通訊
$tryCount = $worker->connections_ur[$remote_address]['tryCount']??1;
if( $tryCount < UNAUTHORIZED_ALLOW_TRYTIME ){
$worker->connections_ur[$remote_address]['lastMsgTime'] = $timeNow;
$worker->connections_ur[$remote_address]['tryCount'] = $tryCount + 1;
//sendTo( $remote_address ,"{$dateTime} {$remote_address}:\nunauthorized,try time {$tryCount} \n");
//處理業務邏輯 對於未認證的連線,必須在超時前完成認證 完成認證後需要將連線從 connections_ur 裏面移出,並且加入到 connections 裏面去
}else{
//超過一定互動次數但仍未完成認證的會話將被忽視,直到被踢下線
//sendTo( $remote_address ,"{$dateTime} {$remote_address}:\nunauthorized,max try \n");
}
}
//收到數據,打印日誌
print( "{$dateTime} {$remote_address}:\n{$recv_buffer}\n");
};
Worker::runAll();
0x04 程式輸出樣例數據
2024-03-2701:15:5239.129.72.95:50574:
REGISTER sip:34020000002000000001@3402000000 SIP/2.0
Via: SIP/2.0/UDP 192.168.1.4:5060;rport;branch=z9hG4bK650883237
From: <sip:34020000001180870005@3402000000>;tag=698431213
To: <sip:34020000001180870005@3402000000>
Call-ID: 1146222786
CSeq: 1 REGISTER
Contact: <sip:34020000001180870005@192.168.1.4:5060>
Max-Forwards: 70
User-Agent: Embedded Net DVR/NVR/DVS
Expires: 86400
Content-Length: 0
2024-03-2701:15:5339.129.72.95:15363:
REGISTER sip:34020000002000000001@3402000000 SIP/2.0
Via: SIP/2.0/UDP 192.168.1.2:5060;rport;branch=z9hG4bK1890963109
From: <sip:34020000001180870007@3402000000>;tag=1955740825
To: <sip:34020000001180870007@3402000000>
Call-ID: 254089720
CSeq: 1 REGISTER
Contact: <sip:34020000001180870007@192.168.1.2:5060>
Max-Forwards: 70
User-Agent: Embedded Net DVR/NVR/DVS
Expires: 86400
Content-Length: 0
2024-03-2701:15:5439.129.72.95:31137:
REGISTER sip:34020000002000000001@3402000000 SIP/2.0
Via: SIP/2.0/UDP 192.168.1.8:5060;rport;branch=z9hG4bK1552460892
From: <sip:34020000001180870002@3402000000>;tag=1271059450
To: <sip:34020000001180870002@3402000000>
Call-ID: 108704431
CSeq: 1 REGISTER
Contact: <sip:34020000001180870002@192.168.1.8:5060>
Max-Forwards: 70
User-Agent: Embedded Net DVR/NVR/DVS
Expires: 86400
Content-Length: 0
01:38:09.293144 IP (tos 0x68, ttl 48, id 44466, offset 0, flags [DF], proto UDP (17), length 436)
39.129.72.95.64959 > 192.168.27.21.5060: [udp sum ok] SIP, length: 408
REGISTER sip:34020000002000000001@3402000000 SIP/2.0
Via: SIP/2.0/UDP 192.168.1.7:5060;rport;branch=z9hG4bK1838031091
From: <sip:34020000001180870003@3402000000>;tag=793428652
To: <sip:34020000001180870003@3402000000>
Call-ID: 1815643450
CSeq: 1 REGISTER
Contact: <sip:34020000001180870003@192.168.1.7:5060>
Max-Forwards: 70
User-Agent: Embedded Net DVR/NVR/DVS
Expires: 86400
Content-Length: 0
01:38:09.540771 IP (tos 0x0, ttl 64, id 30488, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.50574: [udp sum ok] SIP
01:38:09.540806 IP (tos 0x0, ttl 64, id 30489, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.31137: [udp sum ok] SIP
01:38:09.540815 IP (tos 0x0, ttl 64, id 30490, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.15363: [udp sum ok] SIP
01:38:09.540822 IP (tos 0x0, ttl 64, id 45413, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.130.87.71.34950: [udp sum ok] SIP
01:38:09.540830 IP (tos 0x0, ttl 64, id 27774, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.128.225.81.35329: [udp sum ok] SIP
01:38:09.540838 IP (tos 0x0, ttl 64, id 30491, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.26505: [udp sum ok] SIP
01:38:09.540845 IP (tos 0x0, ttl 64, id 2845, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.64959: [udp sum ok] SIP
01:38:09.540860 IP (tos 0x0, ttl 64, id 30493, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.36321: [udp sum ok] SIP
01:38:09.540867 IP (tos 0x0, ttl 64, id 27775, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.128.225.81.35342: [udp sum ok] SIP
01:38:09.540873 IP (tos 0x0, ttl 64, id 30494, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.23047: [udp sum ok] SIP
* */