當前位置: 妍妍網 > 碼農

webman使用RabbitMQ訊息中介軟體實作系統異步解耦實戰教程

2024-06-09碼農

unset unset 簡介 unset unset

RabbitMQ是一個開源的訊息代理軟體,它使用高級訊息佇列協定(AMQP)來實作訊息的發送和接收。RabbitMQ支持多種訊息協定,包括STOMP、MQTT等,並且能夠與多種程式語言和平台整合,如Java、.NET、Python等。

AMQP 即 Advanced Message Queuing Protocol (高級訊息佇列協定),是一個網路協定,是套用層協定的一個開放標準,為面向訊息的中介軟體設計。基於此協定的客戶端與訊息中介軟體可傳遞訊息,並不受客戶端/中介軟體不同產品,不同的開發語言等條件的限制。

基本架構設計

圖片來源:https://blog.csdn.net/cuierdan/article/details/123824300

基本概念

  • Message Broker :(訊息代理伺服器)是一個虛擬的概念,而RabbitMQ是Message Broker的一個例項。

  • Producer :(生產者)產生數據並將數據發送到訊息代理伺服器(Message Broker)的程式被稱作訊息的生產者。

  • Connection :(連線)生產者與消費者透過TCP協定與訊息代理伺服器(Message Broker)建立的連線。

  • Channel :(通道)建立在Connection中的虛擬連線,類似於連線資料庫時的連線池的概念,生產者和消費者並不是直接與MQ透過Connection進行通訊的,而是透過Channel進行連線通訊的,數據的流動是在Channel中進行的。

  • VirtualHost :(虛擬訊息伺服器)就像mysql資料庫中有資料庫例項的概念,並且可以指定使用者對庫和表等操作的設定許可權。也可以類別成LINUX系統中的不同使用者,不同使用者之間是相互獨立的。每個VirtualHost相當於一個相對獨立mini的RabbitMQ伺服器。每個VirtutalHost之間是相互隔離的,exchange,queue,message等不能互通。

  • Exchange :(交換機)交換機直接與Channel(通道)連線,接收來自於訊息生產者產生的數據,在由Exchange將訊息路由到一個或多個Queue中(或者丟棄)。Exchange並不儲存訊息。RabbitMQ的交換機有fanout(扇出),direct(直接),topic(主題),headers(標題)四種型別,每種交換機型別都對應著不同的路由規則,根據不同的路由規則,交換機會將訊息路由到不同的佇列中。

  • Binding : (繫結)交換機與佇列之間的虛擬連線,在這個繫結中可以設定Binding Key,一個繫結就是用一個Binding Key將交換器和佇列連線起來,設定的Binding Key存在著一定的規則,Exchange會將訊息中攜帶的Routing Key與Binding Key 中設定的規則進行匹配,將訊息發送到相應的佇列中。Binding資訊被保存到Exchange中的查詢表中,用於Exchange將訊息分發到佇列的依據。

  • Routing Key :(路由鍵)用於匹配路由規則的依據,生產者在將訊息發送到Exchange時,一般會指定一個Routing Key,交換機會根據Routing Key 來匹配Binding中設定的路由規則,將符合規則的訊息發送到指定的佇列中。

  • Queue :(訊息佇列)RabbitMQ中的內部物件用於存放訊息的容器,RabbitMQ會將訊息按照RabbitMQ的六大模式中的一種將佇列中的訊息發送給消費者,RabbitMQ會根據選擇模式的不同將佇列中的訊息發送給一個或多個消費者,在連線到消費者之前,訊息一直在等待消費者到佇列中將訊息取走。

  • Consumer :(消費者)訊息的消費者,表示一個從佇列中取訊息的應用程式。

  • 特點

    1. 可靠性 :RabbitMQ使用一些機制來保證可靠性, 如持久化、傳輸確認及釋出確認等。

    2. 靈活的路由 :在訊息進入佇列之前,透過交換器來路由訊息。

    3. 擴充套件性 :多個RabbitMQ節點可以組成一個集群,也可以根據實際業務情況動態地擴充套件 集群中節點。

    4. 高可用性 :佇列可以在集群中的機器上設定映像,使得在部份節點出現問題的情況下隊 列仍然可用。

    5. 多種協定 :RabbitMQ除了原生支持AMQP協定,還支持STOMP, MQTT等多種訊息 中介軟體協定。

    6. 支持多語言客戶端: RabbitMQ 幾乎支持所有常用語言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript 等。

    主要功能

    1. 訊息佇列 :允許應用程式將訊息發送到佇列中,然後由另一個應用程式從佇列中取出並處理。

    2. 訊息路由 :支持將訊息從發送者路由到一個或多個接收者。

    3. 訊息持久化 :確保訊息在系統故障後不會遺失。

    4. 訊息確認 :確保訊息被正確處理,如果處理失敗,可以重新發送。

    5. 集群 :支持在多個節點上執行,以提供高可用性和負載均衡。

    unset unset 安裝 unset unset

    這裏使用1Panel安裝,1Panel 是一個現代化、開源的 Linux 伺服器運維管理面板

    市集

    安裝映像

    註:這裏需要勾選【埠外部存取】,方便本地偵錯

    映像

    安裝成功後,就可以在映像中找到已安裝好的RabbitMQ映像容器

    外網可存取地址: http://{{你的公網ip}}:15672 ,如果是在雲伺服器,記得安全策略放開埠 15672

  • RabbitMQ管理界面埠 15672 。是一個Web應用程式,用於管理和監控RabbitMQ訊息代理

  • AMQP預設埠 5672 。是一種網路協定,用於在應用程式之間傳遞訊息,通常用於訊息佇列系統。

  • 控制台

    登陸控制台,帳號和密碼都是 rabbitmq

    unset unset 使用 unset unset

    這裏使用webman外掛程式RabbitMQ客戶端,外掛程式地址: https://www.workerman.net/plugin/67

    非常感謝兔子大佬的外掛程式貢獻 !🌻

    非常感謝兔子大佬的外掛程式貢獻 !🌻

    非常感謝兔子大佬的外掛程式貢獻 !🌻

    外掛程式特點

  • 支持5種消費模式:簡單佇列、workQueue、routing、pub/sub、exchange;

  • 支持延遲佇列( RabbitMQ 須安裝外掛程式);

  • 異步無阻塞消費、異步無阻塞生產、同步阻塞生產

  • 安裝外掛程式

    透過composer包管理安裝

    composer require workbunny/webman-rabbitmq

    註:安裝該外掛程式前,請確保你已經安裝好webman框架,相關安裝文件: https://www.workerman.net/doc/webman/install.html

    配置

    外掛程式所有配置檔路徑: config/plugin/workbunny/webman-rabbitmq/app.php

    <?php
    return [
    'enable' => true,
    'host' => '120.120.120.74',
    'vhost' => '/',
    'port' => 5672,
    'username' => 'rabbitmq',
    'password' => 'rabbitmq',
    'mechanisms' => 'AMQPLAIN',
    ...
    ];

  • host 修改為伺服器公網ip

  • port 修改為15672

  • 消費者

    這裏使用命令建立一個擁有單行程消費者的 RestyBuilder

    ./webman workbunny:rabbitmq-builder resty --mode=queue
    ℹ️ Config updated.
    ℹ️ Builder created.
    ✅ Builder RestyBuilder created successfully.

    建立完成後完整的消費者檔位置 process/workbunny/rabbitmq/RestyBuilder.php

    <?phpdeclare(strict_types=1);
    namespaceprocess\workbunny\rabbitmq;
    useBunny\ChannelasBunnyChannel;
    useBunny\Async\ClientasBunnyClient;
    useBunny\MessageasBunnyMessage;
    useWorkbunny\WebmanRabbitMQ\Constants;
    useWorkbunny\WebmanRabbitMQ\Builders\QueueBuilder;
    classRestyBuilderextendsQueueBuilder
    {
    /**
    @var array = [
    * 'name' => 'example',
    * 'delayed' => false,
    * 'prefetch_count' => 1,
    * 'prefetch_size' => 0,
    * 'is_global' => false,
    * 'routing_key' => '',
    * ]
    */

    protectedarray $queueConfig = [
    // 佇列名稱 ,預設由類名自動生成
    'name' => 'process.workbunny.rabbitmq.RestyBuilder',
    // 是否延遲
    'delayed' => false,
    // QOS 數量
    'prefetch_count' => 0,
    // QOS size 
    'prefetch_size' => 0,
    // QOS 全域
    'is_global' => false,
    // 路由鍵
    'routing_key' => '',
    ];
    /** @var string 交換機型別 */
    protected string $exchangeType = Constants::DIRECT;
    /** @var string|null 交換機名稱,預設由類名自動生成 */
    protected ?string $exchangeName = 'process.workbunny.rabbitmq.RestyBuilder';
    /** @inheritDoc */
    publicfunctionhandler(BunnyMessage $message, BunnyChannel $channel, BunnyClient $client)string
    {
    echo'[RabbitMQ][佇列消費] Tag:' .$message->consumerTag. PHP_EOL;
    echo'[RabbitMQ][佇列消費著] Content:' .$message->content. PHP_EOL;
    echo'[RabbitMQ][佇列消費著] Exchange:' .$message->exchange. PHP_EOL;
    return Constants::ACK;
    }
    }




    啟動webman

    php start.php start
    Workerman[start.php] start in DEBUG mode
    ----------------------------------------------------------------------- WORKERMAN -----------------------------------------------------------------------
    Workerman version:4.1.15 PHP version:8.2.18 Event-Loop:\Workerman\Events\Event
    ------------------------------------------------------------------------ WORKERS ------------------------------------------------------------------------
    proto user worker listen processes status
    tcp root webman http://0.0.0.0:8217 2 [OK]
    tcp root monitor none 1 [OK]
    tcp root plugin.workbunny.webman-rabbitmq.process.workbunny.rabbitmq.RestyBuilder none 1 [OK]
    ---------------------------------------------------------------------------------------------------------------------------------------------------------

    生產者

    use functionWorkbunny\WebmanRabbitMQ\sync_publish;
    use process\workbunny\rabbitmq\RestyBuilder;
    sync_publish(RestyBuilder::instance(), '兔子大佬你好呀!'); # return bool

    消費結果

    [RabbitMQ][佇列消費] Tag:process.workbunny.rabbitmq.RestyBuilder
    [RabbitMQ][佇列消費著] Content:開源技術小棧你好呀!
    [RabbitMQ][佇列消費著] Exchange:process.workbunny.rabbitmq.RestyBuilder
    ...
    [RabbitMQ][佇列消費] Tag:process.workbunny.rabbitmq.RestyBuilder
    [RabbitMQ][佇列消費著] Content:兔子大佬你好呀!
    [RabbitMQ][佇列消費著] Exchange:process.workbunny.rabbitmq.RestyBuilder

    unset unset RabbitMQ管理界面 unset unset

    透過RabbitMQ管理界面端發送訊息

    消費者消費情況