當前位置: 妍妍網 > 碼農

MQTT協定程式碼實作詳解一

2024-05-11碼農

為了打發空閑時間會對之前寫的MQTT協定的程式碼實作進行詳細講解;主要是講解的內容是對MQTT網路通訊協定的分析和實作,具體涉及對網路數據流拆分處理,因此很多內容都涉及到具體的程式碼功能,如果你有興趣學習這方面的那應該是有一定幫助的。接來就講解BeetleX.MQTT.Protocols的設計和具體實作細節(對於MQTT協定是5.0版本)。
參考文件 中文:vitsumoc.github.io/ mqtt-v5-0-chinese.ht ml
英文: d ocs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
程式語言: C#
完整程式碼: //github.com/beetlex-io/mqtt

什麽是MQTT
M QTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸協定),是一種基於釋出/訂閱(publish/subscribe)模式的「輕量級」通訊協定,該協定構建於TCP/IP協定上,由IBM在1999年釋出(最新版本協定於2019年定稿的5.0)。

基礎型別
通訊協定是對數據流規範的定義,根據具體規範進行數據拆分和讀取。每個被拆分成一個完整訊息的可以稱作為訊息包,而訊息包則有多個成員變數元件,而這些成員變數則是協定的基礎型別。而在MQTT協定操作過程中涉及的型別如下:

  • 位元
    絡和儲存的基礎單位是字節,但一個字節包含了8個位元,即可以表達8布爾狀態內容。 在網路協定設計上為了減少傳輸頻寬往往都會用到位元來定義一些狀態型別,對於讀寫這種型別往往透過以 & | <<和>>這幾個操作完成。

  • bool tag = (0b0000_0001 & data) > 0;//讀取0位是否為1data|=0b0000_0001//設定第0位為1

  • 字節 在協定中 字節往往用來 表自訂的數據型別或狀態 如MQTT中 ConnAck 指令 的響應狀態碼

  • Value

    Hex

    Reason Code name

    Description

    0

    0x00

    Success

    The Connection is accepted.

    128

    0x80

    Unspecified error

    The Server does not wish to reveal the reason for the failure, or none of the other Reason Codes apply.

    129

    0x81

    Malformed Packet

    Data within the CONNECT packet could not be correctly parsed.

    130

    0x82

    Protocol Error

    Data in the CONNECT packet does not conform to this specification.

    131

    0x83

    Implementation specific error

    The CONNECT is valid but is not accepted by this Server.

    132

    0x84

    Unsupported Protocol Version

    The Server does not support the version of the MQTT protocol requested by the Client.

  • 變長整型
    在程式中整型的儲存是4字節,不管值的大小都是固定4字節。為了讓傳輸更節省頻寬設計出可變長儲存方式,根據數值的大小儲存空間為1-5個字節,由於傳輸訊息包都不會很大所以在絕大多數情況下儲存的訊息長度不會占用超過4字節。以下是針對可變長度讀寫的幫助類:

  • public classInt7bit{ [ThreadStatic]privatestatic Byte[] mDataBuffer;publicvoidWrite(System.IO.Stream stream, intvalue) {if (mDataBuffer == null) mDataBuffer = newbyte[32];var count = 0;var num = (UInt64)value;while (num >= 0x80) { mDataBuffer[count++] = (Byte)(num | 0x80); num >>= 7; } mDataBuffer[count++] = (Byte)num; stream.Write(mDataBuffer, 0, count); }privateuint mResult = 0;privatebyte mBits = 0;publicint? Read(System.IO.Stream stream) { Byte b;while (true) {if (stream.Length < 1)returnnull;var bt = stream.ReadByte();if (bt < 0) { mBits = 0; mResult = 0;thrownew BXException("Read 7bit int error:byte value cannot be less than zero!"); } b = (Byte)bt; mResult |= (UInt32)((b & 0x7f) << mBits);if ((b & 0x80) == 0) break; mBits += 7;if (mBits >= 32) { mBits = 0; mResult = 0;thrownew BXException("Read 7bit int error:out of maximum value!"); } } mBits = 0;var result = mResult; mResult = 0;return (Int32)result; }}

  • 整型
    協定中有兩種數值型別,分別是短整型和整型;儲存大小分別是2字節和4字節。協定對於整型儲存方式是高字序,而C#預設是低字序所以在處理上要進行簡單的轉換。

  • publicstaticshortSwapInt16(short v){return (short)(((v & 0xFF) << 8) | ((v >> 8) & 0xFF));}publicstaticushortSwapUInt16(ushort v){return (ushort)((uint)((v & 0xFF) << 8) | ((uint)(v >> 8) & 0xFFu));}publicstaticintSwapInt32(int v){return ((SwapInt16((short)v) & 0xFFFF) << 16) | (SwapInt16((short)(v >> 16)) & 0xFFFF);}publicstaticuintSwapUInt32(uint v){return (uint)((SwapUInt16((ushort)v) & 0xFFFF) << 16) | (SwapUInt16((ushort)(v >> 16)) & 0xFFFFu);}

    透過以上函式就可以進行對應儲存字序的轉換了,在讀寫流的時候加入這個函式呼叫即可。

    publicvirtualvoidWriteUInt16(System.IO.Stream stream, ushortvalue){value = BitHelper.SwapUInt16(value);var data = BitConverter.GetBytes(value); stream.Write(data, 0, data.Length);}publicvirtualushortReadUInt16(System.IO.Stream stream){var buffer = GetIntBuffer(); stream.Read(buffer, 0, 2);var result = BitConverter.ToUInt16(buffer, 0);return BitHelper.SwapUInt16(result);}publicvirtualvoidWriteInt16(System.IO.Stream stream, shortvalue){value = BitHelper.SwapInt16(value);var data = BitConverter.GetBytes(value); stream.Write(data, 0, data.Length);}publicvirtualshortReadInt16(System.IO.Stream stream){var buffer = GetIntBuffer(); stream.Read(buffer, 0, 2);var result = BitConverter.ToInt16(buffer, 0);return BitHelper.SwapInt16(result);}

  • 字元
    Utf8的字元型別,這個型別是由一個2字節的整型頭部描述字元的長度。

  • publicvirtualvoidWriteString(System.IO.Stream stream, stringvalue, Encoding encoding = null){if (encoding == null) encoding = Encoding.UTF8;if (string.IsNullOrEmpty(value)) { WriteUInt16(stream, 0);return; }byte[] buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(value.Length * 6);var count = encoding.GetBytes(value, 0, value.Length, buffer, 0); WriteUInt16(stream, (ushort)count); stream.Write(buffer, 0, count); System.Buffers.ArrayPool<byte>.Shared.Return(buffer);}publicvirtualstringReadString(System.IO.Stream stream, Encoding encoding = null){if (encoding == null) encoding = Encoding.UTF8; UInt16 len = ReadUInt16(stream);if (len == 0)returnstring.Empty;byte[] buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(len); stream.Read(buffer, 0, len);string result = encoding.GetString(buffer, 0, len); System.Buffers.ArrayPool<byte>.Shared.Return(buffer);return result;}

  • 二進制
    型別為字節陣列,該 型別是由一個2字節的整型頭部描述陣列的長度

  • publicvoidWriteBinary(System.IO.Stream stream, byte[] data){ WriteUInt16(stream, (ushort)data.Length); stream.Write(data, 0, data.Length);}publicbyte[] ReadBinary(System.IO.Stream stream){var len = ReadUInt16(stream);byte[] result = newbyte[len]; stream.Read(result, 0, len);return result;}

    以上是MQTT的協定型別描述和對應的實作操作,接下來就可以實作具體的協定了。

    拆分訊息包
    在編寫程式碼前先了解一下MQTT的訊息包是如何拆分的,以下是協定定義的格式

    Bit

    7

    6

    5

    4

    3

    2

    1

    0

    byte 1

    MQTT Control Packet type

    Flags specific to each MQTT Control Packet type

    byte 2…

    Remaining Length

    協定把第一個字節拆分成兩部份,第一部份的4位元用於描述訊息相關的控制型別,不同訊息包有不同的情況具體檢視協定文件;第二部份是高4位元位元用於描述訊息型別。從第二個字節開始是一個變長整型用於描述訊息體的剩下的數據長度。
    其實從實作效能上來說我不認同這種協定設計,可變長度雖然可以節省2個節字的頻寬(5.0協定是2019年定的,個人感覺現有的網路資源並不缺這點流量).....這種設計導致訊息寫入網路流存在一次記憶體拷貝,然而後面的擴充套件內容也是,這樣一來導致一個訊息的寫入流就存在多次拷貝影響效能。下面來簡單講述這情況:

    由於可變長度是根據訊息內容來確定,導致無法在網路緩沖記憶體中分配一個固定的空間,因此只能在新的記憶體中寫入內容得到長度後再把長度寫入的網路記憶體塊,然後再把對應的內容再復制到網路記憶體塊中。如果長度是固定的如4字節,就可以在寫入訊息頭後馬上分配一個4字節的空間後直接寫入具體的訊息內容,當內容寫入後得到的長再反填充之前分配的長度空間即可;這樣的好處是訊息內容可以直接寫入到網路記憶體塊中,無須用其他記憶體寫入後再拷貝。

    實作訊息型別
    既然有了協定的基礎規則,那就可以針對它來實作一個訊息抽像類來表達數據訊息了。

    publicabstract classMQTTMessage {publicMQTTMessage() { }publicabstract MQTTMessageType Type { get; }publicbyte Bit1 { get; set; }publicbyte Bit2 { get; set; }publicbyte Bit3 { get; set; }publicbyte Bit4 { get; set; }internalvoidRead(MQTTParse parse, System.IO.Stream stream, ISession session) { OnRead(parse, stream, session); }protectedvirtualvoidOnRead(MQTTParse parse, Stream stream, ISession session) { }internalvoidWrite(MQTTParse parse, System.IO.Stream stream, ISession session) { OnWrite(parse, stream, session); }protectedvirtualvoidOnWrite(MQTTParse parse, Stream stream, ISession session) { }}

    在設計過程中預留OnRead和OnWrite方法給具體訊息實作對應的網路數據讀寫;有了這個抽象的訊息結構就可以對協定進行讀寫拆分了。

    publicoverride MQTTMessage Read(Stream stream, ISession session){ IServer server = session?.Server;if (stream.Length > 0) {if (mType == null) { mHeader = (byte)stream.ReadByte();if (mHeader < 0) {thrownew BXException("parse mqtt message error:fixed header byte value cannot be less than zero!"); } mType = (MQTTMessageType)((mHeader & 0b1111_0000) >> 4);if (server != null && server.EnableLog(EventArgs.LogType.Debug)) server.Log(EventArgs.LogType.Debug, session, "parse mqtt header souccess"); }if (mLength == null) { mLength = mInt7BitHandler.Read(stream);if (server != null && server.EnableLog(EventArgs.LogType.Debug)) server.Log(EventArgs.LogType.Debug, session, $"parse mqtt size {mLength}"); }if (mLength != null && stream.Length >= mLength.Value) { Stream protocolStream = GetProtocolStream(); CopyStream(stream, protocolStream, mLength.Value); MQTTMessage msg = CreateMessage(mType.Value, session); msg.Bit1 = (byte)(BIT_1 & mHeader); msg.Bit2 = (byte)(BIT_2 & mHeader); msg.Bit3 = (byte)(BIT_3 & mHeader); msg.Bit4 = (byte)(BIT_4 & mHeader); msg.Read(this, protocolStream, session);if (server != null && server.EnableLog(EventArgs.LogType.Debug)) server.Log(EventArgs.LogType.Debug, session, $"parse mqtt type {msg} success"); mLength = null; mType = null;return msg; } }returnnull;}publicoverridevoidWrite(MQTTMessage msg, Stream stream, ISession session){ IServer server = session?.Server;if (server != null && server.EnableLog(EventArgs.LogType.Debug)) server.Log(EventArgs.LogType.Debug, session, $"write mqtt message {msg}");var protocolStream = GetProtocolStream();int header = 0; header |= ((int)msg.Type << 4); header |= msg.Bit1; header |= msg.Bit2 << 1; header |= msg.Bit3 << 2; header |= msg.Bit4 << 3; stream.WriteByte((byte)header); msg.Write(this, protocolStream, session);if (server != null && server.EnableLog(EventArgs.LogType.Debug)) server.Log(EventArgs.LogType.Debug, session, $"write mqtt message body size {protocolStream.Length}"); mInt7BitHandler.Write(stream, (int)protocolStream.Length); protocolStream.Position = 0; protocolStream.CopyTo(stream);if (server != null && server.EnableLog(EventArgs.LogType.Debug)) server.Log(EventArgs.LogType.Debug, session, $"write mqtt message success");}

    透過以上的Read和Write方法就可以把網路數據和程式物件進行一個轉換了。
    到這裏MQTT最上層的訊息包解釋處理就完成了,剩下不同訊息的實作留給後面的章節再詳細講解。

    BeetleX

    開源跨平台通訊框架(支持TLS)

    提供HTTP,Websocket,MQTT,Redis,RPC和服務閘道器開源元件

    個人微信:henryfan128 QQ:28304340

    關註公眾號

    https://github.com/beetlex-io/
    http://beetlex-io.com