| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 | 
							- using System.Buffers;
 
- using System.Collections.Generic;
 
- using System.Threading.Tasks;
 
- using BufferOwner = System.Buffers.IMemoryOwner<byte>;
 
- namespace System.Net.Sockets.Kcp
 
- {
 
-     /// <summary>
 
-     /// <inheritdoc cref="IPipe{T}"/>
 
-     /// <para></para>这是个简单的实现,更复杂使用微软官方实现<see cref="System.Threading.Channels.Channel.CreateBounded{T}(int)"/>
 
-     /// </summary>
 
-     /// <typeparam name="T"></typeparam>
 
-     internal class QueuePipe<T> : Queue<T>
 
-     {
 
-         readonly object _innerLock = new object();
 
-         private TaskCompletionSource<T> source;
 
-         //线程同步上下文由Task机制保证,无需额外处理
 
-         //SynchronizationContext callbackContext;
 
-         //public bool UseSynchronizationContext { get; set; } = true;
 
-         public virtual void Write(T item)
 
-         {
 
-             lock (_innerLock)
 
-             {
 
-                 if (source == null)
 
-                 {
 
-                     Enqueue(item);
 
-                 }
 
-                 else
 
-                 {
 
-                     if (Count > 0)
 
-                     {
 
-                         throw new Exception("内部顺序错误,不应该出现,请联系作者");
 
-                     }
 
-                     var next = source;
 
-                     source = null;
 
-                     next.TrySetResult(item);
 
-                 }
 
-             }
 
-         }
 
-         public new void Enqueue(T item)
 
-         {
 
-             lock (_innerLock)
 
-             {
 
-                 base.Enqueue(item);
 
-             }
 
-         }
 
-         public void Flush()
 
-         {
 
-             lock (_innerLock)
 
-             {
 
-                 if (Count > 0)
 
-                 {
 
-                     var res = Dequeue();
 
-                     var next = source;
 
-                     source = null;
 
-                     next?.TrySetResult(res);
 
-                 }
 
-             }
 
-         }
 
-         public virtual Task<T> ReadAsync()
 
-         {
 
-             lock (_innerLock)
 
-             {
 
-                 if (this.Count > 0)
 
-                 {
 
-                     var next = Dequeue();
 
-                     return Task.FromResult(next);
 
-                 }
 
-                 else
 
-                 {
 
-                     source = new TaskCompletionSource<T>();
 
-                     return source.Task;
 
-                 }
 
-             }
 
-         }
 
-         public ValueTask<T> ReadValueTaskAsync()
 
-         {
 
-             throw new NotImplementedException();
 
-         }
 
-     }
 
-     public class KcpIO<Segment> : KcpCore<Segment>, IKcpIO
 
-         where Segment : IKcpSegment
 
-     {
 
-         OutputQ outq;
 
-         QueuePipe<ArraySegment<Segment>> recvSignal = new QueuePipe<ArraySegment<Segment>>();
 
-         public KcpIO(uint conv_) : base(conv_)
 
-         {
 
-             outq = new OutputQ();
 
-             callbackHandle = outq;
 
-         }
 
-         public async ValueTask RecvAsync(IBufferWriter<byte> writer, object options = null)
 
-         {
 
-             var arraySegment = await recvSignal.ReadAsync().ConfigureAwait(false);
 
-             for (int i = arraySegment.Offset; i < arraySegment.Count; i++)
 
-             {
 
-                 WriteRecv(writer, arraySegment.Array[i]);
 
-             }
 
-             ArrayPool<Segment>.Shared.Return(arraySegment.Array, true);
 
-         }
 
-         public async ValueTask<int> RecvAsync(ArraySegment<byte> buffer, object options = null)
 
-         {
 
-             var arraySegment = await recvSignal.ReadAsync().ConfigureAwait(false);
 
-             int start = buffer.Offset;
 
-             for (int i = arraySegment.Offset; i < arraySegment.Count; i++)
 
-             {
 
-                 var target = new Memory<byte>(buffer.Array, start, buffer.Array.Length - start);
 
-                 var seg = arraySegment.Array[i];
 
-                 seg.data.CopyTo(target.Span);
 
-                 start += seg.data.Length;
 
-                 SegmentManager.Free(seg);
 
-             }
 
-             ArrayPool<Segment>.Shared.Return(arraySegment.Array, true);
 
-             return start - buffer.Offset;
 
-         }
 
-         public async ValueTask OutputAsync(IBufferWriter<byte> writer, object options = null)
 
-         {
 
-             var (Owner, Count) = await outq.ReadAsync().ConfigureAwait(false);
 
-             WriteOut(writer, Owner, Count);
 
-         }
 
-         internal override void Parse_data(Segment newseg)
 
-         {
 
-             base.Parse_data(newseg);
 
-             lock (rcv_queueLock)
 
-             {
 
-                 var recover = false;
 
-                 if (rcv_queue.Count >= rcv_wnd)
 
-                 {
 
-                     recover = true;
 
-                 }
 
-                 while (TryRecv(out var arraySegment) > 0)
 
-                 {
 
-                     recvSignal.Enqueue(arraySegment);
 
-                 }
 
-                 recvSignal.Flush();
 
-                 #region fast recover
 
-                 /// fast recover
 
-                 if (rcv_queue.Count < rcv_wnd && recover)
 
-                 {
 
-                     // ready to send back IKCP_CMD_WINS in ikcp_flush
 
-                     // tell remote my window size
 
-                     probe |= IKCP_ASK_TELL;
 
-                 }
 
-                 #endregion
 
-             }
 
-         }
 
-         internal int TryRecv(out ArraySegment<Segment> package)
 
-         {
 
-             package = default;
 
-             lock (rcv_queueLock)
 
-             {
 
-                 var peekSize = -1;
 
-                 if (rcv_queue.Count == 0)
 
-                 {
 
-                     ///没有可用包
 
-                     return -1;
 
-                 }
 
-                 var seq = rcv_queue[0];
 
-                 if (seq.frg == 0)
 
-                 {
 
-                     peekSize = (int)seq.len;
 
-                 }
 
-                 if (rcv_queue.Count < seq.frg + 1)
 
-                 {
 
-                     ///没有足够的包
 
-                     return -1;
 
-                 }
 
-                 uint length = 0;
 
-                 Segment[] kcpSegments = ArrayPool<Segment>.Shared.Rent(seq.frg + 1);
 
-                 var index = 0;
 
-                 foreach (var item in rcv_queue)
 
-                 {
 
-                     kcpSegments[index] = item;
 
-                     index++;
 
-                     length += item.len;
 
-                     if (item.frg == 0)
 
-                     {
 
-                         break;
 
-                     }
 
-                 }
 
-                 if (index > 0)
 
-                 {
 
-                     rcv_queue.RemoveRange(0, index);
 
-                 }
 
-                 package = new ArraySegment<Segment>(kcpSegments, 0, index);
 
-                 peekSize = (int)length;
 
-                 if (peekSize <= 0)
 
-                 {
 
-                     return -2;
 
-                 }
 
-                 return peekSize;
 
-             }
 
-         }
 
-         private void WriteRecv(IBufferWriter<byte> writer, Segment seg)
 
-         {
 
-             var curCount = (int)seg.len;
 
-             var target = writer.GetSpan(curCount);
 
-             seg.data.CopyTo(target);
 
-             SegmentManager.Free(seg);
 
-             writer.Advance(curCount);
 
-         }
 
-         private static void WriteOut(IBufferWriter<byte> writer, BufferOwner Owner, int Count)
 
-         {
 
-             var target = writer.GetSpan(Count);
 
-             Owner.Memory.Span.Slice(0, Count).CopyTo(target);
 
-             writer.Advance(Count);
 
-             Owner.Dispose();
 
-         }
 
-         protected internal override BufferOwner CreateBuffer(int needSize)
 
-         {
 
-             return MemoryPool<byte>.Shared.Rent(needSize);
 
-         }
 
-         internal class OutputQ : QueuePipe<(BufferOwner Owner, int Count)>,
 
-             IKcpCallback
 
-         {
 
-             public void Output(BufferOwner buffer, int avalidLength)
 
-             {
 
-                 Write((buffer, avalidLength));
 
-             }
 
-         }
 
-     }
 
- }
 
 
  |