ReadOnlySequence只是一個可讀取非連續記憶體結構,它的使用更多了現在Pipe中。由
Pipe的Writer負責非
連續記憶體數據寫入(一般掛載到Socket的接收端),而對應的Reader則獲取相應可讀的
ReadOnlySequence數據。Pipe是一個高效的異步讀寫模型,它可以更好地分離
Socket讀寫邏輯,從而讓
Socket收發的異步處理更高效復位到相關狀態工作。
由於 Pipe是基於異步的,對於IO分離讀寫的確很適合。但對於記憶體讀寫的處理從效率上來說就不太適合了。在編寫網路服務時一般都可以掛 Pipe用作底層處理,那上層的SslStream就並不適合了。為了上層的統一處理方式那就有必要把 SslStre am的數據轉成 ReadOnlySeq uence。到這裏可能有個疑問直接把 SslStre am的數據讀出來然後返回對應的 ReadOnlySeq uence就可以了啊,為什麽還多花時間寫個介面卡? 但網路數據處理上有些不同,並不是一次讀到的數據就是一個完全協定包,往往是由多次不連續的網路數據組成;這個時候就需要把每次接收到的數據存放在 ReadOnlySeq uence裏處理。而還有一個更重要的問題是 ReadOnlySeq uence讀取完的記憶體怎處理?而實作介面卡的重要作用也就解決這些問題,非連續記憶體的寫入和讀取迴圈復用。
實作 ReadOnlySeq uence介面卡先了解是如何建立它的,以下是它的建構函式
publicReadOnlySequence(ReadOnlyMemory<T> memory);
publicReadOnlySequence(T[] array);
publicReadOnlySequence(T[] array, int start, int length);
publicReadOnlySequence(ReadOnlySequenceSegment<T> startSegment, int startIndex, ReadOnlySequenceSegment<T> endSegment, int endIndex);
從建構函式上來看顯然需要透過ReadOnlySequenceSegment<T>來建立。
看幫助這個類似乎沒有內建實作,那就說只能自己去實作一個了。
internal classMemorySegment : ReadOnlySequenceSegment<byte>
{
public MemorySegment(ReadOnlyMemory<byte> memory)
{
Memory = memory;
}
public MemorySegment Append(ReadOnlyMemory<byte> memory)
{
var segment = new MemorySegment(memory)
{
RunningIndex = RunningIndex + Memory.Length
};
Next = segment;
return segment;
}
}
為什麽添加Append方法,主要原因是
ReadOnlySeq
uence只支持開始和結束兩個
ReadOnlySequenceSegment<T>物件,分異位成非連續記憶體連結串列的開始和結束。
ReadOnlySequenceSegment<T>只是一個單向連結串列記憶體指向,實際使用還需要一個記憶體結構類
classMemoryBlock : IDisposable
{
publicMemoryBlock(int length, int segmentMinSize)
{
Data = MemoryPool<byte>.Shared.Rent(length < segmentMinSize ? segmentMinSize : length);
Length = Data.Memory.Length;
Memory = Data.Memory;
}
public IMemoryOwner<byte> Data { get; set; }
public Memory<byte> Memory { get; set; }
publicint Length { get; set; }
publicint Allocated { get; set; }
publicint Postion { get; set; }
public MemoryBlock Next { get; set; }
publicvoidReadAdvanceTo(int count)
{
Postion += count;
}
publicvoidAdvanceTo(int count)
{
Allocated += count;
}
public Memory<byte> Allot(int size)
{
return Memory.Slice(Allocated, size);
}
publicboolTryAllot(int size)
{
return Memory.Length - Allocated >= size;
}
public Memory<byte> GetUseMemory()
{
return Memory.Slice(Postion, Allocated - Postion);
}
publicvoidDispose()
{
Data.Dispose();
Memory = null;
Data = null;
}
}
同樣這個記憶體類也要構建成一個連結串列,所以也定義了Next內容指向下一下記憶體儲存類。
有了以上兩個類就可以構建
ReadOnlySeq
uence的介面卡了,定義對應的記憶體分配方法
public Span<byte> GetSpan(int length)
{
return GetMemory(length).Span;
}
public Memory<byte> GetMemory(int length)
{
if (_end == null)
{
CreateMemory(length);
}
if (!_end.TryAllot(length))
CreateMemory(length);
return _end.Allot(length);
}
privatevoidCreateMemory(int length)
{
MemoryBlock result = new MemoryBlock(length, SegmentMinSize);
if (_first == null)
_first = result;
if (_end == null)
{
_end = result;
}
else
{
_end.Next = result;
_end = result;
}
}
當記憶體分配後填充數據完成就需要送出對應寫入數據的長度
publicvoidWriteAdvanceTo(int length)
{
_end.AdvanceTo(length);
}
數據寫入完成後就可以送出數據並構建對應可讀的 ReadOnlySeq uence了
public void Flush()
{
var start = _first.GetUseMemory();
if (_first == _end)
{
_readOnlySequence = new ReadOnlySequence<byte>(start);
}
else
{
MemorySegment first = new MemorySegment(start);
MemoryBlocklast = _first.Next;
MemorySegment next = first;
while (last != null)
{
next = next.Append(last.GetUseMemory());
last = last.Next;
}
_readOnlySequence = new ReadOnlySequence<byte>(first, 0, next, next.Memory.Length);
}
}
寫入的功能已經完成,接下來就是讀取送出的程式碼
publicvoidReaderAdvanceTo(long length)
{
_readOnlySequence = _readOnlySequence.Slice(length);
while (length > 0)
{
var memory = _first;
var len = memory.Allocated - memory.Postion;
if (length >= len)
{
length -= len;
memory.Dispose();
_first = memory.Next;
if (_first == null)
{
_end = null;
return;
}
}
else
{
memory.ReadAdvanceTo((int)length);
length = 0;
}
}
}
這一環節最重要的工作就是檢查每個記憶體塊是否已經讀取完成,如果完成就把記憶體回收到池裏。接下來再實作一個相應的Stream方便相容第三方使用
public classReadOnlySequenceAdapterStream : Stream, ISpanSequenceNetStream
{
publicReadOnlySequenceAdapterStream()
{
this.ReadOnlySequenceAdapter = new ReadOnlySequenceAdapter();
this.ReadOnlySequenceAdapter.SegmentMinSize = 1024 * 4;
}
public ReadOnlySequenceAdapter ReadOnlySequenceAdapter { get; privateset; }
publicoverridebool CanRead => true;
publicoverridebool CanSeek => thrownew NotImplementedException();
publicoverridebool CanWrite => thrownew NotImplementedException();
publicoverridelong Length => ReadOnlySequenceAdapter.ReadOnlySequence.Length;
publicoverridelong Position { get => thrownew NotImplementedException(); set => thrownew NotImplementedException(); }
public Span<byte> Allot(int count)
{
var result = ReadOnlySequenceAdapter.GetSpan(count);
WriteAdvance(4);
return result;
}
publicoverridevoidFlush()
{
this.ReadOnlySequenceAdapter.Flush();
}
public ReadOnlySequence<byte> GetReadOnlySequence()
{
return ReadOnlySequenceAdapter.ReadOnlySequence;
}
public Memory<byte> GetWriteMemory(int count)
{
return ReadOnlySequenceAdapter.GetMemory(count);
}
publicoverrideintReadByte()
{
if (Length == 0)
return-1;
int result = ReadOnlySequenceAdapter.ReadOnlySequence.FirstSpan[0];
ReadAdvance(1);
return result;
}
publicoverrideintRead(byte[] buffer, int offset, int count)
{
if (buffer.Length == 0)
return0;
if (Length > count)
{
ReadOnlySequenceAdapter.ReadOnlySequence.CopyTo(new Span<byte>(buffer, offset, count));
ReadAdvance(count);
return count;
}
else
{
var len = (int)Length;
ReadOnlySequenceAdapter.ReadOnlySequence.CopyTo(new Span<byte>(buffer, offset, len));
ReadAdvance(len);
return len;
}
}
publicvoidReadAdvance(long count)
{
this.ReadOnlySequenceAdapter.ReaderAdvanceTo(count);
}
publicoverridelongSeek(long offset, SeekOrigin origin)
{
thrownew NotImplementedException();
}
publicoverridevoidSetLength(longvalue)
{
thrownew NotImplementedException();
}
publicoverridevoidWriteByte(bytevalue)
{
var memory = ReadOnlySequenceAdapter.GetMemory(1);
memory.Span[0] = value;
WriteAdvance(1);
}
publicboolTryRead(int count, out ReadOnlySequence<byte> data)
{
data = default;
if (Length > count)
{
data = ReadOnlySequenceAdapter.ReadOnlySequence.Slice(0, count);
returntrue;
}
returnfalse;
}
publicoverridevoidWrite(byte[] buffer, int offset, int count)
{
var memory = ReadOnlySequenceAdapter.GetSpan(count);
Span<byte> span = new Span<byte>(buffer, offset, count);
span.CopyTo(memory);
WriteAdvance(count);
}
public Span<byte> GetWriteSpan(int count)
{
return ReadOnlySequenceAdapter.GetSpan(count);
}
privatelong _catchWriteLength = 0;
publicvoidStartWriteLength()
{
_catchWriteLength = 0; ;
}
publicintEndWriteLength()
{
return (int)_catchWriteLength;
}
publicvoidWriteAdvance(int count)
{
_catchWriteLength += count;
ReadOnlySequenceAdapter.WriteAdvanceTo(count);
}
protectedoverridevoidDispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
ReadOnlySequenceAdapter.Dispose();
}
}
publicvoidImport(Stream stream)
{
var span = GetWriteSpan(1024 * 4);
var len = stream.Read(span);
while (len > 0)
{
WriteAdvance(len);
len = stream.Read(span);
}
Flush();
}
}
這樣一個標準化並支持
ReadOnlySeq
uence的Stream就實作了,如果想這個類需要支持Socket和SslStream的中間層還需擴充套件它的異步方法,並支持Socket異步讀寫,
SslStream握手驗證的時候需要使用到Stream的異步方法。
BeetleX
開源跨平台通訊框架(支持TLS)
提供HTTP,Websocket,MQTT,Redis,RPC和服務閘道器開源元件
個人微信:henryfan128 QQ:28304340
關註公眾號
https://github.com/beetlex-io/