當前位置: 妍妍網 > 碼農

使用Workerman實作基於UDP的異步SIP伺服器

2024-03-30碼農

概述

分享主題:使用workerman實作基於UDP的異步SIP伺服器,伺服器端可主動發送UDP數據給客戶端

基於Workerman實作基於UDP的異步SIP伺服器是一個涉及網路編程和協定實作的復雜任務。Workerman是一個高效能的PHP socket伺服器框架,它支持TCP、UDP、UnixSocket等多種協定,非常適合用於開發需要長連線或高並行的網路套用。下面將詳細介紹如何使用Workerman來實作一個基於UDP的異步SIP伺服器。

理解SIP協定

SIP(Session Initiation Protocol)是一個信令協定,用於在Internet Protocol(IP)網路中啟動、管理和終止即時會話,這些會話可能包括語音、視訊、訊息傳遞以及其他多媒體通訊。SIP協定是基於請求/響應模型的,類似於HTTP協定。

業務需求

自從使用workerman實作物聯網終端接入以來,我工作中的所有網路場景( TCP\UDP\HTTP )等均使用 workerman+channel 以微服務方式實作,開發速度快,效能超級高。(幾十萬台裝置同時接入都輕輕松松承受住)

之前多次關註過workerman的UDP伺服器,但一沒有實作我想要的結果,由於近期的業務需求,外加HTTP3 QUIC協定的廣泛使用,workerman作為一個廣泛使用的高效能PHP網路開發框架,支持持久化的UDP通訊是很有必要的。

一直以來想透過workerman編寫個基於UDP的SIP伺服器和實作 GB28181 的國標協定,搭配 SRS ZLMediaKit 或者 monibuca ,滿足網路攝影機、硬碟錄像機裝置的接入,也可配合FreeSwitch實作基於SIP的語音通話或視訊會議系統。

  • workerman 主動發送udp數據:https://www.workerman.net/q/2688

  • UDP伺服器主動向客戶端發送訊息:https://www.workerman.net/q/4284

  • 直到今天終於使用workerman 實作單行程或多行程方式監聽某個UDP埠,主動從平台向客戶端發送數據 並且所有功能均使用workerman的loop功能,能夠發揮平台最大化效能。

    當行程只有一個時使用 socket 函式實作埠監聽,當行程大於1個時使用 stream_socket 實作埠監聽(各有利弊,請酌情使用,大部份場景,推薦將行程數保持與CPU數量一致,自動使用 stream_socket )

    0x02 初步測試

    當使用stream_socket時,伺服器首次收到客戶端發送的數據後,能夠穩定的向客戶端發送約5分鐘的資料包,直到該通訊會話被Linux內核丟棄,因此使用UDP進行通訊,建議至少60秒進行一次雙向心跳互動保活。

    當使用socket時,伺服器首次收到客戶端發送的數據後,能夠穩定的向客戶端長期發送資料包(如果網路中的防火墻或NAT路由器沒有將會話過期,應該可以一直使用)

    0x03 程式碼

    <?php
    chdir(dirname($_SERVER['SCRIPT_FILENAME']));
    include_once __DIR__ . '/vendor/autoload.php';
    use Workerman\Worker;
    use Workerman\Lib\Timer;
    $worker = new Worker();
    $worker->count = 1; //開啟行程數量
    $processName = "sip_server_udp5060";
    $worker->transport = "udp";
    $worker->name = $processName;
    $worker->reusePort = true; //開啟均衡負載模式
    $date = date("Y-m-d");
    Worker::$pidFile = "var/{$processName}.pid";
    Worker::$logFile = "var/{$processName}_logFile.log";
    Worker::$stdoutFile = "var/{$processName}_stdout.log";
    $socket = null;
    $workerId = null;
    define("UNAUTHORIZED_KICKOFF_SECOND" , 1);
    define("UNAUTHORIZED_ALLOW_TRYTIME" , 10);
    define("UDP_SOCKET_TYPE_IS_STREAM" , $worker->count > 1 ? 1 : 0 );
    $worker->onWorkerStart = function() {
    global $worker , $workerId , $socket , $processName;
    //定期與資料庫握手,避免被斷掉,該動作每個行程都得執行
    $workerId = $worker->id ;
    $worker->connections = [];
    $worker->connections_ur = [];
    //根據daemon順序延時,這是確保系統正常執行的關鍵
    usleep(1000 * 10 * ($worker->id+1) );
    echo date("Y-m-d H:i:s")." 服務行程{$worker->id}已經啟動!\n";
    if$workerId >= 0 ){
    //計劃監聽的UDP埠
    if(UDP_SOCKET_TYPE_IS_STREAM == 0){
    //這種模式只能執行一個行程
    $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
    socket_bind($socket , "0.0.0.0" , 5060);
    }else{
    //這種模式可以多個行程共同監聽同一埠
    $context = stream_context_create();
    $socket = stream_socket_server("udp://0.0.0.0:5060" , $error_code , $error_message , STREAM_SERVER_BIND , $context);
    }
    Worker::$globalEvent->add($socket, 1 , "acceptUdpConnection"); //加入全域事件loop
    /*
    //如果需要從伺服器主動發數據給客戶端,可以透過channel方式呼叫,具體的業務實作可以參照官方channel使用介紹
    Channel\Client::connect(CHANNEL_SIP_IP , CHANNEL_SIP_PORT);
    $event_name = $processName; //UDP的數據回復跟TCP不一樣,只要知道對方埠即可,理論上任意一個行程均可從伺服器端發送數據給客戶端
    //到公網環境查詢本機公網IP,便於生成日誌
    Channel\Client::on($event_namefunction($event_data)use($worker , $event_name) {
    });
    */
    //每隔一段時間對長時間未進行通訊的臨時會話進行清理
    //不宜太大,太大容易遭受DDOS攻擊,也不宜太小,太小的話有可能還沒完成業務邏輯就被踢掉
    Timer::add( UNAUTHORIZED_KICKOFF_SECOND , function (){
    global $worker , $socket;
    $dateTime = date("Y-m-d H:i:s");
    $timeNow = time();
    foreach ($worker->connections_ur as $remote_address => $remote_arr){
    if$remote_arr['lastMsgTime'] < $timeNow - UNAUTHORIZED_KICKOFF_SECOND ){
    //非法接入,直接踢掉
    unset($worker->connections_ur[$remote_address]);
    //print("{$dateTime} {$remote_address} 踢掉非法接入\n");
    //sendto($remote_address ,"illegal connection!\n");
    }
    //平台主動發送數據給終端,
    sendto($remote_address ,"{$dateTime} test send data!\n");
    }
    });
    //每隔一段時間對已經認證過的會話進行檢查,對於長時間未通訊的需要進行清理
    Timer::add( 60 , function (){
    global $worker , $socket;
    $dateTime = date("Y-m-d H:i:s");
    $timeNow = time();
    foreach ($worker->connections as $remote_address => $remote_arr){
    if$remote_arr['lastMsgTime'] < $timeNow - 300 ){
    //超時未通訊,直接踢掉
    unset($worker->connections[$remote_address]);
    //這裏編寫遠端會話離線的內容
    //print("{$dateTime} {$remote_address} 會話超時掉線\n");
    //sendto($remote_address ,"connection timeout!\n");
    }
    }
    });
    }
    };
    function sendto($remote_address , $send_buffer ){
    global $socket;
    if(UDP_SOCKET_TYPE_IS_STREAM == 0){
    list($host$port) = explode(":" , $remote_address );
    socket_sendto($socket , $send_buffer , strlen($send_buffer), 0, $host$port);
    }else{
    stream_socket_sendto( $socket$send_buffer, 0, $remote_address);
    }
    }
    function acceptUdpConnection($socket){
    global $worker;
    $dateTime = date("Y-m-d H:i:s");
    $timeNow = time();
    set_error_handler(function(){});
    if(UDP_SOCKET_TYPE_IS_STREAM == 0){
    socket_recvfrom($socket$recv_buffer, 65535, 0, $from$port);
    $remote_address = "{$from}:{$port}";
    }else{
    $recv_buffer = stream_socket_recvfrom($socket, 65535 , 0, $remote_address);
    }
    restore_error_handler();
    if (false === $recv_buffer || empty($remote_address)) {
    returnfalse;
    }
    if(
    isset($worker->connections[$remote_address]['lastMsgTime'])
    && $worker->connections[$remote_address]['lastMsgTime'] >= $timeNow - 300
    ){
    //已註冊成功
    $worker->connections[$remote_address]['lastMsgTime'] = $timeNow;
    //處理業務邏輯 針對已經認證的合法連線
    //針對已經認證過的連線,建議將收到的數據透過channel釋出到其他伺服端進行輪詢處理,以最大化提升系統處理效能,此時,本程式僅僅充當gateway功能
    }else{
    //未註冊成功 未認證的可以進行幾次通訊
    $tryCount = $worker->connections_ur[$remote_address]['tryCount']??1;
    if$tryCount < UNAUTHORIZED_ALLOW_TRYTIME ){
    $worker->connections_ur[$remote_address]['lastMsgTime'] = $timeNow;
    $worker->connections_ur[$remote_address]['tryCount'] = $tryCount + 1;
    //sendTo( $remote_address ,"{$dateTime} {$remote_address}:\nunauthorized,try time {$tryCount} \n");
    //處理業務邏輯 對於未認證的連線,必須在超時前完成認證 完成認證後需要將連線從 connections_ur 裏面移出,並且加入到 connections 裏面去
    }else{
    //超過一定互動次數但仍未完成認證的會話將被忽視,直到被踢下線
    //sendTo( $remote_address ,"{$dateTime} {$remote_address}:\nunauthorized,max try \n");
    }
    }
    //收到數據,打印日誌
    print"{$dateTime} {$remote_address}:\n{$recv_buffer}\n");
    };
    Worker::runAll();



























    0x04 程式輸出樣例數據

    2024-03-2701:15:5239.129.72.95:50574:
    REGISTER sip:34020000002000000001@3402000000 SIP/2.0
    Via: SIP/2.0/UDP 192.168.1.4:5060;rport;branch=z9hG4bK650883237
    From: <sip:34020000001180870005@3402000000>;tag=698431213
    To: <sip:34020000001180870005@3402000000>
    Call-ID: 1146222786
    CSeq: 1 REGISTER
    Contact: <sip:34020000001180870005@192.168.1.4:5060>
    Max-Forwards: 70
    User-Agent: Embedded Net DVR/NVR/DVS
    Expires: 86400
    Content-Length: 0
    2024-03-2701:15:5339.129.72.95:15363:
    REGISTER sip:34020000002000000001@3402000000 SIP/2.0
    Via: SIP/2.0/UDP 192.168.1.2:5060;rport;branch=z9hG4bK1890963109
    From: <sip:34020000001180870007@3402000000>;tag=1955740825
    To: <sip:34020000001180870007@3402000000>
    Call-ID: 254089720
    CSeq: 1 REGISTER
    Contact: <sip:34020000001180870007@192.168.1.2:5060>
    Max-Forwards: 70
    User-Agent: Embedded Net DVR/NVR/DVS
    Expires: 86400
    Content-Length: 0
    2024-03-2701:15:5439.129.72.95:31137:
    REGISTER sip:34020000002000000001@3402000000 SIP/2.0
    Via: SIP/2.0/UDP 192.168.1.8:5060;rport;branch=z9hG4bK1552460892
    From: <sip:34020000001180870002@3402000000>;tag=1271059450
    To: <sip:34020000001180870002@3402000000>
    Call-ID: 108704431
    CSeq: 1 REGISTER
    Contact: <sip:34020000001180870002@192.168.1.8:5060>
    Max-Forwards: 70
    User-Agent: Embedded Net DVR/NVR/DVS
    Expires: 86400
    Content-Length: 0
    01:38:09.293144 IP (tos 0x68, ttl 48, id 44466, offset 0, flags [DF], proto UDP (17), length 436)
    39.129.72.95.64959 > 192.168.27.21.5060: [udp sum ok] SIP, length: 408
    REGISTER sip:34020000002000000001@3402000000 SIP/2.0
    Via: SIP/2.0/UDP 192.168.1.7:5060;rport;branch=z9hG4bK1838031091
    From: <sip:34020000001180870003@3402000000>;tag=793428652
    To: <sip:34020000001180870003@3402000000>
    Call-ID: 1815643450
    CSeq: 1 REGISTER
    Contact: <sip:34020000001180870003@192.168.1.7:5060>
    Max-Forwards: 70
    User-Agent: Embedded Net DVR/NVR/DVS
    Expires: 86400
    Content-Length: 0
    01:38:09.540771 IP (tos 0x0, ttl 64, id 30488, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.50574: [udp sum ok] SIP
    01:38:09.540806 IP (tos 0x0, ttl 64, id 30489, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.31137: [udp sum ok] SIP
    01:38:09.540815 IP (tos 0x0, ttl 64, id 30490, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.15363: [udp sum ok] SIP
    01:38:09.540822 IP (tos 0x0, ttl 64, id 45413, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.130.87.71.34950: [udp sum ok] SIP
    01:38:09.540830 IP (tos 0x0, ttl 64, id 27774, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.128.225.81.35329: [udp sum ok] SIP
    01:38:09.540838 IP (tos 0x0, ttl 64, id 30491, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.26505: [udp sum ok] SIP
    01:38:09.540845 IP (tos 0x0, ttl 64, id 2845, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.64959: [udp sum ok] SIP
    01:38:09.540860 IP (tos 0x0, ttl 64, id 30493, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.36321: [udp sum ok] SIP
    01:38:09.540867 IP (tos 0x0, ttl 64, id 27775, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.128.225.81.35342: [udp sum ok] SIP
    01:38:09.540873 IP (tos 0x0, ttl 64, id 30494, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.23047: [udp sum ok] SIP
     * */