当前位置: 欣欣网 > 码农

【BeetleX重构】实现一个可读写的ReadOnlySequence适配器

2024-05-28码农

ReadOnlySequence只是一个可读取非连续内存结构,它的使用更多了现在Pipe中。由 Pipe的Writer负责非 连续内存数据写入(一般挂载到Socket的接收端),而对应的Reader则获取相应可读的 ReadOnlySequence数据。Pipe是一个高效的异步读写模型,它可以更好地分离 Socket读写逻辑,从而让 Socket收发的异步处理更高效复位到相关状态工作。

由于 Pipe是基于异步的,对于IO分离读写的确很适合。但对于内存读写的处理从效率上来说就不太适合了。在编写网络服务时一般都可以挂 Pipe用作底层处理,那上层的SslStream就并不适合了。为了上层的统一处理方式那就有必要把 SslStre am的数据转成 ReadOnlySeq uence。到这里可能有个疑问直接把 SslStre am的数据读出来然后返回对应的 ReadOnlySeq uence就可以了啊,为什么还多花时间写个适配器? 但网络数据处理上有些不同,并不是一次读到的数据就是一个完全协议包,往往是由多次不连续的网络数据组成;这个时候就需要把每次接收到的数据存放在 ReadOnlySeq uence里处理。而还有一个更重要的问题是 ReadOnlySeq uence读取完的内存怎处理?而实现适配器的重要作用也就解决这些问题,非连续内存的写入和读取循环复用。

实现 ReadOnlySeq uence适配器先了解是如何创建它的,以下是它的构造函数

publicReadOnlySequence(ReadOnlyMemory<T> memory);publicReadOnlySequence(T[] array);publicReadOnlySequence(T[] arrayint start, int length);publicReadOnlySequence(ReadOnlySequenceSegment<T> startSegment, int startIndex, ReadOnlySequenceSegment<T> endSegment, int endIndex);

从构造函数上来看显然需要通过ReadOnlySequenceSegment<T>来创建。


看帮助这个类似乎没有内置实现,那就说只能自己去实现一个了。

internal classMemorySegment : ReadOnlySequenceSegment<byte> {public MemorySegment(ReadOnlyMemory<byte> memory) { Memory = memory; }public MemorySegment Append(ReadOnlyMemory<byte> memory) {var segment = new MemorySegment(memory) { RunningIndex = RunningIndex + Memory.Length }; Next = segment;return segment; } }

为什么添加Append方法,主要原因是 ReadOnlySeq uence只支持开始和结束两个 ReadOnlySequenceSegment<T>对象,分别构成非连续内存链表的开始和结束。

ReadOnlySequenceSegment<T>只是一个单向链表内存指向,实际使用还需要一个内存结构类

classMemoryBlock : IDisposable{publicMemoryBlock(int length, int segmentMinSize) { Data = MemoryPool<byte>.Shared.Rent(length < segmentMinSize ? segmentMinSize : length); Length = Data.Memory.Length; Memory = Data.Memory; }public IMemoryOwner<byte> Data { get; set; }public Memory<byte> Memory { get; set; }publicint Length { get; set; }publicint Allocated { get; set; }publicint Postion { get; set; }public MemoryBlock Next { get; set; }publicvoidReadAdvanceTo(int count) { Postion += count; }publicvoidAdvanceTo(int count) { Allocated += count; }public Memory<byte> Allot(int size) {return Memory.Slice(Allocated, size); }publicboolTryAllot(int size) {return Memory.Length - Allocated >= size; }public Memory<byte> GetUseMemory() {return Memory.Slice(Postion, Allocated - Postion); }publicvoidDispose() { Data.Dispose(); Memory = null; Data = null; }}

同样这个内存类也要构建成一个链表,所以也定义了Next属性指向下一下内存存储类。

有了以上两个类就可以构建 ReadOnlySeq uence的适配器了,定义对应的内存分配方法

public Span<byte> GetSpan(int length) {return GetMemory(length).Span; }public Memory<byte> GetMemory(int length) {if (_end == null) { CreateMemory(length); }if (!_end.TryAllot(length)) CreateMemory(length);return _end.Allot(length); }privatevoidCreateMemory(int length) { MemoryBlock result = new MemoryBlock(length, SegmentMinSize);if (_first == null) _first = result;if (_end == null) { _end = result; }else { _end.Next = result; _end = result; } }

当内存分配后填充数据完成就需要提交对应写入数据的长度

publicvoidWriteAdvanceTo(int length) { _end.AdvanceTo(length); }

数据写入完成后就可以提交数据并构建对应可读的 ReadOnlySeq uence了

public void Flush(){var start = _first.GetUseMemory();if (_first == _end) {_readOnlySequence = new ReadOnlySequence<byte>(start); }else {MemorySegment first = new MemorySegment(start);MemoryBlocklast = _first.Next;MemorySegment next = first;while (last != null) {next = next.Append(last.GetUseMemory());last = last.Next; }_readOnlySequence = new ReadOnlySequence<byte>(first, 0, next, next.Memory.Length); }}

写入的功能已经完成,接下来就是读取提交的代码

publicvoidReaderAdvanceTo(long length){ _readOnlySequence = _readOnlySequence.Slice(length);while (length > 0) {var memory = _first;var len = memory.Allocated - memory.Postion;if (length >= len) { length -= len; memory.Dispose(); _first = memory.Next;if (_first == null) { _end = null;return; } }else { memory.ReadAdvanceTo((int)length); length = 0; } }}

这一环节最重要的工作就是检查每个内存块是否已经读取完成,如果完成就把内存回收到池里。接下来再实现一个相应的Stream方便兼容第三方使用

  • public classReadOnlySequenceAdapterStream : Stream, ISpanSequenceNetStream{publicReadOnlySequenceAdapterStream() {this.ReadOnlySequenceAdapter = new ReadOnlySequenceAdapter();this.ReadOnlySequenceAdapter.SegmentMinSize = 1024 * 4; }public ReadOnlySequenceAdapter ReadOnlySequenceAdapter { get; privateset; }publicoverridebool CanRead => true;publicoverridebool CanSeek => thrownew NotImplementedException();publicoverridebool CanWrite => thrownew NotImplementedException();publicoverridelong Length => ReadOnlySequenceAdapter.ReadOnlySequence.Length;publicoverridelong Position { get => thrownew NotImplementedException(); set => thrownew NotImplementedException(); }public Span<byte> Allot(int count) {var result = ReadOnlySequenceAdapter.GetSpan(count); WriteAdvance(4);return result; }publicoverridevoidFlush() {this.ReadOnlySequenceAdapter.Flush(); }public ReadOnlySequence<byte> GetReadOnlySequence() {return ReadOnlySequenceAdapter.ReadOnlySequence; }public Memory<byte> GetWriteMemory(int count) {return ReadOnlySequenceAdapter.GetMemory(count); }publicoverrideintReadByte() {if (Length == 0)return-1;int result = ReadOnlySequenceAdapter.ReadOnlySequence.FirstSpan[0]; ReadAdvance(1);return result; }publicoverrideintRead(byte[] buffer, int offset, int count) {if (buffer.Length == 0)return0;if (Length > count) { ReadOnlySequenceAdapter.ReadOnlySequence.CopyTo(new Span<byte>(buffer, offset, count)); ReadAdvance(count);return count; }else {var len = (int)Length; ReadOnlySequenceAdapter.ReadOnlySequence.CopyTo(new Span<byte>(buffer, offset, len)); ReadAdvance(len);return len; } }publicvoidReadAdvance(long count) {this.ReadOnlySequenceAdapter.ReaderAdvanceTo(count); }publicoverridelongSeek(long offset, SeekOrigin origin) {thrownew NotImplementedException(); }publicoverridevoidSetLength(longvalue) {thrownew NotImplementedException(); }publicoverridevoidWriteByte(bytevalue) {var memory = ReadOnlySequenceAdapter.GetMemory(1); memory.Span[0] = value; WriteAdvance(1); }publicboolTryRead(int count, out ReadOnlySequence<byte> data) { data = default;if (Length > count) { data = ReadOnlySequenceAdapter.ReadOnlySequence.Slice(0, count);returntrue; }returnfalse; }publicoverridevoidWrite(byte[] buffer, int offset, int count) {var memory = ReadOnlySequenceAdapter.GetSpan(count); Span<byte> span = new Span<byte>(buffer, offset, count); span.CopyTo(memory); WriteAdvance(count); }public Span<byte> GetWriteSpan(int count) {return ReadOnlySequenceAdapter.GetSpan(count); }privatelong _catchWriteLength = 0;publicvoidStartWriteLength() { _catchWriteLength = 0; ; }publicintEndWriteLength() {return (int)_catchWriteLength; }publicvoidWriteAdvance(int count) { _catchWriteLength += count; ReadOnlySequenceAdapter.WriteAdvanceTo(count); }protectedoverridevoidDispose(bool disposing) {base.Dispose(disposing);if (disposing) { ReadOnlySequenceAdapter.Dispose(); } }publicvoidImport(Stream stream) {var span = GetWriteSpan(1024 * 4);var len = stream.Read(span);while (len > 0) { WriteAdvance(len); len = stream.Read(span); } Flush(); }}

    这样一个标准化并支持 ReadOnlySeq uence的Stream就实现了,如果想这个类需要支持Socket和SslStream的中间层还需扩展它的异步方法,并支持Socket异步读写, SslStream握手验证的时候需要使用到Stream的异步方法。

    BeetleX

    开源跨平台通讯框架(支持TLS)

    提供HTTP,Websocket,MQTT,Redis,RPC和服务网关开源组件

    个人微信:henryfan128 QQ:28304340

    关注公众号

    https://github.com/beetlex-io/