| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385 | using System.Buffers;using BufferOwner = System.Buffers.IMemoryOwner<byte>;namespace System.Net.Sockets.Kcp{    public class Kcp<Segment> : KcpCore<Segment>        where Segment : IKcpSegment    {        //extension 重构和新增加的部分============================================        IRentable rentable;        /// <summary>        /// create a new kcp control object, 'conv' must equal in two endpoint        /// from the same connection.        /// </summary>        /// <param name="conv_"></param>        /// <param name="callback"></param>        /// <param name="rentable">可租用内存的回调</param>        public Kcp(uint conv_, IKcpCallback callback, IRentable rentable = null)            : base(conv_)        {            callbackHandle = callback;            this.rentable = rentable;        }        /// <summary>        /// 如果外部能够提供缓冲区则使用外部缓冲区,否则new byte[]        /// </summary>        /// <param name="needSize"></param>        /// <returns></returns>        internal protected override BufferOwner CreateBuffer(int needSize)        {            var res = rentable?.RentBuffer(needSize);            if (res == null)            {                return base.CreateBuffer(needSize);            }            else            {                if (res.Memory.Length < needSize)                {                    throw new ArgumentException($"{nameof(rentable.RentBuffer)} 指定的委托不符合标准,返回的" +                                                $"BufferOwner.Memory.Length 小于 {nameof(needSize)}");                }            }            return res;        }        /// <summary>        /// TryRecv Recv设计上同一时刻只允许一个线程调用。        /// <para/>因为要保证数据顺序,多个线程同时调用Recv也没有意义。        /// <para/>所以只需要部分加锁即可。        /// </summary>        /// <returns></returns>        public (BufferOwner buffer, int avalidLength) TryRecv()        {            var peekSize = -1;            lock (rcv_queueLock)            {                if (rcv_queue.Count == 0)                {                    ///没有可用包                    return (null, -1);                }                var seq = rcv_queue[0];                if (seq.frg == 0)                {                    peekSize = (int)seq.len;                }                if (rcv_queue.Count < seq.frg + 1)                {                    ///没有足够的包                    return (null, -1);                }                uint length = 0;                foreach (var item in rcv_queue)                {                    length += item.len;                    if (item.frg == 0)                    {                        break;                    }                }                peekSize = (int)length;                if (peekSize <= 0)                {                    return (null, -2);                }            }            var buffer = CreateBuffer(peekSize);            var recvlength = UncheckRecv(buffer.Memory.Span);            return (buffer, recvlength);        }        /// <summary>        /// TryRecv Recv设计上同一时刻只允许一个线程调用。        /// <para/>因为要保证数据顺序,多个线程同时调用Recv也没有意义。        /// <para/>所以只需要部分加锁即可。        /// </summary>        /// <param name="writer"></param>        /// <returns></returns>        public int TryRecv(IBufferWriter<byte> writer)        {            var peekSize = -1;            lock (rcv_queueLock)            {                if (rcv_queue.Count == 0)                {                    ///没有可用包                    return -1;                }                var seq = rcv_queue[0];                if (seq.frg == 0)                {                    peekSize = (int)seq.len;                }                if (rcv_queue.Count < seq.frg + 1)                {                    ///没有足够的包                    return -1;                }                uint length = 0;                foreach (var item in rcv_queue)                {                    length += item.len;                    if (item.frg == 0)                    {                        break;                    }                }                peekSize = (int)length;                if (peekSize <= 0)                {                    return -2;                }            }            return UncheckRecv(writer);        }        /// <summary>        /// user/upper level recv: returns size, returns below zero for EAGAIN        /// </summary>        /// <param name="buffer"></param>        /// <returns></returns>        public int Recv(Span<byte> buffer)        {            if (0 == rcv_queue.Count)            {                return -1;            }            var peekSize = PeekSize();            if (peekSize < 0)            {                return -2;            }            if (peekSize > buffer.Length)            {                return -3;            }            /// 拆分函数            var recvLength = UncheckRecv(buffer);            return recvLength;        }        /// <summary>        /// user/upper level recv: returns size, returns below zero for EAGAIN        /// </summary>        /// <param name="writer"></param>        /// <returns></returns>        public int Recv(IBufferWriter<byte> writer)        {            if (0 == rcv_queue.Count)            {                return -1;            }            var peekSize = PeekSize();            if (peekSize < 0)            {                return -2;            }            //if (peekSize > buffer.Length)            //{            //    return -3;            //}            /// 拆分函数            var recvLength = UncheckRecv(writer);            return recvLength;        }        /// <summary>        /// 这个函数不检查任何参数        /// </summary>        /// <param name="buffer"></param>        /// <returns></returns>        int UncheckRecv(Span<byte> buffer)        {            var recover = false;            if (rcv_queue.Count >= rcv_wnd)            {                recover = true;            }            #region merge fragment.            /// merge fragment.            var recvLength = 0;            lock (rcv_queueLock)            {                var count = 0;                foreach (var seg in rcv_queue)                {                    seg.data.CopyTo(buffer.Slice(recvLength));                    recvLength += (int)seg.len;                    count++;                    int frg = seg.frg;                    SegmentManager.Free(seg);                    if (frg == 0)                    {                        break;                    }                }                if (count > 0)                {                    rcv_queue.RemoveRange(0, count);                }            }            #endregion            Move_Rcv_buf_2_Rcv_queue();            #region fast recover            /// fast recover            if (rcv_queue.Count < rcv_wnd && recover)            {                // ready to send back IKCP_CMD_WINS in ikcp_flush                // tell remote my window size                probe |= IKCP_ASK_TELL;            }            #endregion            return recvLength;        }        /// <summary>        /// 这个函数不检查任何参数        /// </summary>        /// <param name="writer"></param>        /// <returns></returns>        int UncheckRecv(IBufferWriter<byte> writer)        {            var recover = false;            if (rcv_queue.Count >= rcv_wnd)            {                recover = true;            }            #region merge fragment.            /// merge fragment.            var recvLength = 0;            lock (rcv_queueLock)            {                var count = 0;                foreach (var seg in rcv_queue)                {                    var len = (int)seg.len;                    var destination = writer.GetSpan(len);                    seg.data.CopyTo(destination);                    writer.Advance(len);                    recvLength += len;                    count++;                    int frg = seg.frg;                    SegmentManager.Free(seg);                    if (frg == 0)                    {                        break;                    }                }                if (count > 0)                {                    rcv_queue.RemoveRange(0, count);                }            }            #endregion            Move_Rcv_buf_2_Rcv_queue();            #region fast recover            /// fast recover            if (rcv_queue.Count < rcv_wnd && recover)            {                // ready to send back IKCP_CMD_WINS in ikcp_flush                // tell remote my window size                probe |= IKCP_ASK_TELL;            }            #endregion            return recvLength;        }        /// <summary>        /// check the size of next message in the recv queue        /// </summary>        /// <returns></returns>        public int PeekSize()        {            lock (rcv_queueLock)            {                if (rcv_queue.Count == 0)                {                    ///没有可用包                    return -1;                }                var seq = rcv_queue[0];                if (seq.frg == 0)                {                    return (int)seq.len;                }                if (rcv_queue.Count < seq.frg + 1)                {                    ///没有足够的包                    return -1;                }                uint length = 0;                foreach (var seg in rcv_queue)                {                    length += seg.len;                    if (seg.frg == 0)                    {                        break;                    }                }                return (int)length;            }        }    }}
 |