using System.Buffers;
using System.Collections.Generic;
using System.Threading.Tasks;
using BufferOwner = System.Buffers.IMemoryOwner<byte>;

namespace System.Net.Sockets.Kcp
{
    /// <summary>
    /// <inheritdoc cref="IPipe{T}"/>
    /// <para>
    /// A deliberately simple pipe built on <see cref="Queue{T}"/>. For anything more demanding use the
    /// official <see cref="System.Threading.Channels.Channel.CreateBounded{T}(int)"/>.
    /// </para>
    /// <para>
    /// Only one pending reader is tracked: calling <see cref="ReadAsync"/> again while a previous
    /// read is still waiting replaces the stored completion source.
    /// </para>
    /// </summary>
    /// <typeparam name="T">Type of the items carried by the pipe.</typeparam>
    internal class QueuePipe<T> : Queue<T>
    {
        readonly object _innerLock = new object();

        /// <summary>Completion source of the reader currently waiting for an item, or null.</summary>
        private TaskCompletionSource<T> source;

        // Synchronization-context handling is left to the Task machinery; nothing extra is needed here.
        //SynchronizationContext callbackContext;
        //public bool UseSynchronizationContext { get; set; } = true;

        /// <summary>
        /// Hands <paramref name="item"/> to the waiting reader if there is one, otherwise queues it.
        /// </summary>
        /// <param name="item">The item to deliver.</param>
        /// <exception cref="InvalidOperationException">
        /// A reader is waiting although items are still queued (items were added through
        /// <see cref="Enqueue"/> without a following <see cref="Flush"/>).
        /// </exception>
        public virtual void Write(T item)
        {
            lock (_innerLock)
            {
                if (source == null)
                {
                    Enqueue(item);
                    return;
                }

                if (Count > 0)
                {
                    throw new InvalidOperationException("内部顺序错误,不应该出现,请联系作者");
                }

                // Clear the field before completing so a reader that immediately reads again
                // registers a fresh completion source.
                var next = source;
                source = null;
                next.TrySetResult(item);
            }
        }

        /// <summary>
        /// Appends <paramref name="item"/> to the queue under the pipe lock without waking a reader.
        /// Call <see cref="Flush"/> afterwards to deliver to a waiting reader.
        /// </summary>
        /// <param name="item">The item to queue.</param>
        public new void Enqueue(T item)
        {
            lock (_innerLock)
            {
                base.Enqueue(item);
            }
        }

        /// <summary>
        /// Delivers the oldest queued item to the waiting reader, if both exist.
        /// When no reader is waiting the queue is left untouched so the item can be picked up by a
        /// later <see cref="ReadAsync"/> instead of being dequeued and lost.
        /// </summary>
        public void Flush()
        {
            lock (_innerLock)
            {
                if (source == null || Count == 0)
                {
                    return;
                }

                var next = source;
                source = null;
                next.TrySetResult(Dequeue());
            }
        }

        /// <summary>
        /// Returns the oldest queued item, or a task that completes when the next item is delivered
        /// through <see cref="Write"/> or <see cref="Flush"/>.
        /// </summary>
        /// <returns>A task producing the next item.</returns>
        public virtual Task<T> ReadAsync()
        {
            lock (_innerLock)
            {
                if (Count > 0)
                {
                    return Task.FromResult(Dequeue());
                }

                // Continuations must not run inline inside TrySetResult: the producer completes the
                // task while holding _innerLock (and, in KcpIO.Parse_data, the receive-queue lock).
                source = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
                return source.Task;
            }
        }

        /// <summary>
        /// <see cref="ValueTask{TResult}"/> flavour of <see cref="ReadAsync"/>.
        /// </summary>
        /// <returns>A value task wrapping the task returned by <see cref="ReadAsync"/>.</returns>
        public ValueTask<T> ReadValueTaskAsync()
        {
            return new ValueTask<T>(ReadAsync());
        }
    }

    /// <summary>
    /// <see cref="KcpCore{Segment}"/> variant that exposes received messages and outgoing buffers
    /// through awaitable methods backed by <see cref="QueuePipe{T}"/>.
    /// </summary>
    /// <typeparam name="Segment">Segment type used by the core.</typeparam>
    public class KcpIO<Segment> : KcpCore<Segment>, IKcpIO
        where Segment : IKcpSegment
    {
        OutputQ outq;

        /// <summary>Complete messages, each as a pooled array of segments, waiting for a receiver.</summary>
        QueuePipe<ArraySegment<Segment>> recvSignal = new QueuePipe<ArraySegment<Segment>>();

        /// <summary>
        /// Creates the instance and installs an <see cref="OutputQ"/> as the core's callback handle.
        /// </summary>
        /// <param name="conv_">Conversation id forwarded to the base constructor.</param>
        public KcpIO(uint conv_) : base(conv_)
        {
            outq = new OutputQ();
            callbackHandle = outq;
        }

        /// <summary>
        /// Waits for the next complete message and writes its payload to <paramref name="writer"/>.
        /// </summary>
        /// <param name="writer">Destination for the payload bytes.</param>
        /// <param name="options">Not used by this implementation.</param>
        public async ValueTask RecvAsync(IBufferWriter<byte> writer, object options = null)
        {
            var package = await recvSignal.ReadAsync().ConfigureAwait(false);
            try
            {
                // ArraySegment.Count is a length, not an end index.
                var end = package.Offset + package.Count;
                for (int i = package.Offset; i < end; i++)
                {
                    WriteRecv(writer, package.Array[i]);
                }
            }
            finally
            {
                // Release segments and the pooled array even if the writer throws.
                ReleasePackage(package);
            }
        }

        /// <summary>
        /// Waits for the next complete message and copies its payload into <paramref name="buffer"/>.
        /// </summary>
        /// <param name="buffer">Destination window; nothing is written outside of it.</param>
        /// <param name="options">Not used by this implementation.</param>
        /// <returns>The number of bytes copied.</returns>
        /// <exception cref="ArgumentException">The message does not fit into <paramref name="buffer"/>.</exception>
        public async ValueTask<int> RecvAsync(ArraySegment<byte> buffer, object options = null)
        {
            var package = await recvSignal.ReadAsync().ConfigureAwait(false);
            try
            {
                // Bound the destination by the caller's window, not by the whole backing array.
                int limit = buffer.Offset + buffer.Count;
                int start = buffer.Offset;
                var end = package.Offset + package.Count;
                for (int i = package.Offset; i < end; i++)
                {
                    var target = new Memory<byte>(buffer.Array, start, limit - start);
                    var seg = package.Array[i];
                    seg.data.CopyTo(target.Span);
                    start += seg.data.Length;
                }

                return start - buffer.Offset;
            }
            finally
            {
                // Release segments and the pooled array even if the copy throws.
                ReleasePackage(package);
            }
        }

        /// <summary>
        /// Waits for the next outgoing buffer produced by the core and writes it to <paramref name="writer"/>.
        /// </summary>
        /// <param name="writer">Destination for the outgoing bytes.</param>
        /// <param name="options">Not used by this implementation.</param>
        public async ValueTask OutputAsync(IBufferWriter<byte> writer, object options = null)
        {
            var (Owner, Count) = await outq.ReadAsync().ConfigureAwait(false);
            WriteOut(writer, Owner, Count);
        }

        /// <summary>
        /// Lets the base class process <paramref name="newseg"/>, then moves every complete message
        /// from the receive queue into <see cref="recvSignal"/> and wakes a waiting receiver.
        /// </summary>
        /// <param name="newseg">The newly arrived segment.</param>
        internal override void Parse_data(Segment newseg)
        {
            base.Parse_data(newseg);

            lock (rcv_queueLock)
            {
                var recover = rcv_queue.Count >= rcv_wnd;

                ArraySegment<Segment> package;
                while (TryRecv(out package) > 0)
                {
                    recvSignal.Enqueue(package);
                }

                if (package.Array != null)
                {
                    // TryRecv returned -2: it dequeued a message whose total length is not positive.
                    // It is not delivered, so release its segments and pooled array here.
                    ReleasePackage(package);
                }

                recvSignal.Flush();

                #region fast recover
                /// fast recover
                if (rcv_queue.Count < rcv_wnd && recover)
                {
                    // ready to send back IKCP_CMD_WINS in ikcp_flush
                    // tell remote my window size
                    probe |= IKCP_ASK_TELL;
                }
                #endregion
            }
        }

        /// <summary>
        /// Tries to remove one complete message from the front of the receive queue.
        /// </summary>
        /// <param name="package">
        /// On a return value other than -1, the removed segments in an array rented from
        /// <see cref="ArrayPool{T}.Shared"/>; the caller owns the array and the segments.
        /// Otherwise <c>default</c>.
        /// </param>
        /// <returns>
        /// The total payload length when positive; -1 when the queue is empty or the message is
        /// not complete yet; -2 when a message was removed but its total length is not positive.
        /// </returns>
        internal int TryRecv(out ArraySegment<Segment> package)
        {
            package = default;
            lock (rcv_queueLock)
            {
                if (rcv_queue.Count == 0)
                {
                    // No packet available.
                    return -1;
                }

                var first = rcv_queue[0];
                if (rcv_queue.Count < first.frg + 1)
                {
                    // Not all fragments of the message have arrived yet.
                    return -1;
                }

                uint length = 0;
                Segment[] kcpSegments = ArrayPool<Segment>.Shared.Rent(first.frg + 1);
                var index = 0;
                foreach (var item in rcv_queue)
                {
                    kcpSegments[index] = item;
                    index++;
                    length += item.len;
                    if (item.frg == 0)
                    {
                        // frg == 0 marks the last fragment of the message.
                        break;
                    }
                }

                if (index > 0)
                {
                    rcv_queue.RemoveRange(0, index);
                }

                package = new ArraySegment<Segment>(kcpSegments, 0, index);

                var peekSize = (int)length;
                if (peekSize <= 0)
                {
                    return -2;
                }

                return peekSize;
            }
        }

        /// <summary>
        /// Copies the payload of <paramref name="seg"/> into <paramref name="writer"/> and advances it.
        /// The segment itself is released by the caller through <see cref="ReleasePackage"/>.
        /// </summary>
        private void WriteRecv(IBufferWriter<byte> writer, Segment seg)
        {
            var curCount = (int)seg.len;
            var target = writer.GetSpan(curCount);
            seg.data.CopyTo(target);
            writer.Advance(curCount);
        }

        /// <summary>
        /// Frees every segment of <paramref name="package"/> and returns its array to the pool.
        /// </summary>
        private void ReleasePackage(ArraySegment<Segment> package)
        {
            var end = package.Offset + package.Count;
            for (int i = package.Offset; i < end; i++)
            {
                SegmentManager.Free(package.Array[i]);
            }

            ArrayPool<Segment>.Shared.Return(package.Array, true);
        }

        /// <summary>
        /// Copies the first <paramref name="Count"/> bytes of <paramref name="Owner"/> into
        /// <paramref name="writer"/> and disposes the owner, also when the writer throws.
        /// </summary>
        private static void WriteOut(IBufferWriter<byte> writer, BufferOwner Owner, int Count)
        {
            try
            {
                var target = writer.GetSpan(Count);
                Owner.Memory.Span.Slice(0, Count).CopyTo(target);
                writer.Advance(Count);
            }
            finally
            {
                Owner.Dispose();
            }
        }

        /// <summary>Rents an output buffer of at least <paramref name="needSize"/> bytes from the shared memory pool.</summary>
        protected internal override BufferOwner CreateBuffer(int needSize)
        {
            return MemoryPool<byte>.Shared.Rent(needSize);
        }

        /// <summary>
        /// Callback target that queues every buffer emitted by the core for <see cref="OutputAsync"/>.
        /// </summary>
        internal class OutputQ : QueuePipe<(BufferOwner Owner, int Count)>,
            IKcpCallback
        {
            /// <summary>Queues an outgoing buffer together with the number of valid bytes in it.</summary>
            /// <param name="buffer">Owner of the outgoing bytes.</param>
            /// <param name="avalidLength">Number of valid bytes at the start of the buffer.</param>
            public void Output(BufferOwner buffer, int avalidLength)
            {
                Write((buffer, avalidLength));
            }
        }
    }
}