上兩篇檔講述了MQTT協定基礎分解過程,這一章節主要講述具體訊息的實作。由於訊息處理規範都差不多,接下來就介紹Connect和Publish兩個比較復雜的訊息分解程式碼,這兩個訊息在MQTT協定中算是比較復雜的兩個,只能理解這兩個的協定分析那對實作整個MQTT協定就不存在問題了。
參考文件
中文:vitsumoc.github.io/
mqtt-v5-0-chinese.ht
ml
英文: d
ocs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
程式語言: C#
完整程式碼: //github.com/beetlex-io/mqtt
Connect訊息
在實作之前還是需要先了解一下Connect的數據結構,這個訊息也是MQTT中結構最復雜的一個了。訊息包括一個可變頭部的數據集和Payload的內容集。
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
Protocol Name | |||||||||
byte 1 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | Length LSB (4) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
byte 3 | ‘M’ | 0 | 1 | 0 | 0 | 1 | 1 | 0 | 1 |
byte 4 | ‘Q’ | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 1 |
byte 5 | ‘T’ | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 |
byte 6 | ‘T’ | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 |
Protocol Version | |||||||||
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
byte 7 | Version (5) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 1 |
Connect Flags | |||||||||
byte 8 | User Name Flag (1) Password Flag (1) Will Retain (0) Will QoS (01) Will Flag (1) Clean Start(1) Reserved (0) | 1 | 1 | 0 | 0 | 1 | 1 | 1 | 0 |
Keep Alive | |||||||||
byte 9 | Keep Alive MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 10 | Keep Alive LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
Properties | |||||||||
byte 11 | Length (5) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 1 |
byte 12 | Session Expiry Interval identifier (17) | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 1 |
byte 13 | Session Expiry Interval (10) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 14 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 15 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 16 | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
而對應Payload的可以變內容集就包括客戶端ID和遺囑內容集, 遺囑屬 性集又包含著內容集,這些詳細都有在協定文件描述。有了這些規則就可以透過程式碼來分解協定了,在讀寫過程中相關內容順序必須按照文件描述的順序來讀寫(針對前一章描述的內容表內容則無須根據文件描述順序,那個是似對應的內容編碼來對應)。
讀取協定程式碼
ProtocolName = parse.ReadString(stream);
Version = (byte)stream.ReadByte();
byte flats = (byte)stream.ReadByte();
Reserved = (MQTTParse.BIT_1 & flats) > 0;
ClearStart = (MQTTParse.BIT_2 & flats) > 0;
WillFalg = (MQTTParse.BIT_3 & flats) > 0;
WillRetian = (MQTTParse.BIT_6 & flats) > 0;
PasswordFlag = (MQTTParse.BIT_7 & flats) > 0;
UserFlag = (MQTTParse.BIT_8 & flats) > 0;
this.WillQos = (QoSType)((0b0001_1000 & flats) >> 3);
KeepAlive = parse.ReadUInt16(stream);
var ps = GetPropertiesStream();
ps.Read(parse, stream);
SessionExpiryInterval = ps;
ReceiveMaximum = ps;
MaximumPacketSize = ps;
TopicAliasMaximum = ps;
RequestResponseInformation = ps;
RequestProblemInformation = ps;
AuthenticationMethod = ps;
AuthenticationData = ps;
UserProperties = ps;
ClientID = parse.ReadString(stream);
if (WillFalg)
{
this.WillProperty.Read(this, parse, stream);
WillTopic = parse.ReadString(stream);
WillPayload = parse.ReadBinary(stream);
}
if (UserFlag)
Name = parse.ReadString(stream);
if (PasswordFlag)
Password = parse.ReadString(stream);
寫入協定程式碼
parse.WriteString(stream, ProtocolName);
stream.WriteByte(Version);
byte flats = 0x0;
if (Reserved)
flats |= MQTTParse.BIT_1;
if (ClearStart)
flats |= MQTTParse.BIT_2;
if (WillFalg)
flats |= MQTTParse.BIT_3;
flats |= (byte)((byte)this.WillQos << 3);
if (this.WillRetian)
flats |= MQTTParse.BIT_6;
if (this.UserFlag)
flats |= MQTTParse.BIT_7;
if (this.PasswordFlag)
flats |= MQTTParse.BIT_8;
stream.WriteByte(flats);
parse.WriteUInt16(stream, KeepAlive);
var ps = GetPropertiesStream()
+ SessionExpiryInterval
+ ReceiveMaximum
+ MaximumPacketSize
+ TopicAliasMaximum
+ RequestResponseInformation
+ RequestProblemInformation
+ AuthenticationMethod
+ AuthenticationData
+ UserProperties;
ps.Write(parse, stream);
parse.WriteString(stream, ClientID);
if (WillFalg)
{
this.WillProperty.Write(this, parse, stream);
parse.WriteString(stream, WillTopic);
if (WillPayload == null)
WillPayload = new byte[0];
parse.WriteBinary(stream, WillPayload);
}
if (UserFlag)
parse.WriteString(stream, Name);
if (PasswordFlag)
parse.WriteString(stream, Password);
完整程式碼
//github.com/beetlex-io/mqtt/blob/main/BeetleX.MQTT.Protocols/V5/Messages/Publish.cs
Publish訊息
該訊息的結構並不復,帶一個可變頭部的數據塊和推播內容的二進制結構;這個二進制則根據雙方約定來進行編碼處理,具體可以是JSON,UTF字元等。
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
Topic Name | |||||||||
byte 1 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | Length LSB (3) | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 |
byte 3 | ‘a’ (0x61) | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 |
byte 4 | ‘/’ (0x2F) | 0 | 0 | 1 | 0 | 1 | 1 | 1 | 1 |
byte 5 | ‘b’ (0x62) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 |
Packet Identifier | |||||||||
byte 6 | Packet Identifier MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 7 | Packet Identifier LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
Property Length | |||||||||
byte 8 | No Properties | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
以上是Publish訊息的可變頭結構,包括主題名稱,訊息ID和內容集;剩下的訊息問題即是送出的訊息體。
讀取協定程式碼
Topic = parse.ReadString(stream);
if (QoS != QoSType.MostOnce)
Identifier = parse.ReadUInt16(stream);
var ps = GetPropertiesStream();
ps.Read(parse, stream);
PayloadFormatIndicator = ps;
MessageExpiryInterval = ps;
TopicAlias = ps;
ResponseTopic = ps;
CorrelationData = ps;
SubscriptionIdentifier = ps;
ContentType = ps;
UserProperties = ps;
var len = (int)(stream.Length - stream.Position);
var payload = MQTTMessage.RentPayloadBuffer(len);
stream.Read(payload, 0, len);
Payload = new ArraySegment<byte>(payload, 0, len);
寫入協定程式碼
base.OnWrite(parse, stream, session);
parse.WriteString(stream, Topic);
if (QoS != QoSType.MostOnce)
parse.WriteUInt16(stream, Identifier);
var ps = GetPropertiesStream()
+ PayloadFormatIndicator
+ MessageExpiryInterval
+ TopicAlias
+ ResponseTopic
+ CorrelationData
+ SubscriptionIdentifier
+ UserProperties;
ps.Write(parse, stream);
stream.Write(Payload.Array, Payload.Offset, Payload.Count);
完整程式碼
//github.com/beetlex-io/mqtt/blob/main/BeetleX.MQTT.Protocols/V5/Messages/Publish.cs
通以上兩個訊息協定程式碼的實作已經可以體現出MQTT協定的分析處理,其實MQTT協定的處理並不復雜,為了方便程式中的操作只能在針對協定定義對應的結構體繁瑣工作。
BeetleX
開源跨平台通訊框架(支持TLS)
提供HTTP,Websocket,MQTT,Redis,RPC和服務閘道器開源元件
個人微信:henryfan128 QQ:28304340
關註公眾號
https://github.com/beetlex-io/
http://beetlex-io.com