當前位置: 妍妍網 > 碼農

【BeetleX重構】實作一個可讀寫的ReadOnlySequence介面卡

2024-05-28碼農

ReadOnlySequence只是一個可讀取非連續記憶體結構,它的使用更多了現在Pipe中。由 Pipe的Writer負責非 連續記憶體數據寫入(一般掛載到Socket的接收端),而對應的Reader則獲取相應可讀的 ReadOnlySequence數據。Pipe是一個高效的異步讀寫模型,它可以更好地分離 Socket讀寫邏輯,從而讓 Socket收發的異步處理更高效復位到相關狀態工作。

由於 Pipe是基於異步的,對於IO分離讀寫的確很適合。但對於記憶體讀寫的處理從效率上來說就不太適合了。在編寫網路服務時一般都可以掛 Pipe用作底層處理,那上層的SslStream就並不適合了。為了上層的統一處理方式那就有必要把 SslStre am的數據轉成 ReadOnlySeq uence。到這裏可能有個疑問直接把 SslStre am的數據讀出來然後返回對應的 ReadOnlySeq uence就可以了啊,為什麽還多花時間寫個介面卡? 但網路數據處理上有些不同,並不是一次讀到的數據就是一個完全協定包,往往是由多次不連續的網路數據組成;這個時候就需要把每次接收到的數據存放在 ReadOnlySeq uence裏處理。而還有一個更重要的問題是 ReadOnlySeq uence讀取完的記憶體怎處理?而實作介面卡的重要作用也就解決這些問題,非連續記憶體的寫入和讀取迴圈復用。

實作 ReadOnlySeq uence介面卡先了解是如何建立它的,以下是它的建構函式

publicReadOnlySequence(ReadOnlyMemory<T> memory);publicReadOnlySequence(T[] array);publicReadOnlySequence(T[] arrayint start, int length);publicReadOnlySequence(ReadOnlySequenceSegment<T> startSegment, int startIndex, ReadOnlySequenceSegment<T> endSegment, int endIndex);

從建構函式上來看顯然需要透過ReadOnlySequenceSegment<T>來建立。


看幫助這個類似乎沒有內建實作,那就說只能自己去實作一個了。

internal classMemorySegment : ReadOnlySequenceSegment<byte> {public MemorySegment(ReadOnlyMemory<byte> memory) { Memory = memory; }public MemorySegment Append(ReadOnlyMemory<byte> memory) {var segment = new MemorySegment(memory) { RunningIndex = RunningIndex + Memory.Length }; Next = segment;return segment; } }

為什麽添加Append方法,主要原因是 ReadOnlySeq uence只支持開始和結束兩個 ReadOnlySequenceSegment<T>物件,分異位成非連續記憶體連結串列的開始和結束。

ReadOnlySequenceSegment<T>只是一個單向連結串列記憶體指向,實際使用還需要一個記憶體結構類

classMemoryBlock : IDisposable{publicMemoryBlock(int length, int segmentMinSize) { Data = MemoryPool<byte>.Shared.Rent(length < segmentMinSize ? segmentMinSize : length); Length = Data.Memory.Length; Memory = Data.Memory; }public IMemoryOwner<byte> Data { get; set; }public Memory<byte> Memory { get; set; }publicint Length { get; set; }publicint Allocated { get; set; }publicint Postion { get; set; }public MemoryBlock Next { get; set; }publicvoidReadAdvanceTo(int count) { Postion += count; }publicvoidAdvanceTo(int count) { Allocated += count; }public Memory<byte> Allot(int size) {return Memory.Slice(Allocated, size); }publicboolTryAllot(int size) {return Memory.Length - Allocated >= size; }public Memory<byte> GetUseMemory() {return Memory.Slice(Postion, Allocated - Postion); }publicvoidDispose() { Data.Dispose(); Memory = null; Data = null; }}

同樣這個記憶體類也要構建成一個連結串列,所以也定義了Next內容指向下一下記憶體儲存類。

有了以上兩個類就可以構建 ReadOnlySeq uence的介面卡了,定義對應的記憶體分配方法

public Span<byte> GetSpan(int length) {return GetMemory(length).Span; }public Memory<byte> GetMemory(int length) {if (_end == null) { CreateMemory(length); }if (!_end.TryAllot(length)) CreateMemory(length);return _end.Allot(length); }privatevoidCreateMemory(int length) { MemoryBlock result = new MemoryBlock(length, SegmentMinSize);if (_first == null) _first = result;if (_end == null) { _end = result; }else { _end.Next = result; _end = result; } }

當記憶體分配後填充數據完成就需要送出對應寫入數據的長度

publicvoidWriteAdvanceTo(int length) { _end.AdvanceTo(length); }

數據寫入完成後就可以送出數據並構建對應可讀的 ReadOnlySeq uence了

public void Flush(){var start = _first.GetUseMemory();if (_first == _end) {_readOnlySequence = new ReadOnlySequence<byte>(start); }else {MemorySegment first = new MemorySegment(start);MemoryBlocklast = _first.Next;MemorySegment next = first;while (last != null) {next = next.Append(last.GetUseMemory());last = last.Next; }_readOnlySequence = new ReadOnlySequence<byte>(first, 0, next, next.Memory.Length); }}

寫入的功能已經完成,接下來就是讀取送出的程式碼

publicvoidReaderAdvanceTo(long length){ _readOnlySequence = _readOnlySequence.Slice(length);while (length > 0) {var memory = _first;var len = memory.Allocated - memory.Postion;if (length >= len) { length -= len; memory.Dispose(); _first = memory.Next;if (_first == null) { _end = null;return; } }else { memory.ReadAdvanceTo((int)length); length = 0; } }}

這一環節最重要的工作就是檢查每個記憶體塊是否已經讀取完成,如果完成就把記憶體回收到池裏。接下來再實作一個相應的Stream方便相容第三方使用

  • public classReadOnlySequenceAdapterStream : Stream, ISpanSequenceNetStream{publicReadOnlySequenceAdapterStream() {this.ReadOnlySequenceAdapter = new ReadOnlySequenceAdapter();this.ReadOnlySequenceAdapter.SegmentMinSize = 1024 * 4; }public ReadOnlySequenceAdapter ReadOnlySequenceAdapter { get; privateset; }publicoverridebool CanRead => true;publicoverridebool CanSeek => thrownew NotImplementedException();publicoverridebool CanWrite => thrownew NotImplementedException();publicoverridelong Length => ReadOnlySequenceAdapter.ReadOnlySequence.Length;publicoverridelong Position { get => thrownew NotImplementedException(); set => thrownew NotImplementedException(); }public Span<byte> Allot(int count) {var result = ReadOnlySequenceAdapter.GetSpan(count); WriteAdvance(4);return result; }publicoverridevoidFlush() {this.ReadOnlySequenceAdapter.Flush(); }public ReadOnlySequence<byte> GetReadOnlySequence() {return ReadOnlySequenceAdapter.ReadOnlySequence; }public Memory<byte> GetWriteMemory(int count) {return ReadOnlySequenceAdapter.GetMemory(count); }publicoverrideintReadByte() {if (Length == 0)return-1;int result = ReadOnlySequenceAdapter.ReadOnlySequence.FirstSpan[0]; ReadAdvance(1);return result; }publicoverrideintRead(byte[] buffer, int offset, int count) {if (buffer.Length == 0)return0;if (Length > count) { ReadOnlySequenceAdapter.ReadOnlySequence.CopyTo(new Span<byte>(buffer, offset, count)); ReadAdvance(count);return count; }else {var len = (int)Length; ReadOnlySequenceAdapter.ReadOnlySequence.CopyTo(new Span<byte>(buffer, offset, len)); ReadAdvance(len);return len; } }publicvoidReadAdvance(long count) {this.ReadOnlySequenceAdapter.ReaderAdvanceTo(count); }publicoverridelongSeek(long offset, SeekOrigin origin) {thrownew NotImplementedException(); }publicoverridevoidSetLength(longvalue) {thrownew NotImplementedException(); }publicoverridevoidWriteByte(bytevalue) {var memory = ReadOnlySequenceAdapter.GetMemory(1); memory.Span[0] = value; WriteAdvance(1); }publicboolTryRead(int count, out ReadOnlySequence<byte> data) { data = default;if (Length > count) { data = ReadOnlySequenceAdapter.ReadOnlySequence.Slice(0, count);returntrue; }returnfalse; }publicoverridevoidWrite(byte[] buffer, int offset, int count) {var memory = ReadOnlySequenceAdapter.GetSpan(count); Span<byte> span = new Span<byte>(buffer, offset, count); span.CopyTo(memory); WriteAdvance(count); }public Span<byte> GetWriteSpan(int count) {return ReadOnlySequenceAdapter.GetSpan(count); }privatelong _catchWriteLength = 0;publicvoidStartWriteLength() { _catchWriteLength = 0; ; }publicintEndWriteLength() {return (int)_catchWriteLength; }publicvoidWriteAdvance(int count) { _catchWriteLength += count; ReadOnlySequenceAdapter.WriteAdvanceTo(count); }protectedoverridevoidDispose(bool disposing) {base.Dispose(disposing);if (disposing) { ReadOnlySequenceAdapter.Dispose(); } }publicvoidImport(Stream stream) {var span = GetWriteSpan(1024 * 4);var len = stream.Read(span);while (len > 0) { WriteAdvance(len); len = stream.Read(span); } Flush(); }}

    這樣一個標準化並支持 ReadOnlySeq uence的Stream就實作了,如果想這個類需要支持Socket和SslStream的中間層還需擴充套件它的異步方法,並支持Socket異步讀寫, SslStream握手驗證的時候需要使用到Stream的異步方法。

    BeetleX

    開源跨平台通訊框架(支持TLS)

    提供HTTP,Websocket,MQTT,Redis,RPC和服務閘道器開源元件

    個人微信:henryfan128 QQ:28304340

    關註公眾號

    https://github.com/beetlex-io/