在网络通讯中很少会针对Socket设置大的Buffer,毕竟这样做非常浪费内存;最重要是这个最大值很难去评估的, 如果为每个连接分配1MB或更大的空间当1万个连接那需要非常大的内存,实际应用中可能对对更大的消息!所以应用中可以把消息写入多个非连续的Socket Buffer就显得非常重要了。接下来就介绍写入大String来讲解实现的过程。
应用中很难评估一个String的大小,在编码的时候一般会根据String.Length*6来分配一个byte[]用来处理编码(这个6可以根据具体编码来定,这里定义6是确保的所有情况都能满足需求,实际上Encoding类有计算,只是过程处理复杂度感觉过高所以直接定最大编码值了),然后把String编码到对应的byte[]再分批次copy的Socket buffer中发送出去,这种做法就是使用一个大的连续内存块写并复制。而接下来介绍的是直接把String编码到多个非连续的Socket buffer块中然后直接发送,这样就可以节省大块内存的复制来提高效率了。
下面通过代码来介绍BeetleX是怎处理这个过程的
public int WriteString(string value, Encoding coding = )
{
if (!string.IsOrEmpty(value))
{
coding = coding ?? Encoding.UTF8;
if (WriteSequenceNetStream != )
{
ReadOnlySpan<char> spanValue = value;
int result = 0;
while (spanValue.Length > 0)
{
ReadOnlySpan<char> chars;
Span<byte> span;
span = WriteSequenceNetStream.GetWriteSpan(2048);
int encodeLen = span.Length / 6;
chars = spanValue.Slice(0, spanValue.Length > encodeLen ? encodeLen : spanValue.Length);
var len = coding.GetBytes(chars, span);
WriteSequenceNetStream.WriteAdvance(len);
result += len;
spanValue = spanValue.Slice(len);
}
return result;
}
else
{
var len = Stream.Write(value, coding);
return len;
}
}
return 0;
}
具体代码是循环分批写入,先向数据流申请2K的空间(实际情况有可能没有2K,后面会贴相关代码),然后根据分配的内存长度再计算出需要编码的字符长度然后分批写入;写入后通过WriteAdvance(len)提交实际写入块的长度。接下来看一下这个预分本的存是怎样操作的
public Memory
GetMemory(int length) {
if (_end == )
{
CreateMemory(length);
}
int availableSize = _end.AvailableSize;
if (availableSize < length && availableSize < 256)
CreateMemory(length);
return _end.Allot(length);
}
在非连续内存块的链表分配指定长度的内存,当前块可用空间不够并少于256个字节空间就重新创建一个新的内存块,从分配策略上来说并不是每一块都能完全写满,但这个并不重要因为每个内存块都有记录位置偏移,在Socket发送的时候直接针对偏移发就好了。当数据写入到多个小内存块后就可以把这个链表递归发送出去了
internal async Task SendToSocket(MemoryBlock segment, bool begin)
{
if (segment == )
return;
try
{
var buffer = segment.GetUseMemory();
if (buffer.Length != 0)
{
var len = await Socket.SendAsync(buffer);
SocketProcessHandler?.SendCompeted(this, buffer, len);
SencCompleted(len);
GetLoger(LogLevel.Debug)?.Write(this, "NetClient", "SendData", $"Length {len}");
GetLoger(LogLevel.Trace)?.Write(this, "NetClient", " SendData", $"{Convert.ToHexString(buffer.Slice(0, len).Span)}");
if (len != buffer.Length)
GetLoger(LogLevel.Error)?.Write(this, "NetClient", "SendData", $"Buffer length {buffer.Length} completed {len}");
await SendToSocket(segment.Next, false);
}
}
catch (Exception e_)
{
GetLoger(Logs.LogLevel.Error)?.WriteException(this, "NetClient", "SendData", e_);
Disconnect(e_);
}
finally
{
if (begin)
{
while (segment != )
{
var next = segment.Next;
segment.Dispose();
segment = next;
}
}
}
}
发送完成后把链表的内存回归到内存池中。
通过以上方法就可以实现一个基于动态非连续内存块的Stream,它的好处是无论怎么扩容都不会产生内存复制,内存通过内存池管理也不怕创建和扩展导致新内存开销!其实这个Stream在BeetleX源码中已经实现并支持SslStream.具体可以去查看项目源码
BeetleX
开源跨平台通讯框架(支持TLS)
提供HTTP,Websocket,MQTT,Redis,RPC和服务网关开源组件
个人微信:henryfan128 QQ:28304340
https://github.com/beetlex-io/