当前位置: 欣欣网 > 码农

MQTT协议代码实现详解三

2024-05-14码农

上两篇文件讲述了MQTT协议基础分解过程,这一章节主要讲述具体消息的实现。由于消息处理规范都差不多,接下来就介绍Connect和Publish两个比较复杂的消息分解代码,这两个消息在MQTT协议中算是比较复杂的两个,只能理解这两个的协议分析那对实现整个MQTT协议就不存在问题了。
参考文档 中文:vitsumoc.github.io/ mqtt-v5-0-chinese.ht ml
英文: d ocs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
编程语言: C#
完整代码: //github.com/beetlex-io/mqtt

Connect消息
在实现之前还是需要先了解一下Connect的数据结构,这个消息也是MQTT中结构最复杂的一个了。消息包括一个可变头部的数据集和Payload的属性集。


Description

7

6

5

4

3

2

1

0

Protocol Name

byte 1

Length MSB (0)

0

0

0

0

0

0

0

0

byte 2

Length LSB (4)

0

0

0

0

0

1

0

0

byte 3

‘M’

0

1

0

0

1

1

0

1

byte 4

‘Q’

0

1

0

1

0

0

0

1

byte 5

‘T’

0

1

0

1

0

1

0

0

byte 6

‘T’

0

1

0

1

0

1

0

0

Protocol Version


Description

7

6

5

4

3

2

1

0

byte 7

Version (5)

0

0

0

0

0

1

0

1

Connect Flags

byte 8

User Name Flag (1)

Password Flag (1)

Will Retain (0)

Will QoS (01)

Will Flag (1)

Clean Start(1)

Reserved (0)

1

1

0

0

1

1

1

0

Keep Alive

byte 9

Keep Alive MSB (0)

0

0

0

0

0

0

0

0

byte 10

Keep Alive LSB (10)

0

0

0

0

1

0

1

0

Properties

byte 11

Length (5)

0

0

0

0

0

1

0

1

byte 12

Session Expiry Interval identifier (17)

0

0

0

1

0

0

0

1

byte 13

Session Expiry Interval (10)

0

0

0

0

0

0

0

0

byte 14

0

0

0

0

0

0

0

0

byte 15

0

0

0

0

0

0

0

0

byte 16

0

0

0

0

1

0

1

0

而对应Payload的可以变属性集就包括客户端ID和遗嘱属性集, 遗嘱属 性集又包含着属性集,这些详细都有在协议文档描述。有了这些规则就可以通过代码来分解协议了,在读写过程中相关内容顺序必须按照文档描述的顺序来读写(针对前一章描述的属性表内容则无须根据文档描述顺序,那个是似对应的属性编码来对应)。

  • 读取协议代码

  • ProtocolName = parse.ReadString(stream);Version = (byte)stream.ReadByte();byte flats = (byte)stream.ReadByte();Reserved = (MQTTParse.BIT_1 & flats) > 0;ClearStart = (MQTTParse.BIT_2 & flats) > 0;WillFalg = (MQTTParse.BIT_3 & flats) > 0;WillRetian = (MQTTParse.BIT_6 & flats) > 0;PasswordFlag = (MQTTParse.BIT_7 & flats) > 0;UserFlag = (MQTTParse.BIT_8 & flats) > 0;this.WillQos = (QoSType)((0b0001_1000 & flats) >> 3);KeepAlive = parse.ReadUInt16(stream);var ps = GetPropertiesStream();ps.Read(parse, stream);SessionExpiryInterval = ps;ReceiveMaximum = ps;MaximumPacketSize = ps;TopicAliasMaximum = ps;RequestResponseInformation = ps;RequestProblemInformation = ps;AuthenticationMethod = ps;AuthenticationData = ps;UserProperties = ps;ClientID = parse.ReadString(stream);if (WillFalg){ this.WillProperty.Read(this, parse, stream); WillTopic = parse.ReadString(stream); WillPayload = parse.ReadBinary(stream);}if (UserFlag) Name = parse.ReadString(stream);if (PasswordFlag) Password = parse.ReadString(stream);

  • 写入协议代码

  • parse.WriteString(stream, ProtocolName);stream.WriteByte(Version);byte flats = 0x0;if (Reserved) flats |= MQTTParse.BIT_1;if (ClearStart) flats |= MQTTParse.BIT_2;if (WillFalg) flats |= MQTTParse.BIT_3;flats |= (byte)((byte)this.WillQos << 3);if (this.WillRetian) flats |= MQTTParse.BIT_6;if (this.UserFlag) flats |= MQTTParse.BIT_7;if (this.PasswordFlag) flats |= MQTTParse.BIT_8;stream.WriteByte(flats);parse.WriteUInt16(stream, KeepAlive);var ps = GetPropertiesStream() + SessionExpiryInterval + ReceiveMaximum + MaximumPacketSize + TopicAliasMaximum + RequestResponseInformation + RequestProblemInformation + AuthenticationMethod + AuthenticationData + UserProperties;ps.Write(parse, stream);parse.WriteString(stream, ClientID);if (WillFalg){this.WillProperty.Write(this, parse, stream); parse.WriteString(stream, WillTopic);if (WillPayload == null) WillPayload = new byte[0]; parse.WriteBinary(stream, WillPayload);}if (UserFlag) parse.WriteString(stream, Name);if (PasswordFlag) parse.WriteString(stream, Password);

    完整代码
    //github.com/beetlex-io/mqtt/blob/main/BeetleX.MQTT.Protocols/V5/Messages/Publish.cs

    Publish消息 该消息的结构并不复,带一个可变头部的数据块和推送内容的二进制结构;这个二进制则根据双方约定来进行编码处理,具体可以是JSON,UTF字符等。

    Description

    7

    6

    5

    4

    3

    2

    1

    0


    Topic Name

    byte 1

    Length MSB (0)

    0

    0

    0

    0

    0

    0

    0

    0

    byte 2

    Length LSB (3)

    0

    0

    0

    0

    0

    0

    1

    1

    byte 3

    ‘a’ (0x61)

    0

    1

    1

    0

    0

    0

    0

    1

    byte 4

    ‘/’ (0x2F)

    0

    0

    1

    0

    1

    1

    1

    1

    byte 5

    ‘b’ (0x62)

    0

    1

    1

    0

    0

    0

    1

    0

    Packet Identifier

    byte 6

    Packet Identifier MSB (0)

    0

    0

    0

    0

    0

    0

    0

    0

    byte 7

    Packet Identifier LSB (10)

    0

    0

    0

    0

    1

    0

    1

    0

    Property Length

    byte 8

    No Properties

    0

    0

    0

    0

    0

    0

    0

    0

    以上是Publish消息的可变头结构,包括主题名称,消息ID和属性集;剩下的消息问题即是提交的消息体。

  • 读取协议代码

  • Topic = parse.ReadString(stream);if (QoS != QoSType.MostOnce) Identifier = parse.ReadUInt16(stream);var ps = GetPropertiesStream();ps.Read(parse, stream);PayloadFormatIndicator = ps;MessageExpiryInterval = ps;TopicAlias = ps;ResponseTopic = ps;CorrelationData = ps;SubscriptionIdentifier = ps;ContentType = ps;UserProperties = ps;var len = (int)(stream.Length - stream.Position);var payload = MQTTMessage.RentPayloadBuffer(len);stream.Read(payload, 0, len);Payload = new ArraySegment<byte>(payload, 0, len);

  • 写入协议代码

  • base.OnWrite(parse, stream, session);parse.WriteString(stream, Topic);if (QoS != QoSType.MostOnce) parse.WriteUInt16(stream, Identifier);var ps = GetPropertiesStream() + PayloadFormatIndicator + MessageExpiryInterval + TopicAlias + ResponseTopic + CorrelationData + SubscriptionIdentifier + UserProperties;ps.Write(parse, stream);stream.Write(Payload.Array, Payload.Offset, Payload.Count);

    完整代码
    //github.com/beetlex-io/mqtt/blob/main/BeetleX.MQTT.Protocols/V5/Messages/Publish.cs

    通以上两个消息协议代码的实现已经可以体现出MQTT协议的分析处理,其实MQTT协议的处理并不复杂,为了方便程序中的操作只能在针对协议定义对应的结构体繁琐工作。

    BeetleX

    开源跨平台通讯框架(支持TLS)

    提供HTTP,Websocket,MQTT,Redis,RPC和服务网关开源组件

    个人微信:henryfan128 QQ:28304340

    关注公众号

    https://github.com/beetlex-io/
    http://beetlex-io.com