KcpIO.cs 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. using System.Buffers;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using BufferOwner = System.Buffers.IMemoryOwner<byte>;
  5. namespace System.Net.Sockets.Kcp
  6. {
  /// <summary>
  /// A minimal queue-backed pipe that lets a consumer await the next item.
  /// <para>
  /// Items written while no read is pending are buffered in the underlying <see cref="Queue{T}"/>;
  /// an item written while a read is pending completes that read directly.
  /// This is a simple implementation; for more complex needs use Microsoft's official
  /// <see cref="System.Threading.Channels.Channel.CreateBounded{T}(int)"/>.
  /// </para>
  /// </summary>
  /// <remarks>
  /// Only one pending read is tracked at a time: starting another read before the previous
  /// task has completed replaces the stored completion source, and the earlier task is never
  /// completed by this pipe.
  /// </remarks>
  /// <typeparam name="T">Type of the items carried by the pipe.</typeparam>
  internal class QueuePipe<T> : Queue<T>
  {
      readonly object _innerLock = new object();

      // Completion source of the read that is currently waiting for an item; null when no read is pending.
      private TaskCompletionSource<T> source;

      // Continuation scheduling is left to the Task machinery; no SynchronizationContext handling is done here.

      /// <summary>
      /// Hands <paramref name="item"/> to the pending read if there is one, otherwise buffers it.
      /// </summary>
      /// <param name="item">The item to deliver.</param>
      /// <exception cref="InvalidOperationException">
      /// A read is pending although items are still buffered, which means the internal ordering was violated.
      /// </exception>
      public virtual void Write(T item)
      {
          lock (_innerLock)
          {
              if (source == null)
              {
                  Enqueue(item);
                  return;
              }

              if (Count > 0)
              {
                  throw new InvalidOperationException("内部顺序错误,不应该出现,请联系作者");
              }

              // Clear the field before completing so a new read can register its own source.
              var next = source;
              source = null;
              next.TrySetResult(item);
          }
      }

      /// <summary>
      /// Buffers <paramref name="item"/> under the pipe's lock without completing a pending read.
      /// Call <see cref="Flush"/> afterwards to hand a buffered item to a waiting reader.
      /// </summary>
      /// <param name="item">The item to buffer.</param>
      public new void Enqueue(T item)
      {
          lock (_innerLock)
          {
              base.Enqueue(item);
          }
      }

      /// <summary>
      /// Completes the pending read, if any, with the oldest buffered item.
      /// Does nothing when no read is pending or nothing is buffered, so buffered items are
      /// never removed unless there is a reader to receive them.
      /// </summary>
      public void Flush()
      {
          lock (_innerLock)
          {
              if (source == null || Count == 0)
              {
                  return;
              }

              var next = source;
              source = null;
              next.TrySetResult(Dequeue());
          }
      }

      /// <summary>
      /// Returns the oldest buffered item, or a task that completes when the next item arrives.
      /// </summary>
      /// <returns>
      /// An already completed task when an item is buffered; otherwise the task of a newly
      /// registered completion source.
      /// </returns>
      public virtual Task<T> ReadAsync()
      {
          lock (_innerLock)
          {
              if (Count > 0)
              {
                  return Task.FromResult(Dequeue());
              }

              // Run continuations asynchronously so awaiting code does not execute inline
              // on the thread that completes the source while it still holds the lock.
              source = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
              return source.Task;
          }
      }

      /// <summary>
      /// <see cref="ValueTask{TResult}"/> variant of the read operation: completes synchronously
      /// when an item is buffered, otherwise wraps the task of a newly registered completion source.
      /// </summary>
      /// <returns>A value task producing the next item.</returns>
      public ValueTask<T> ReadValueTaskAsync()
      {
          lock (_innerLock)
          {
              if (Count > 0)
              {
                  return new ValueTask<T>(Dequeue());
              }

              source = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
              return new ValueTask<T>(source.Task);
          }
      }
  }
  /// <summary>
  /// KCP endpoint built on <see cref="KcpCore{Segment}"/> that exposes asynchronous
  /// receive and output operations.
  /// </summary>
  /// <typeparam name="Segment">Segment type used by the KCP core.</typeparam>
  public class KcpIO<Segment> : KcpCore<Segment>, IKcpIO
      where Segment : IKcpSegment
  {
      // Buffers handed over by the core through the Output callback, waiting for OutputAsync.
      readonly OutputQ outq;

      // Packages collected by TryRecv (one rented array of segments each), waiting for RecvAsync.
      readonly QueuePipe<ArraySegment<Segment>> recvSignal = new QueuePipe<ArraySegment<Segment>>();

      /// <summary>
      /// Creates the endpoint and registers its output queue as the core's callback handle.
      /// </summary>
      /// <param name="conv_">Conversation id forwarded to the base constructor.</param>
      public KcpIO(uint conv_) : base(conv_)
      {
          outq = new OutputQ();
          callbackHandle = outq;
      }

      /// <summary>
      /// Waits for the next received package and writes the payload of each of its segments to
      /// <paramref name="writer"/>.
      /// </summary>
      /// <param name="writer">Destination for the received bytes.</param>
      /// <param name="options">Not used by this implementation.</param>
      public async ValueTask RecvAsync(IBufferWriter<byte> writer, object options = null)
      {
          var package = await recvSignal.ReadAsync().ConfigureAwait(false);
          try
          {
              // The upper bound is Offset + Count; Count alone is only correct for a zero offset.
              int end = package.Offset + package.Count;
              for (int i = package.Offset; i < end; i++)
              {
                  WriteRecv(writer, package.Array[i]);
              }
          }
          finally
          {
              // Return the rented array even when the writer throws.
              ArrayPool<Segment>.Shared.Return(package.Array, true);
          }
      }

      /// <summary>
      /// Waits for the next received package and copies the payload of each of its segments into
      /// <paramref name="buffer"/>.
      /// </summary>
      /// <param name="buffer">Destination segment; only the range described by its offset and count is written.</param>
      /// <param name="options">Not used by this implementation.</param>
      /// <returns>The number of bytes copied into <paramref name="buffer"/>.</returns>
      /// <exception cref="ArgumentException">The package does not fit into <paramref name="buffer"/>.</exception>
      public async ValueTask<int> RecvAsync(ArraySegment<byte> buffer, object options = null)
      {
          var package = await recvSignal.ReadAsync().ConfigureAwait(false);
          try
          {
              // Respect the caller's segment: never write past Offset + Count of the destination.
              int limit = buffer.Offset + buffer.Count;
              int position = buffer.Offset;
              int end = package.Offset + package.Count;
              for (int i = package.Offset; i < end; i++)
              {
                  var target = new Memory<byte>(buffer.Array, position, limit - position);
                  var seg = package.Array[i];
                  seg.data.CopyTo(target.Span);
                  position += seg.data.Length;
                  SegmentManager.Free(seg);
              }

              return position - buffer.Offset;
          }
          finally
          {
              // Return the rented array even when the copy throws.
              ArrayPool<Segment>.Shared.Return(package.Array, true);
          }
      }

      /// <summary>
      /// Waits for the next buffer produced through the output callback and writes it to
      /// <paramref name="writer"/>.
      /// </summary>
      /// <param name="writer">Destination for the outgoing bytes.</param>
      /// <param name="options">Not used by this implementation.</param>
      public async ValueTask OutputAsync(IBufferWriter<byte> writer, object options = null)
      {
          var (Owner, Count) = await outq.ReadAsync().ConfigureAwait(false);
          WriteOut(writer, Owner, Count);
      }

      /// <summary>
      /// Passes <paramref name="newseg"/> to the base implementation, then moves every package
      /// that <see cref="TryRecv"/> can assemble from the receive queue into the receive pipe.
      /// </summary>
      /// <param name="newseg">The segment to process.</param>
      internal override void Parse_data(Segment newseg)
      {
          base.Parse_data(newseg);

          lock (rcv_queueLock)
          {
              // Remember whether the receive queue had reached the window size before draining it.
              var recover = rcv_queue.Count >= rcv_wnd;

              while (TryRecv(out var arraySegment) > 0)
              {
                  // Write either completes a pending read or buffers the package, so nothing is
                  // discarded when no reader is currently waiting.
                  recvSignal.Write(arraySegment);
              }

              #region fast recover
              // fast recover
              if (recover && rcv_queue.Count < rcv_wnd)
              {
                  // ready to send back IKCP_CMD_WINS in ikcp_flush
                  // tell remote my window size
                  probe |= IKCP_ASK_TELL;
              }
              #endregion
          }
      }

      /// <summary>
      /// Tries to take the leading run of segments, up to and including the first one whose
      /// <c>frg</c> is 0, out of the receive queue.
      /// </summary>
      /// <param name="package">
      /// On success, the taken segments in an array rented from <see cref="ArrayPool{T}.Shared"/>;
      /// the consumer is responsible for returning it. Otherwise the default value.
      /// </param>
      /// <returns>
      /// The summed <c>len</c> of the taken segments when it is positive; -1 when the queue is empty
      /// or holds fewer segments than the first segment's <c>frg</c> requires; -2 when the summed
      /// length is not positive, in which case the taken segments are freed and no package is returned.
      /// </returns>
      internal int TryRecv(out ArraySegment<Segment> package)
      {
          package = default;
          lock (rcv_queueLock)
          {
              if (rcv_queue.Count == 0)
              {
                  // No package available.
                  return -1;
              }

              var seq = rcv_queue[0];
              if (rcv_queue.Count < seq.frg + 1)
              {
                  // Not enough segments yet.
                  return -1;
              }

              uint length = 0;
              Segment[] kcpSegments = ArrayPool<Segment>.Shared.Rent(seq.frg + 1);
              var index = 0;
              foreach (var item in rcv_queue)
              {
                  kcpSegments[index] = item;
                  index++;
                  length += item.len;
                  if (item.frg == 0)
                  {
                      break;
                  }
              }

              if (index > 0)
              {
                  rcv_queue.RemoveRange(0, index);
              }

              var peekSize = (int)length;
              if (peekSize <= 0)
              {
                  // The segments have already left the queue and are not handed to the caller,
                  // so release them and the rented array here instead of leaking them.
                  for (int i = 0; i < index; i++)
                  {
                      SegmentManager.Free(kcpSegments[i]);
                  }

                  ArrayPool<Segment>.Shared.Return(kcpSegments, true);
                  return -2;
              }

              package = new ArraySegment<Segment>(kcpSegments, 0, index);
              return peekSize;
          }
      }

      /// <summary>
      /// Copies the payload of <paramref name="seg"/> to <paramref name="writer"/>, frees the segment
      /// and advances the writer by the segment's <c>len</c>.
      /// </summary>
      private void WriteRecv(IBufferWriter<byte> writer, Segment seg)
      {
          var curCount = (int)seg.len;
          var target = writer.GetSpan(curCount);
          seg.data.CopyTo(target);
          SegmentManager.Free(seg);
          writer.Advance(curCount);
      }

      /// <summary>
      /// Copies the first <paramref name="Count"/> bytes of <paramref name="Owner"/> to
      /// <paramref name="writer"/> and disposes <paramref name="Owner"/>.
      /// </summary>
      private static void WriteOut(IBufferWriter<byte> writer, BufferOwner Owner, int Count)
      {
          try
          {
              var target = writer.GetSpan(Count);
              Owner.Memory.Span.Slice(0, Count).CopyTo(target);
              writer.Advance(Count);
          }
          finally
          {
              // Dispose the buffer even when the writer throws.
              Owner.Dispose();
          }
      }

      /// <summary>
      /// Rents a buffer of at least <paramref name="needSize"/> bytes from the shared memory pool.
      /// </summary>
      protected internal override BufferOwner CreateBuffer(int needSize)
      {
          return MemoryPool<byte>.Shared.Rent(needSize);
      }

      /// <summary>
      /// Output callback that queues each buffer together with its valid length for
      /// <see cref="OutputAsync"/>.
      /// </summary>
      internal class OutputQ : QueuePipe<(BufferOwner Owner, int Count)>,
          IKcpCallback
      {
          /// <summary>
          /// Queues <paramref name="buffer"/> and its valid length, or completes a pending read with them.
          /// </summary>
          public void Output(BufferOwner buffer, int avalidLength)
          {
              Write((buffer, avalidLength));
          }
      }
  }
  220. }