123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- using System.Buffers;
- using System.Collections.Generic;
- using System.Threading.Tasks;
- using BufferOwner = System.Buffers.IMemoryOwner<byte>;
- namespace System.Net.Sockets.Kcp
- {
- /// <summary>
- /// <inheritdoc cref="IPipe{T}"/>
- /// <para></para>这是个简单的实现,更复杂使用微软官方实现<see cref="System.Threading.Channels.Channel.CreateBounded{T}(int)"/>
- /// </summary>
- /// <typeparam name="T"></typeparam>
- internal class QueuePipe<T> : Queue<T>
- {
- readonly object _innerLock = new object();
- private TaskCompletionSource<T> source;
- //线程同步上下文由Task机制保证,无需额外处理
- //SynchronizationContext callbackContext;
- //public bool UseSynchronizationContext { get; set; } = true;
- public virtual void Write(T item)
- {
- lock (_innerLock)
- {
- if (source == null)
- {
- Enqueue(item);
- }
- else
- {
- if (Count > 0)
- {
- throw new Exception("内部顺序错误,不应该出现,请联系作者");
- }
- var next = source;
- source = null;
- next.TrySetResult(item);
- }
- }
- }
- public new void Enqueue(T item)
- {
- lock (_innerLock)
- {
- base.Enqueue(item);
- }
- }
- public void Flush()
- {
- lock (_innerLock)
- {
- if (Count > 0)
- {
- var res = Dequeue();
- var next = source;
- source = null;
- next?.TrySetResult(res);
- }
- }
- }
- public virtual Task<T> ReadAsync()
- {
- lock (_innerLock)
- {
- if (this.Count > 0)
- {
- var next = Dequeue();
- return Task.FromResult(next);
- }
- else
- {
- source = new TaskCompletionSource<T>();
- return source.Task;
- }
- }
- }
- public ValueTask<T> ReadValueTaskAsync()
- {
- throw new NotImplementedException();
- }
- }
- public class KcpIO<Segment> : KcpCore<Segment>, IKcpIO
- where Segment : IKcpSegment
- {
- OutputQ outq;
- QueuePipe<ArraySegment<Segment>> recvSignal = new QueuePipe<ArraySegment<Segment>>();
- public KcpIO(uint conv_) : base(conv_)
- {
- outq = new OutputQ();
- callbackHandle = outq;
- }
- public async ValueTask RecvAsync(IBufferWriter<byte> writer, object options = null)
- {
- var arraySegment = await recvSignal.ReadAsync().ConfigureAwait(false);
- for (int i = arraySegment.Offset; i < arraySegment.Count; i++)
- {
- WriteRecv(writer, arraySegment.Array[i]);
- }
- ArrayPool<Segment>.Shared.Return(arraySegment.Array, true);
- }
- public async ValueTask<int> RecvAsync(ArraySegment<byte> buffer, object options = null)
- {
- var arraySegment = await recvSignal.ReadAsync().ConfigureAwait(false);
- int start = buffer.Offset;
- for (int i = arraySegment.Offset; i < arraySegment.Count; i++)
- {
- var target = new Memory<byte>(buffer.Array, start, buffer.Array.Length - start);
- var seg = arraySegment.Array[i];
- seg.data.CopyTo(target.Span);
- start += seg.data.Length;
- SegmentManager.Free(seg);
- }
- ArrayPool<Segment>.Shared.Return(arraySegment.Array, true);
- return start - buffer.Offset;
- }
- public async ValueTask OutputAsync(IBufferWriter<byte> writer, object options = null)
- {
- var (Owner, Count) = await outq.ReadAsync().ConfigureAwait(false);
- WriteOut(writer, Owner, Count);
- }
- internal override void Parse_data(Segment newseg)
- {
- base.Parse_data(newseg);
- lock (rcv_queueLock)
- {
- var recover = false;
- if (rcv_queue.Count >= rcv_wnd)
- {
- recover = true;
- }
- while (TryRecv(out var arraySegment) > 0)
- {
- recvSignal.Enqueue(arraySegment);
- }
- recvSignal.Flush();
- #region fast recover
- /// fast recover
- if (rcv_queue.Count < rcv_wnd && recover)
- {
- // ready to send back IKCP_CMD_WINS in ikcp_flush
- // tell remote my window size
- probe |= IKCP_ASK_TELL;
- }
- #endregion
- }
- }
- internal int TryRecv(out ArraySegment<Segment> package)
- {
- package = default;
- lock (rcv_queueLock)
- {
- var peekSize = -1;
- if (rcv_queue.Count == 0)
- {
- ///没有可用包
- return -1;
- }
- var seq = rcv_queue[0];
- if (seq.frg == 0)
- {
- peekSize = (int)seq.len;
- }
- if (rcv_queue.Count < seq.frg + 1)
- {
- ///没有足够的包
- return -1;
- }
- uint length = 0;
- Segment[] kcpSegments = ArrayPool<Segment>.Shared.Rent(seq.frg + 1);
- var index = 0;
- foreach (var item in rcv_queue)
- {
- kcpSegments[index] = item;
- index++;
- length += item.len;
- if (item.frg == 0)
- {
- break;
- }
- }
- if (index > 0)
- {
- rcv_queue.RemoveRange(0, index);
- }
- package = new ArraySegment<Segment>(kcpSegments, 0, index);
- peekSize = (int)length;
- if (peekSize <= 0)
- {
- return -2;
- }
- return peekSize;
- }
- }
- private void WriteRecv(IBufferWriter<byte> writer, Segment seg)
- {
- var curCount = (int)seg.len;
- var target = writer.GetSpan(curCount);
- seg.data.CopyTo(target);
- SegmentManager.Free(seg);
- writer.Advance(curCount);
- }
- private static void WriteOut(IBufferWriter<byte> writer, BufferOwner Owner, int Count)
- {
- var target = writer.GetSpan(Count);
- Owner.Memory.Span.Slice(0, Count).CopyTo(target);
- writer.Advance(Count);
- Owner.Dispose();
- }
- protected internal override BufferOwner CreateBuffer(int needSize)
- {
- return MemoryPool<byte>.Shared.Rent(needSize);
- }
- internal class OutputQ : QueuePipe<(BufferOwner Owner, int Count)>,
- IKcpCallback
- {
- public void Output(BufferOwner buffer, int avalidLength)
- {
- Write((buffer, avalidLength));
- }
- }
- }
- }
|