當前位置: 妍妍網 > 碼農

MQTT協定程式碼實作詳解三

2024-05-14碼農

上兩篇檔講述了MQTT協定基礎分解過程,這一章節主要講述具體訊息的實作。由於訊息處理規範都差不多,接下來就介紹Connect和Publish兩個比較復雜的訊息分解程式碼,這兩個訊息在MQTT協定中算是比較復雜的兩個,只能理解這兩個的協定分析那對實作整個MQTT協定就不存在問題了。
參考文件 中文:vitsumoc.github.io/ mqtt-v5-0-chinese.ht ml
英文: d ocs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
程式語言: C#
完整程式碼: //github.com/beetlex-io/mqtt

Connect訊息
在實作之前還是需要先了解一下Connect的數據結構,這個訊息也是MQTT中結構最復雜的一個了。訊息包括一個可變頭部的數據集和Payload的內容集。


Description

7

6

5

4

3

2

1

0

Protocol Name

byte 1

Length MSB (0)

0

0

0

0

0

0

0

0

byte 2

Length LSB (4)

0

0

0

0

0

1

0

0

byte 3

‘M’

0

1

0

0

1

1

0

1

byte 4

‘Q’

0

1

0

1

0

0

0

1

byte 5

‘T’

0

1

0

1

0

1

0

0

byte 6

‘T’

0

1

0

1

0

1

0

0

Protocol Version


Description

7

6

5

4

3

2

1

0

byte 7

Version (5)

0

0

0

0

0

1

0

1

Connect Flags

byte 8

User Name Flag (1)

Password Flag (1)

Will Retain (0)

Will QoS (01)

Will Flag (1)

Clean Start(1)

Reserved (0)

1

1

0

0

1

1

1

0

Keep Alive

byte 9

Keep Alive MSB (0)

0

0

0

0

0

0

0

0

byte 10

Keep Alive LSB (10)

0

0

0

0

1

0

1

0

Properties

byte 11

Length (5)

0

0

0

0

0

1

0

1

byte 12

Session Expiry Interval identifier (17)

0

0

0

1

0

0

0

1

byte 13

Session Expiry Interval (10)

0

0

0

0

0

0

0

0

byte 14

0

0

0

0

0

0

0

0

byte 15

0

0

0

0

0

0

0

0

byte 16

0

0

0

0

1

0

1

0

而對應Payload的可以變內容集就包括客戶端ID和遺囑內容集, 遺囑屬 性集又包含著內容集,這些詳細都有在協定文件描述。有了這些規則就可以透過程式碼來分解協定了,在讀寫過程中相關內容順序必須按照文件描述的順序來讀寫(針對前一章描述的內容表內容則無須根據文件描述順序,那個是似對應的內容編碼來對應)。

  • 讀取協定程式碼

  • ProtocolName = parse.ReadString(stream);Version = (byte)stream.ReadByte();byte flats = (byte)stream.ReadByte();Reserved = (MQTTParse.BIT_1 & flats) > 0;ClearStart = (MQTTParse.BIT_2 & flats) > 0;WillFalg = (MQTTParse.BIT_3 & flats) > 0;WillRetian = (MQTTParse.BIT_6 & flats) > 0;PasswordFlag = (MQTTParse.BIT_7 & flats) > 0;UserFlag = (MQTTParse.BIT_8 & flats) > 0;this.WillQos = (QoSType)((0b0001_1000 & flats) >> 3);KeepAlive = parse.ReadUInt16(stream);var ps = GetPropertiesStream();ps.Read(parse, stream);SessionExpiryInterval = ps;ReceiveMaximum = ps;MaximumPacketSize = ps;TopicAliasMaximum = ps;RequestResponseInformation = ps;RequestProblemInformation = ps;AuthenticationMethod = ps;AuthenticationData = ps;UserProperties = ps;ClientID = parse.ReadString(stream);if (WillFalg){ this.WillProperty.Read(this, parse, stream); WillTopic = parse.ReadString(stream); WillPayload = parse.ReadBinary(stream);}if (UserFlag) Name = parse.ReadString(stream);if (PasswordFlag) Password = parse.ReadString(stream);

  • 寫入協定程式碼

  • parse.WriteString(stream, ProtocolName);stream.WriteByte(Version);byte flats = 0x0;if (Reserved) flats |= MQTTParse.BIT_1;if (ClearStart) flats |= MQTTParse.BIT_2;if (WillFalg) flats |= MQTTParse.BIT_3;flats |= (byte)((byte)this.WillQos << 3);if (this.WillRetian) flats |= MQTTParse.BIT_6;if (this.UserFlag) flats |= MQTTParse.BIT_7;if (this.PasswordFlag) flats |= MQTTParse.BIT_8;stream.WriteByte(flats);parse.WriteUInt16(stream, KeepAlive);var ps = GetPropertiesStream() + SessionExpiryInterval + ReceiveMaximum + MaximumPacketSize + TopicAliasMaximum + RequestResponseInformation + RequestProblemInformation + AuthenticationMethod + AuthenticationData + UserProperties;ps.Write(parse, stream);parse.WriteString(stream, ClientID);if (WillFalg){this.WillProperty.Write(this, parse, stream); parse.WriteString(stream, WillTopic);if (WillPayload == null) WillPayload = new byte[0]; parse.WriteBinary(stream, WillPayload);}if (UserFlag) parse.WriteString(stream, Name);if (PasswordFlag) parse.WriteString(stream, Password);

    完整程式碼
    //github.com/beetlex-io/mqtt/blob/main/BeetleX.MQTT.Protocols/V5/Messages/Publish.cs

    Publish訊息 該訊息的結構並不復,帶一個可變頭部的數據塊和推播內容的二進制結構;這個二進制則根據雙方約定來進行編碼處理,具體可以是JSON,UTF字元等。

    Description

    7

    6

    5

    4

    3

    2

    1

    0


    Topic Name

    byte 1

    Length MSB (0)

    0

    0

    0

    0

    0

    0

    0

    0

    byte 2

    Length LSB (3)

    0

    0

    0

    0

    0

    0

    1

    1

    byte 3

    ‘a’ (0x61)

    0

    1

    1

    0

    0

    0

    0

    1

    byte 4

    ‘/’ (0x2F)

    0

    0

    1

    0

    1

    1

    1

    1

    byte 5

    ‘b’ (0x62)

    0

    1

    1

    0

    0

    0

    1

    0

    Packet Identifier

    byte 6

    Packet Identifier MSB (0)

    0

    0

    0

    0

    0

    0

    0

    0

    byte 7

    Packet Identifier LSB (10)

    0

    0

    0

    0

    1

    0

    1

    0

    Property Length

    byte 8

    No Properties

    0

    0

    0

    0

    0

    0

    0

    0

    以上是Publish訊息的可變頭結構,包括主題名稱,訊息ID和內容集;剩下的訊息問題即是送出的訊息體。

  • 讀取協定程式碼

  • Topic = parse.ReadString(stream);if (QoS != QoSType.MostOnce) Identifier = parse.ReadUInt16(stream);var ps = GetPropertiesStream();ps.Read(parse, stream);PayloadFormatIndicator = ps;MessageExpiryInterval = ps;TopicAlias = ps;ResponseTopic = ps;CorrelationData = ps;SubscriptionIdentifier = ps;ContentType = ps;UserProperties = ps;var len = (int)(stream.Length - stream.Position);var payload = MQTTMessage.RentPayloadBuffer(len);stream.Read(payload, 0, len);Payload = new ArraySegment<byte>(payload, 0, len);

  • 寫入協定程式碼

  • base.OnWrite(parse, stream, session);parse.WriteString(stream, Topic);if (QoS != QoSType.MostOnce) parse.WriteUInt16(stream, Identifier);var ps = GetPropertiesStream() + PayloadFormatIndicator + MessageExpiryInterval + TopicAlias + ResponseTopic + CorrelationData + SubscriptionIdentifier + UserProperties;ps.Write(parse, stream);stream.Write(Payload.Array, Payload.Offset, Payload.Count);

    完整程式碼
    //github.com/beetlex-io/mqtt/blob/main/BeetleX.MQTT.Protocols/V5/Messages/Publish.cs

    通以上兩個訊息協定程式碼的實作已經可以體現出MQTT協定的分析處理,其實MQTT協定的處理並不復雜,為了方便程式中的操作只能在針對協定定義對應的結構體繁瑣工作。

    BeetleX

    開源跨平台通訊框架(支持TLS)

    提供HTTP,Websocket,MQTT,Redis,RPC和服務閘道器開源元件

    個人微信:henryfan128 QQ:28304340

    關註公眾號

    https://github.com/beetlex-io/
    http://beetlex-io.com