using System.Buffers; using BufferOwner = System.Buffers.IMemoryOwner; namespace System.Net.Sockets.Kcp { public class Kcp : KcpCore where Segment : IKcpSegment { //extension 重构和新增加的部分============================================ IRentable rentable; /// /// create a new kcp control object, 'conv' must equal in two endpoint /// from the same connection. /// /// /// /// 可租用内存的回调 public Kcp(uint conv_, IKcpCallback callback, IRentable rentable = null) : base(conv_) { callbackHandle = callback; this.rentable = rentable; } /// /// 如果外部能够提供缓冲区则使用外部缓冲区,否则new byte[] /// /// /// internal protected override BufferOwner CreateBuffer(int needSize) { var res = rentable?.RentBuffer(needSize); if (res == null) { return base.CreateBuffer(needSize); } else { if (res.Memory.Length < needSize) { throw new ArgumentException($"{nameof(rentable.RentBuffer)} 指定的委托不符合标准,返回的" + $"BufferOwner.Memory.Length 小于 {nameof(needSize)}"); } } return res; } /// /// TryRecv Recv设计上同一时刻只允许一个线程调用。 /// 因为要保证数据顺序,多个线程同时调用Recv也没有意义。 /// 所以只需要部分加锁即可。 /// /// public (BufferOwner buffer, int avalidLength) TryRecv() { var peekSize = -1; lock (rcv_queueLock) { if (rcv_queue.Count == 0) { ///没有可用包 return (null, -1); } var seq = rcv_queue[0]; if (seq.frg == 0) { peekSize = (int)seq.len; } if (rcv_queue.Count < seq.frg + 1) { ///没有足够的包 return (null, -1); } uint length = 0; foreach (var item in rcv_queue) { length += item.len; if (item.frg == 0) { break; } } peekSize = (int)length; if (peekSize <= 0) { return (null, -2); } } var buffer = CreateBuffer(peekSize); var recvlength = UncheckRecv(buffer.Memory.Span); return (buffer, recvlength); } /// /// TryRecv Recv设计上同一时刻只允许一个线程调用。 /// 因为要保证数据顺序,多个线程同时调用Recv也没有意义。 /// 所以只需要部分加锁即可。 /// /// /// public int TryRecv(IBufferWriter writer) { var peekSize = -1; lock (rcv_queueLock) { if (rcv_queue.Count == 0) { ///没有可用包 return -1; } var seq = rcv_queue[0]; if (seq.frg == 0) { peekSize = (int)seq.len; } if (rcv_queue.Count < seq.frg + 1) { ///没有足够的包 return -1; } uint length = 0; foreach (var item in rcv_queue) { length += item.len; if (item.frg == 0) { break; } } peekSize = (int)length; if (peekSize <= 0) { return -2; } } return UncheckRecv(writer); } /// /// user/upper level recv: returns size, returns below zero for EAGAIN /// /// /// public int Recv(Span buffer) { if (0 == rcv_queue.Count) { return -1; } var peekSize = PeekSize(); if (peekSize < 0) { return -2; } if (peekSize > buffer.Length) { return -3; } /// 拆分函数 var recvLength = UncheckRecv(buffer); return recvLength; } /// /// user/upper level recv: returns size, returns below zero for EAGAIN /// /// /// public int Recv(IBufferWriter writer) { if (0 == rcv_queue.Count) { return -1; } var peekSize = PeekSize(); if (peekSize < 0) { return -2; } //if (peekSize > buffer.Length) //{ // return -3; //} /// 拆分函数 var recvLength = UncheckRecv(writer); return recvLength; } /// /// 这个函数不检查任何参数 /// /// /// int UncheckRecv(Span buffer) { var recover = false; if (rcv_queue.Count >= rcv_wnd) { recover = true; } #region merge fragment. /// merge fragment. var recvLength = 0; lock (rcv_queueLock) { var count = 0; foreach (var seg in rcv_queue) { seg.data.CopyTo(buffer.Slice(recvLength)); recvLength += (int)seg.len; count++; int frg = seg.frg; SegmentManager.Free(seg); if (frg == 0) { break; } } if (count > 0) { rcv_queue.RemoveRange(0, count); } } #endregion Move_Rcv_buf_2_Rcv_queue(); #region fast recover /// fast recover if (rcv_queue.Count < rcv_wnd && recover) { // ready to send back IKCP_CMD_WINS in ikcp_flush // tell remote my window size probe |= IKCP_ASK_TELL; } #endregion return recvLength; } /// /// 这个函数不检查任何参数 /// /// /// int UncheckRecv(IBufferWriter writer) { var recover = false; if (rcv_queue.Count >= rcv_wnd) { recover = true; } #region merge fragment. /// merge fragment. var recvLength = 0; lock (rcv_queueLock) { var count = 0; foreach (var seg in rcv_queue) { var len = (int)seg.len; var destination = writer.GetSpan(len); seg.data.CopyTo(destination); writer.Advance(len); recvLength += len; count++; int frg = seg.frg; SegmentManager.Free(seg); if (frg == 0) { break; } } if (count > 0) { rcv_queue.RemoveRange(0, count); } } #endregion Move_Rcv_buf_2_Rcv_queue(); #region fast recover /// fast recover if (rcv_queue.Count < rcv_wnd && recover) { // ready to send back IKCP_CMD_WINS in ikcp_flush // tell remote my window size probe |= IKCP_ASK_TELL; } #endregion return recvLength; } /// /// check the size of next message in the recv queue /// /// public int PeekSize() { lock (rcv_queueLock) { if (rcv_queue.Count == 0) { ///没有可用包 return -1; } var seq = rcv_queue[0]; if (seq.frg == 0) { return (int)seq.len; } if (rcv_queue.Count < seq.frg + 1) { ///没有足够的包 return -1; } uint length = 0; foreach (var seg in rcv_queue) { length += seg.len; if (seg.frg == 0) { break; } } return (int)length; } } } }