KcpCore.cs 69 KB


  1. using System.Buffers;
  2. using System.Buffers.Binary;
  3. using System.Collections.Concurrent;
  4. using System.Collections.Generic;
  5. using System.Runtime.CompilerServices;
  6. using Fort23.UTool;
  7. using static System.Math;
  8. using BufferOwner = System.Buffers.IMemoryOwner<byte>;
  9. namespace System.Net.Sockets.Kcp
  10. {
  11. public abstract class KcpConst
  12. {
  13. /// <summary>
  14. /// <para>https://github.com/skywind3000/kcp/issues/53</para>
  15. /// 按照 C版 设计,使用小端字节序
  16. /// </summary>
  17. public static bool IsLittleEndian = true;
  18. // 为了减少阅读难度,变量名尽量于 C版 统一
  19. /*
  20. conv 会话ID
  21. mtu 最大传输单元
  22. mss 最大分片大小
  23. state 连接状态(0xFFFFFFFF表示断开连接)
  24. snd_una 第一个未确认的包
  25. snd_nxt 待发送包的序号
  26. rcv_nxt 待接收消息序号
  27. ssthresh 拥塞窗口阈值
  28. rx_rttvar ack接收rtt浮动值
  29. rx_srtt ack接收rtt静态值
  30. rx_rto 由ack接收延迟计算出来的复原时间
  31. rx_minrto 最小复原时间
  32. snd_wnd 发送窗口大小
  33. rcv_wnd 接收窗口大小
  34. rmt_wnd, 远端接收窗口大小
  35. cwnd, 拥塞窗口大小
  36. probe 探查变量,IKCP_ASK_TELL表示告知远端窗口大小。IKCP_ASK_SEND表示请求远端告知窗口大小
  37. interval 内部flush刷新间隔
  38. ts_flush 下次flush刷新时间戳
  39. nodelay 是否启动无延迟模式
  40. updated 是否调用过update函数的标识
  41. ts_probe, 下次探查窗口的时间戳
  42. probe_wait 探查窗口需要等待的时间
  43. dead_link 最大重传次数
  44. incr 可发送的最大数据量
  45. fastresend 触发快速重传的重复ack个数
  46. nocwnd 取消拥塞控制
  47. stream 是否采用流传输模式
  48. snd_queue 发送消息的队列
  49. rcv_queue 接收消息的队列
  50. snd_buf 发送消息的缓存
  51. rcv_buf 接收消息的缓存
  52. acklist 待发送的ack列表
  53. buffer 存储消息字节流的内存
  54. output udp发送消息的回调函数
  55. */
  56. #region Const
  57. public const int IKCP_RTO_NDL = 30; // no delay min rto
  58. public const int IKCP_RTO_MIN = 100; // normal min rto
  59. public const int IKCP_RTO_DEF = 200;
  60. public const int IKCP_RTO_MAX = 60000;
  61. /// <summary>
  62. /// 数据报文
  63. /// </summary>
  64. public const int IKCP_CMD_PUSH = 81; // cmd: push data
  65. /// <summary>
  66. /// 确认报文
  67. /// </summary>
  68. public const int IKCP_CMD_ACK = 82; // cmd: ack
  69. /// <summary>
  70. /// 窗口探测报文,询问对端剩余接收窗口的大小.
  71. /// </summary>
  72. public const int IKCP_CMD_WASK = 83; // cmd: window probe (ask)
  73. /// <summary>
  74. /// 窗口通知报文,通知对端剩余接收窗口的大小.
  75. /// </summary>
  76. public const int IKCP_CMD_WINS = 84; // cmd: window size (tell)
  77. /// <summary>
  78. /// IKCP_ASK_SEND表示请求远端告知窗口大小
  79. /// </summary>
  80. public const int IKCP_ASK_SEND = 84; // need to send IKCP_CMD_WASK
  81. /// <summary>
  82. /// IKCP_ASK_TELL表示告知远端窗口大小。
  83. /// </summary>
  84. public const int IKCP_ASK_TELL = 84; // need to send IKCP_CMD_WINS
  85. public const int IKCP_WND_SND = 256;
  86. /// <summary>
  87. /// 接收窗口默认值。必须大于最大分片数
  88. /// </summary>
  89. public const int IKCP_WND_RCV = 512; // must >= max fragment size
  90. /// <summary>
  91. /// 默认最大传输单元 常见路由值 1492 1480 默认1400保证在路由层不会被分片
  92. /// </summary>
  93. public const int IKCP_MTU_DEF = 1400;
  94. public const int IKCP_ACK_FAST = 3;
  95. public const int IKCP_INTERVAL = 5;
  96. public const int IKCP_OVERHEAD = 24;
  97. public const int IKCP_DEADLINK = 20;
  98. public const int IKCP_THRESH_INIT = 2;
  99. public const int IKCP_THRESH_MIN = 2;
  100. /// <summary>
  101. /// 窗口探查CD
  102. /// </summary>
  103. public const int IKCP_PROBE_INIT = 7000; // 7 secs to probe window size
  104. public const int IKCP_PROBE_LIMIT = 120000; // up to 120 secs to probe window
  105. public const int IKCP_FASTACK_LIMIT = 5; // max times to trigger fastack
  106. #endregion
  107. }
  108. /// <summary>
  109. /// https://luyuhuang.tech/2020/12/09/kcp.html
  110. /// https://github.com/skywind3000/kcp/wiki/Network-Layer
  111. /// <para>外部buffer ----拆分拷贝----等待列表 -----移动----发送列表----拷贝----发送buffer---output</para>
  112. /// https://github.com/skywind3000/kcp/issues/118#issuecomment-338133930
  113. /// </summary>
  114. public partial class KcpCore<Segment> : KcpConst, IKcpSetting, IKcpUpdate, IDisposable
  115. where Segment : IKcpSegment
  116. {
  117. internal protected IKcpCallback callbackHandle;
  118. internal protected IKcpOutputWriter OutputWriter;
  119. public KcpCore(uint conv_)
  120. {
  121. conv = conv_;
  122. snd_wnd = IKCP_WND_SND;
  123. rcv_wnd = IKCP_WND_RCV;
  124. rmt_wnd = IKCP_WND_RCV;
  125. mtu = IKCP_MTU_DEF;
  126. mss = mtu - IKCP_OVERHEAD;
  127. buffer = CreateBuffer(BufferNeedSize);
  128. rx_rto = IKCP_RTO_DEF;
  129. rx_minrto = IKCP_RTO_MIN;
  130. interval = IKCP_INTERVAL;
  131. ts_flush = IKCP_INTERVAL;
  132. ssthresh = IKCP_THRESH_INIT;
  133. fastlimit = IKCP_FASTACK_LIMIT;
  134. dead_link = IKCP_DEADLINK;
  135. }
  136. public ISegmentManager<Segment> SegmentManager { get; set; }
  137. protected static uint Ibound(uint lower, uint middle, uint upper)
  138. {
  139. return Min(Max(lower, middle), upper);
  140. }
  141. protected static int Itimediff(uint later, uint earlier)
  142. {
  143. return ((int)(later - earlier));
  144. }
  145. internal protected virtual BufferOwner CreateBuffer(int needSize)
  146. {
  147. return new KcpInnerBuffer(needSize);
  148. }
  149. internal protected class KcpInnerBuffer : BufferOwner
  150. {
  151. private readonly Memory<byte> _memory;
  152. bool alreadyDisposed = false;
  153. public KcpInnerBuffer(int size)
  154. {
  155. _memory = new Memory<byte>(new byte[size]);
  156. }
  157. public Memory<byte> Memory
  158. {
  159. get
  160. {
  161. if (alreadyDisposed)
  162. {
  163. throw new ObjectDisposedException(nameof(KcpInnerBuffer));
  164. }
  165. return _memory;
  166. }
  167. }
  168. public void Dispose()
  169. {
  170. alreadyDisposed = true;
  171. }
  172. }
  173. #region kcp members
  174. /// <summary>
  175. /// 频道号
  176. /// </summary>
  177. public uint conv { get; protected set; }
  178. /// <summary>
  179. /// 最大传输单元(Maximum Transmission Unit,MTU)
  180. /// </summary>
  181. protected uint mtu;
  182. /// <summary>
  183. /// 缓冲区最小大小
  184. /// </summary>
  185. protected int BufferNeedSize
  186. {
  187. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  188. get { return (int)((mtu /* + IKCP_OVERHEAD*/) /** 3*/); }
  189. }
  190. /// <summary>
  191. /// 最大报文段长度
  192. /// </summary>
  193. protected uint mss;
  194. /// <summary>
  195. /// 连接状态(0xFFFFFFFF表示断开连接)
  196. /// </summary>
  197. protected int state;
  198. /// <summary>
  199. /// 第一个未确认的包
  200. /// </summary>
  201. protected uint snd_una;
  202. /// <summary>
  203. /// 待发送包的序号
  204. /// </summary>
  205. protected uint snd_nxt;
  206. /// <summary>
  207. /// 下一个等待接收消息ID,待接收消息序号
  208. /// </summary>
  209. protected uint rcv_nxt;
  210. protected uint ts_recent;
  211. protected uint ts_lastack;
  212. /// <summary>
  213. /// 拥塞窗口阈值
  214. /// </summary>
  215. protected uint ssthresh;
  216. /// <summary>
  217. /// ack接收rtt浮动值
  218. /// </summary>
  219. protected uint rx_rttval;
  220. /// <summary>
  221. /// ack接收rtt静态值
  222. /// </summary>
  223. protected uint rx_srtt;
  224. /// <summary>
  225. /// 由ack接收延迟计算出来的复原时间。Retransmission TimeOut(RTO), 超时重传时间.
  226. /// </summary>
  227. protected uint rx_rto;
  228. /// <summary>
  229. /// 最小复原时间
  230. /// </summary>
  231. protected uint rx_minrto;
  232. /// <summary>
  233. /// 发送窗口大小
  234. /// </summary>
  235. protected uint snd_wnd;
  236. /// <summary>
  237. /// 接收窗口大小
  238. /// </summary>
  239. protected uint rcv_wnd;
  240. /// <summary>
  241. /// 远端接收窗口大小
  242. /// </summary>
  243. protected uint rmt_wnd;
  244. /// <summary>
  245. /// 拥塞窗口大小
  246. /// </summary>
  247. protected uint cwnd;
  248. /// <summary>
  249. /// 探查变量,IKCP_ASK_TELL表示告知远端窗口大小。IKCP_ASK_SEND表示请求远端告知窗口大小
  250. /// </summary>
  251. protected uint probe;
  252. protected uint current;
  253. /// <summary>
  254. /// 内部flush刷新间隔
  255. /// </summary>
  256. protected uint interval;
  257. /// <summary>
  258. /// 下次flush刷新时间戳
  259. /// </summary>
  260. protected uint ts_flush;
  261. protected uint xmit;
  262. /// <summary>
  263. /// 是否启动无延迟模式
  264. /// </summary>
  265. protected uint nodelay;
  266. /// <summary>
  267. /// 是否调用过update函数的标识
  268. /// </summary>
  269. protected uint updated;
  270. /// <summary>
  271. /// 下次探查窗口的时间戳
  272. /// </summary>
  273. protected uint ts_probe;
  274. /// <summary>
  275. /// 探查窗口需要等待的时间
  276. /// </summary>
  277. protected uint probe_wait;
  278. /// <summary>
  279. /// 最大重传次数
  280. /// </summary>
  281. protected uint dead_link;
  282. /// <summary>
  283. /// 可发送的最大数据量
  284. /// </summary>
  285. protected uint incr;
  286. /// <summary>
  287. /// 触发快速重传的重复ack个数
  288. /// </summary>
  289. public int fastresend;
  290. public int fastlimit;
  291. /// <summary>
  292. /// 取消拥塞控制
  293. /// </summary>
  294. protected int nocwnd;
  295. protected int logmask;
  296. /// <summary>
  297. /// 是否采用流传输模式
  298. /// </summary>
  299. public int stream;
  300. protected BufferOwner buffer;
  301. #endregion
  302. #region 锁和容器
  303. /// <summary>
  304. /// 增加锁保证发送线程安全,否则可能导致2个消息的分片交替入队。
  305. /// <para/> 用例:普通发送和广播可能会导致多个线程同时调用Send方法。
  306. /// </summary>
  307. protected readonly object snd_queueLock = new object();
  308. protected readonly object snd_bufLock = new object();
  309. protected readonly object rcv_bufLock = new object();
  310. protected readonly object rcv_queueLock = new object();
  311. /// <summary>
  312. /// 发送 ack 队列
  313. /// </summary>
  314. protected ConcurrentQueue<(uint sn, uint ts)> acklist = new ConcurrentQueue<(uint sn, uint ts)>();
  315. /// <summary>
  316. /// 发送等待队列
  317. /// </summary>
  318. internal ConcurrentQueue<Segment> snd_queue = new ConcurrentQueue<Segment>();
  319. /// <summary>
  320. /// 正在发送列表
  321. /// </summary>
  322. internal LinkedList<Segment> snd_buf = new LinkedList<Segment>();
  323. /// <summary>
  324. /// 正在等待触发接收回调函数消息列表
  325. /// <para>需要执行的操作 添加 遍历 删除</para>
  326. /// </summary>
  327. internal List<Segment> rcv_queue = new List<Segment>();
  328. /// <summary>
  329. /// 正在等待重组消息列表
  330. /// <para>需要执行的操作 添加 插入 遍历 删除</para>
  331. /// </summary>
  332. internal LinkedList<Segment> rcv_buf = new LinkedList<Segment>();
  333. /// <summary>
  334. /// get how many packet is waiting to be sent
  335. /// </summary>
  336. /// <returns></returns>
  337. public int WaitSnd => snd_buf.Count + snd_queue.Count;
  338. #endregion
  339. #region IDisposable Support
  340. private bool disposedValue = false; // 要检测冗余调用
  341. /// <summary>
  342. /// 是否正在释放
  343. /// </summary>
  344. private bool m_disposing = false;
  345. protected bool CheckDispose()
  346. {
  347. if (m_disposing)
  348. {
  349. return true;
  350. }
  351. if (disposedValue)
  352. {
  353. throw new ObjectDisposedException(
  354. $"{nameof(Kcp)} [conv:{conv}]");
  355. }
  356. return false;
  357. }
  358. protected virtual void Dispose(bool disposing)
  359. {
  360. try
  361. {
  362. m_disposing = true;
  363. if (!disposedValue)
  364. {
  365. if (disposing)
  366. {
  367. // 释放托管状态(托管对象)。
  368. callbackHandle = null;
  369. acklist = null;
  370. buffer = null;
  371. }
  372. // 释放未托管的资源(未托管的对象)并在以下内容中替代终结器。
  373. // 将大型字段设置为 null。
  374. void FreeCollection(IEnumerable<Segment> collection)
  375. {
  376. if (collection == null)
  377. {
  378. return;
  379. }
  380. foreach (var item in collection)
  381. {
  382. try
  383. {
  384. SegmentManager.Free(item);
  385. }
  386. catch (Exception)
  387. {
  388. //理论上此处不会有任何异常
  389. LogFail($"此处绝不应该出现异常。 Dispose 时出现预计外异常,联系作者");
  390. }
  391. }
  392. }
  393. lock (snd_queueLock)
  394. {
  395. while (snd_queue != null &&
  396. (snd_queue.TryDequeue(out var segment)
  397. || !snd_queue.IsEmpty)
  398. )
  399. {
  400. try
  401. {
  402. SegmentManager.Free(segment);
  403. }
  404. catch (Exception)
  405. {
  406. //理论上这里没有任何异常;
  407. }
  408. }
  409. snd_queue = null;
  410. }
  411. lock (snd_bufLock)
  412. {
  413. FreeCollection(snd_buf);
  414. snd_buf?.Clear();
  415. snd_buf = null;
  416. }
  417. lock (rcv_bufLock)
  418. {
  419. FreeCollection(rcv_buf);
  420. rcv_buf?.Clear();
  421. rcv_buf = null;
  422. }
  423. lock (rcv_queueLock)
  424. {
  425. FreeCollection(rcv_queue);
  426. rcv_queue?.Clear();
  427. rcv_queue = null;
  428. }
  429. disposedValue = true;
  430. }
  431. }
  432. finally
  433. {
  434. m_disposing = false;
  435. }
  436. }
  437. // 仅当以上 Dispose(bool disposing) 拥有用于释放未托管资源的代码时才替代终结器。
  438. ~KcpCore()
  439. {
  440. // 请勿更改此代码。将清理代码放入以上 Dispose(bool disposing) 中。
  441. Dispose(false);
  442. }
  443. // 添加此代码以正确实现可处置模式。
  444. /// <summary>
  445. /// 释放不是严格线程安全的,尽量使用和Update相同的线程调用,
  446. /// 或者等待析构时自动释放。
  447. /// </summary>
  448. public void Dispose()
  449. {
  450. // 请勿更改此代码。将清理代码放入以上 Dispose(bool disposing) 中。
  451. Dispose(true);
  452. // 如果在以上内容中替代了终结器,则取消注释以下行。
  453. GC.SuppressFinalize(this);
  454. }
  455. #endregion
  456. #region 功能逻辑
  457. //功能函数
  458. /// <summary>
  459. /// Determine when should you invoke ikcp_update:
  460. /// returns when you should invoke ikcp_update in millisec, if there
  461. /// is no ikcp_input/_send calling. you can call ikcp_update in that
  462. /// time, instead of call update repeatly.
  463. /// <para></para>
  464. /// Important to reduce unnacessary ikcp_update invoking. use it to
  465. /// schedule ikcp_update (eg. implementing an epoll-like mechanism,
  466. /// or optimize ikcp_update when handling massive kcp connections)
  467. /// <para></para>
  468. /// </summary>
  469. /// <param name="time"></param>
  470. /// <returns></returns>
  471. public DateTimeOffset Check(in DateTimeOffset time)
  472. {
  473. if (CheckDispose())
  474. {
  475. //检查释放
  476. return default;
  477. }
  478. if (updated == 0)
  479. {
  480. return time;
  481. }
  482. var current_ = time.ConvertTime();
  483. var ts_flush_ = ts_flush;
  484. var tm_flush_ = 0x7fffffff;
  485. var tm_packet = 0x7fffffff;
  486. var minimal = 0;
  487. if (Itimediff(current_, ts_flush_) >= 10000 || Itimediff(current_, ts_flush_) < -10000)
  488. {
  489. ts_flush_ = current_;
  490. }
  491. if (Itimediff(current_, ts_flush_) >= 0)
  492. {
  493. return time;
  494. }
  495. tm_flush_ = Itimediff(ts_flush_, current_);
  496. lock (snd_bufLock)
  497. {
  498. foreach (var seg in snd_buf)
  499. {
  500. var diff = Itimediff(seg.resendts, current_);
  501. if (diff <= 0)
  502. {
  503. return time;
  504. }
  505. if (diff < tm_packet)
  506. {
  507. tm_packet = diff;
  508. }
  509. }
  510. }
  511. minimal = tm_packet < tm_flush_ ? tm_packet : tm_flush_;
  512. if (minimal >= interval) minimal = (int)interval;
  513. return time + TimeSpan.FromMilliseconds(minimal);
  514. }
  515. /// <summary>
  516. /// move available data from rcv_buf -> rcv_queue
  517. /// </summary>
  518. protected void Move_Rcv_buf_2_Rcv_queue()
  519. {
  520. lock (rcv_bufLock)
  521. {
  522. while (rcv_buf.Count > 0)
  523. {
  524. var seg = rcv_buf.First.Value;
  525. if (seg.sn == rcv_nxt && rcv_queue.Count < rcv_wnd)
  526. {
  527. rcv_buf.RemoveFirst();
  528. lock (rcv_queueLock)
  529. {
  530. rcv_queue.Add(seg);
  531. }
  532. rcv_nxt++;
  533. }
  534. else
  535. {
  536. break;
  537. }
  538. }
  539. }
  540. }
  541. /// <summary>
  542. /// update ack.
  543. /// </summary>
  544. /// <param name="rtt"></param>
  545. protected void Update_ack(int rtt)
  546. {
  547. if (rx_srtt == 0)
  548. {
  549. rx_srtt = (uint)rtt;
  550. rx_rttval = (uint)rtt / 2;
  551. }
  552. else
  553. {
  554. int delta = (int)((uint)rtt - rx_srtt);
  555. if (delta < 0)
  556. {
  557. delta = -delta;
  558. }
  559. rx_rttval = (3 * rx_rttval + (uint)delta) / 4;
  560. rx_srtt = (uint)((7 * rx_srtt + rtt) / 8);
  561. if (rx_srtt < 1)
  562. {
  563. rx_srtt = 1;
  564. }
  565. }
  566. var rto = rx_srtt + Max(interval, 4 * rx_rttval);
  567. rx_rto = Ibound(rx_minrto, rto, IKCP_RTO_MAX);
  568. }
  569. protected void Shrink_buf()
  570. {
  571. lock (snd_bufLock)
  572. {
  573. snd_una = snd_buf.Count > 0 ? snd_buf.First.Value.sn : snd_nxt;
  574. }
  575. }
  576. protected void Parse_ack(uint sn)
  577. {
  578. if (Itimediff(sn, snd_una) < 0 || Itimediff(sn, snd_nxt) >= 0)
  579. {
  580. return;
  581. }
  582. lock (snd_bufLock)
  583. {
  584. for (var p = snd_buf.First; p != null; p = p.Next)
  585. {
  586. var seg = p.Value;
  587. if (sn == seg.sn)
  588. {
  589. snd_buf.Remove(p);
  590. SegmentManager.Free(seg);
  591. break;
  592. }
  593. if (Itimediff(sn, seg.sn) < 0)
  594. {
  595. break;
  596. }
  597. }
  598. }
  599. }
  600. protected void Parse_una(uint una)
  601. {
  602. /// 删除给定时间之前的片段。保留之后的片段
  603. lock (snd_bufLock)
  604. {
  605. while (snd_buf.First != null)
  606. {
  607. var seg = snd_buf.First.Value;
  608. if (Itimediff(una, seg.sn) > 0)
  609. {
  610. snd_buf.RemoveFirst();
  611. SegmentManager.Free(seg);
  612. }
  613. else
  614. {
  615. break;
  616. }
  617. }
  618. }
  619. }
  620. protected void Parse_fastack(uint sn, uint ts)
  621. {
  622. if (Itimediff(sn, snd_una) < 0 || Itimediff(sn, snd_nxt) >= 0)
  623. {
  624. return;
  625. }
  626. lock (snd_bufLock)
  627. {
  628. foreach (var item in snd_buf)
  629. {
  630. var seg = item;
  631. if (Itimediff(sn, seg.sn) < 0)
  632. {
  633. break;
  634. }
  635. else if (sn != seg.sn)
  636. {
  637. #if !IKCP_FASTACK_CONSERVE
  638. seg.fastack++;
  639. #else
  640. if (Itimediff(ts, seg.ts) >= 0)
  641. {
  642. seg.fastack++;
  643. }
  644. #endif
  645. }
  646. }
  647. }
  648. }
  649. /// <summary>
  650. /// 处理下层网络收到的数据包
  651. /// </summary>
  652. /// <param name="newseg"></param>
  653. internal virtual void Parse_data(Segment newseg)
  654. {
  655. var sn = newseg.sn;
  656. lock (rcv_bufLock)
  657. {
  658. if (Itimediff(sn, rcv_nxt + rcv_wnd) >= 0 || Itimediff(sn, rcv_nxt) < 0)
  659. {
  660. // 如果接收到数据报文的编号大于 rcv_nxt + rcv_wnd 或小于 rcv_nxt, 这个报文就会被丢弃.
  661. SegmentManager.Free(newseg);
  662. return;
  663. }
  664. var repeat = false;
  665. ///检查是否重复消息和插入位置
  666. LinkedListNode<Segment> p;
  667. for (p = rcv_buf.Last; p != null; p = p.Previous)
  668. {
  669. var seg = p.Value;
  670. if (seg.sn == sn)
  671. {
  672. repeat = true;
  673. break;
  674. }
  675. if (Itimediff(sn, seg.sn) > 0)
  676. {
  677. break;
  678. }
  679. }
  680. if (!repeat)
  681. {
  682. if (CanLog(KcpLogMask.IKCP_LOG_PARSE_DATA))
  683. {
  684. LogWriteLine($"{newseg.ToLogString()}", KcpLogMask.IKCP_LOG_PARSE_DATA.ToString());
  685. }
  686. if (p == null)
  687. {
  688. rcv_buf.AddFirst(newseg);
  689. if (newseg.frg + 1 > rcv_wnd)
  690. {
  691. //分片数大于接收窗口,造成kcp阻塞冻结。
  692. //Console.WriteLine($"分片数大于接收窗口,造成kcp阻塞冻结。frgCount:{newseg.frg + 1} rcv_wnd:{rcv_wnd}");
  693. //百分之百阻塞冻结,打印日志没有必要。直接抛出异常。
  694. throw new NotSupportedException(
  695. $"分片数大于接收窗口,造成kcp阻塞冻结。frgCount:{newseg.frg + 1} rcv_wnd:{rcv_wnd}");
  696. }
  697. }
  698. else
  699. {
  700. rcv_buf.AddAfter(p, newseg);
  701. }
  702. }
  703. else
  704. {
  705. SegmentManager.Free(newseg);
  706. }
  707. }
  708. Move_Rcv_buf_2_Rcv_queue();
  709. }
  710. protected ushort Wnd_unused()
  711. {
  712. ///此处没有加锁,所以不要内联变量,否则可能导致 判断变量和赋值变量不一致
  713. int waitCount = rcv_queue.Count;
  714. if (waitCount < rcv_wnd)
  715. {
  716. //Q:为什么要减去nrcv_que,rcv_queue中已经排好序了,还要算在接收窗口内,感觉不能理解?
  717. //现在问题是如果一个超大数据包,分片数大于rcv_wnd接收窗口,会导致rcv_wnd持续为0,阻塞整个流程。
  718. //个人理解,rcv_queue中的数据是已经确认的数据,无论用户是否recv,都不应该影响收发。
  719. //A:现在在发送出加一个分片数检测,过大直接抛出异常。防止阻塞发送。
  720. //在接收端也加一个检测,如果(frg+1)分片数 > rcv_wnd,也要抛出一个异常或者警告。至少有个提示。
  721. /// fix https://github.com/skywind3000/kcp/issues/126
  722. /// 实际上 rcv_wnd 不应该大于65535
  723. var count = rcv_wnd - waitCount;
  724. return (ushort)Min(count, ushort.MaxValue);
  725. }
  726. return 0;
  727. }
  728. /// <summary>
  729. /// flush pending data
  730. /// </summary>
  731. protected void Flush()
  732. {
  733. var current_ = current;
  734. var buffer_ = buffer;
  735. var change = 0;
  736. var lost = 0;
  737. var offset = 0;
  738. if (updated == 0)
  739. {
  740. return;
  741. }
  742. ushort wnd_ = Wnd_unused();
  743. unsafe
  744. {
  745. ///在栈上分配这个segment,这个segment随用随销毁,不会被保存
  746. const int len = KcpSegment.LocalOffset + KcpSegment.HeadOffset;
  747. var ptr = stackalloc byte[len];
  748. KcpSegment seg = new KcpSegment(ptr, 0);
  749. //seg = KcpSegment.AllocHGlobal(0);
  750. seg.conv = conv;
  751. seg.cmd = IKCP_CMD_ACK;
  752. //seg.frg = 0;
  753. seg.wnd = wnd_;
  754. seg.una = rcv_nxt;
  755. //seg.len = 0;
  756. //seg.sn = 0;
  757. //seg.ts = 0;
  758. #region flush acknowledges
  759. if (CheckDispose())
  760. {
  761. //检查释放
  762. return;
  763. }
  764. while (acklist.TryDequeue(out var temp))
  765. {
  766. if (offset + IKCP_OVERHEAD > mtu)
  767. {
  768. callbackHandle.Output(buffer, offset);
  769. offset = 0;
  770. buffer = CreateBuffer(BufferNeedSize);
  771. //IKcpOutputer outputer = null;
  772. //var span = outputer.GetSpan(offset);
  773. //buffer.Memory.Span.Slice(0, offset).CopyTo(span);
  774. //outputer.Advance(offset);
  775. //outputer.Flush();
  776. }
  777. seg.sn = temp.sn;
  778. seg.ts = temp.ts;
  779. offset += seg.Encode(buffer.Memory.Span.Slice(offset));
  780. }
  781. #endregion
  782. #region probe window size (if remote window size equals zero)
  783. // probe window size (if remote window size equals zero)
  784. if (rmt_wnd == 0)
  785. {
  786. if (probe_wait == 0)
  787. {
  788. probe_wait = IKCP_PROBE_INIT;
  789. ts_probe = current + probe_wait;
  790. }
  791. else
  792. {
  793. if (Itimediff(current, ts_probe) >= 0)
  794. {
  795. if (probe_wait < IKCP_PROBE_INIT)
  796. {
  797. probe_wait = IKCP_PROBE_INIT;
  798. }
  799. probe_wait += probe_wait / 2;
  800. if (probe_wait > IKCP_PROBE_LIMIT)
  801. {
  802. probe_wait = IKCP_PROBE_LIMIT;
  803. }
  804. ts_probe = current + probe_wait;
  805. probe |= IKCP_ASK_SEND;
  806. }
  807. }
  808. }
  809. else
  810. {
  811. ts_probe = 0;
  812. probe_wait = 0;
  813. }
  814. #endregion
  815. #region flush window probing commands
  816. // flush window probing commands
  817. if ((probe & IKCP_ASK_SEND) != 0)
  818. {
  819. seg.cmd = IKCP_CMD_WASK;
  820. if (offset + IKCP_OVERHEAD > (int)mtu)
  821. {
  822. callbackHandle.Output(buffer, offset);
  823. offset = 0;
  824. buffer = CreateBuffer(BufferNeedSize);
  825. }
  826. offset += seg.Encode(buffer.Memory.Span.Slice(offset));
  827. }
  828. if ((probe & IKCP_ASK_TELL) != 0)
  829. {
  830. seg.cmd = IKCP_CMD_WINS;
  831. if (offset + IKCP_OVERHEAD > (int)mtu)
  832. {
  833. callbackHandle.Output(buffer, offset);
  834. offset = 0;
  835. buffer = CreateBuffer(BufferNeedSize);
  836. }
  837. offset += seg.Encode(buffer.Memory.Span.Slice(offset));
  838. }
  839. probe = 0;
  840. #endregion
  841. }
  842. #region 刷新,将发送等待列表移动到发送列表
  843. // calculate window size
  844. var cwnd_ = Min(snd_wnd, rmt_wnd);
  845. if (nocwnd == 0)
  846. {
  847. cwnd_ = Min(cwnd, cwnd_);
  848. }
  849. while (Itimediff(snd_nxt, snd_una + cwnd_) < 0)
  850. {
  851. if (snd_queue.TryDequeue(out var newseg))
  852. {
  853. newseg.conv = conv;
  854. newseg.cmd = IKCP_CMD_PUSH;
  855. newseg.wnd = wnd_;
  856. newseg.ts = current_;
  857. newseg.sn = snd_nxt;
  858. snd_nxt++;
  859. newseg.una = rcv_nxt;
  860. newseg.resendts = current_;
  861. newseg.rto = rx_rto;
  862. newseg.fastack = 0;
  863. newseg.xmit = 0;
  864. lock (snd_bufLock)
  865. {
  866. snd_buf.AddLast(newseg);
  867. }
  868. }
  869. else
  870. {
  871. break;
  872. }
  873. }
  874. #endregion
  875. #region 刷新 发送列表,调用Output
  876. // calculate resent
  877. var resent = fastresend > 0 ? (uint)fastresend : 0xffffffff;
  878. var rtomin = nodelay == 0 ? (rx_rto >> 3) : 0;
  879. lock (snd_bufLock)
  880. {
  881. // flush data segments
  882. // if (snd_buf.Count > 0)
  883. // {
  884. // LogTool.Log($"准备发送包数量{cwnd_} {snd_buf.Count} {snd_queue.Count}");
  885. // }
  886. foreach (var item in snd_buf)
  887. {
  888. var segment = item;
  889. var needsend = false;
  890. var debug = Itimediff(current_, segment.resendts);
  891. if (segment.xmit == 0)
  892. {
  893. //新加入 snd_buf 中, 从未发送过的报文直接发送出去;
  894. needsend = true;
  895. segment.xmit++;
  896. segment.rto = rx_rto;
  897. // LogTool.Log($"新发送包{segment.sn} 大小{segment.len}" );
  898. segment.resendts = current_ + rx_rto + rtomin;
  899. }
  900. else if (Itimediff(current_, segment.resendts) >= 0)
  901. {
  902. //发送过的, 但是在 RTO 内未收到 ACK 的报文, 需要重传;
  903. needsend = true;
  904. segment.xmit++;
  905. this.xmit++;
  906. if (nodelay == 0)
  907. {
  908. segment.rto += Math.Max(segment.rto, rx_rto);
  909. }
  910. else
  911. {
  912. var step = nodelay < 2 ? segment.rto : rx_rto;
  913. segment.rto += step / 2;
  914. }
  915. // LogTool.Log($"重送包{segment.sn} 大小{segment.len} 次数{segment.xmit}");
  916. segment.resendts = current_ + segment.rto;
  917. lost = 1;
  918. }
  919. else if (segment.fastack >= resent)
  920. {
  921. //发送过的, 但是 ACK 失序若干次的报文, 需要执行快速重传.
  922. // LogTool.Log($"快速重传{segment.sn} 大小{segment.len} 次数{segment.xmit}");
  923. if (segment.xmit <= fastlimit
  924. || fastlimit <= 0)
  925. {
  926. needsend = true;
  927. segment.xmit++;
  928. segment.fastack = 0;
  929. segment.resendts = current_ + segment.rto;
  930. change++;
  931. }
  932. }
  933. if (needsend)
  934. {
  935. segment.ts = current_;
  936. segment.wnd = wnd_;
  937. segment.una = rcv_nxt;
  938. var need = IKCP_OVERHEAD + segment.len;
  939. if (offset + need > mtu)
  940. {
  941. callbackHandle.Output(buffer, offset);
  942. offset = 0;
  943. buffer = CreateBuffer(BufferNeedSize);
  944. }
  945. offset += segment.Encode(buffer.Memory.Span.Slice(offset));
  946. if (CanLog(KcpLogMask.IKCP_LOG_NEED_SEND))
  947. {
  948. LogWriteLine($"{segment.ToLogString(true)}", KcpLogMask.IKCP_LOG_NEED_SEND.ToString());
  949. }
  950. if (segment.xmit >= dead_link)
  951. {
  952. state = -1;
  953. if (CanLog(KcpLogMask.IKCP_LOG_DEAD_LINK))
  954. {
  955. LogWriteLine($"state = -1; xmit:{segment.xmit} >= dead_link:{dead_link}",
  956. KcpLogMask.IKCP_LOG_DEAD_LINK.ToString());
  957. }
  958. }
  959. }
  960. }
  961. }
  962. // flash remain segments
  963. if (offset > 0)
  964. {
  965. callbackHandle.Output(buffer, offset);
  966. offset = 0;
  967. buffer = CreateBuffer(BufferNeedSize);
  968. }
  969. #endregion
  970. #region update ssthresh
  971. // update ssthresh 根据丢包情况计算 ssthresh 和 cwnd.
  972. if (change != 0)
  973. {
  974. var inflight = snd_nxt - snd_una;
  975. ssthresh = inflight / 2;
  976. if (ssthresh < IKCP_THRESH_MIN)
  977. {
  978. ssthresh = IKCP_THRESH_MIN;
  979. }
  980. cwnd = ssthresh + resent;
  981. incr = cwnd * mss;
  982. }
  983. if (lost != 0)
  984. {
  985. ssthresh = cwnd / 2;
  986. if (ssthresh < IKCP_THRESH_MIN)
  987. {
  988. ssthresh = IKCP_THRESH_MIN;
  989. }
  990. cwnd = 1;
  991. incr = mss;
  992. }
  993. if (cwnd < 1)
  994. {
  995. cwnd = 1;
  996. incr = mss;
  997. }
  998. #endregion
  999. if (state == -1)
  1000. {
  1001. OnDeadlink();
  1002. }
  1003. }
  1004. protected virtual void OnDeadlink()
  1005. {
  1006. }
  1007. /// <summary>
  1008. /// Test OutputWriter
  1009. /// </summary>
  1010. protected void Flush2()
  1011. {
  1012. var current_ = current;
  1013. var change = 0;
  1014. var lost = 0;
  1015. if (updated == 0)
  1016. {
  1017. return;
  1018. }
  1019. ushort wnd_ = Wnd_unused();
  1020. unsafe
  1021. {
  1022. ///在栈上分配这个segment,这个segment随用随销毁,不会被保存
  1023. const int len = KcpSegment.LocalOffset + KcpSegment.HeadOffset;
  1024. var ptr = stackalloc byte[len];
  1025. KcpSegment seg = new KcpSegment(ptr, 0);
  1026. //seg = KcpSegment.AllocHGlobal(0);
  1027. seg.conv = conv;
  1028. seg.cmd = IKCP_CMD_ACK;
  1029. //seg.frg = 0;
  1030. seg.wnd = wnd_;
  1031. seg.una = rcv_nxt;
  1032. //seg.len = 0;
  1033. //seg.sn = 0;
  1034. //seg.ts = 0;
  1035. #region flush acknowledges
  1036. if (CheckDispose())
  1037. {
  1038. //检查释放
  1039. return;
  1040. }
  1041. while (acklist.TryDequeue(out var temp))
  1042. {
  1043. if (OutputWriter.UnflushedBytes + IKCP_OVERHEAD > mtu)
  1044. {
  1045. OutputWriter.Flush();
  1046. }
  1047. seg.sn = temp.sn;
  1048. seg.ts = temp.ts;
  1049. seg.Encode(OutputWriter);
  1050. }
  1051. #endregion
  1052. #region probe window size (if remote window size equals zero)
  1053. // probe window size (if remote window size equals zero)
  1054. if (rmt_wnd == 0)
  1055. {
  1056. if (probe_wait == 0)
  1057. {
  1058. probe_wait = IKCP_PROBE_INIT;
  1059. ts_probe = current + probe_wait;
  1060. }
  1061. else
  1062. {
  1063. if (Itimediff(current, ts_probe) >= 0)
  1064. {
  1065. if (probe_wait < IKCP_PROBE_INIT)
  1066. {
  1067. probe_wait = IKCP_PROBE_INIT;
  1068. }
  1069. probe_wait += probe_wait / 2;
  1070. if (probe_wait > IKCP_PROBE_LIMIT)
  1071. {
  1072. probe_wait = IKCP_PROBE_LIMIT;
  1073. }
  1074. ts_probe = current + probe_wait;
  1075. probe |= IKCP_ASK_SEND;
  1076. }
  1077. }
  1078. }
  1079. else
  1080. {
  1081. ts_probe = 0;
  1082. probe_wait = 0;
  1083. }
  1084. #endregion
  1085. #region flush window probing commands
  1086. // flush window probing commands
  1087. if ((probe & IKCP_ASK_SEND) != 0)
  1088. {
  1089. seg.cmd = IKCP_CMD_WASK;
  1090. if (OutputWriter.UnflushedBytes + IKCP_OVERHEAD > (int)mtu)
  1091. {
  1092. OutputWriter.Flush();
  1093. }
  1094. seg.Encode(OutputWriter);
  1095. }
  1096. if ((probe & IKCP_ASK_TELL) != 0)
  1097. {
  1098. seg.cmd = IKCP_CMD_WINS;
  1099. if (OutputWriter.UnflushedBytes + IKCP_OVERHEAD > (int)mtu)
  1100. {
  1101. OutputWriter.Flush();
  1102. }
  1103. seg.Encode(OutputWriter);
  1104. }
  1105. probe = 0;
  1106. #endregion
  1107. }
  1108. #region 刷新,将发送等待列表移动到发送列表
  1109. // calculate window size
  1110. var cwnd_ = Min(snd_wnd, rmt_wnd);
  1111. if (nocwnd == 0)
  1112. {
  1113. cwnd_ = Min(cwnd, cwnd_);
  1114. }
  1115. while (Itimediff(snd_nxt, snd_una + cwnd_) < 0)
  1116. {
  1117. if (snd_queue.TryDequeue(out var newseg))
  1118. {
  1119. newseg.conv = conv;
  1120. newseg.cmd = IKCP_CMD_PUSH;
  1121. newseg.wnd = wnd_;
  1122. newseg.ts = current_;
  1123. newseg.sn = snd_nxt;
  1124. snd_nxt++;
  1125. newseg.una = rcv_nxt;
  1126. newseg.resendts = current_;
  1127. newseg.rto = rx_rto;
  1128. newseg.fastack = 0;
  1129. newseg.xmit = 0;
  1130. lock (snd_bufLock)
  1131. {
  1132. snd_buf.AddLast(newseg);
  1133. }
  1134. }
  1135. else
  1136. {
  1137. break;
  1138. }
  1139. }
  1140. #endregion
  1141. #region 刷新 发送列表,调用Output
  1142. // calculate resent
  1143. var resent = fastresend > 0 ? (uint)fastresend : 0xffffffff;
  1144. var rtomin = nodelay == 0 ? (rx_rto >> 3) : 0;
  1145. lock (snd_bufLock)
  1146. {
  1147. // flush data segments
  1148. foreach (var item in snd_buf)
  1149. {
  1150. var segment = item;
  1151. var needsend = false;
  1152. var debug = Itimediff(current_, segment.resendts);
  1153. if (segment.xmit == 0)
  1154. {
  1155. //新加入 snd_buf 中, 从未发送过的报文直接发送出去;
  1156. needsend = true;
  1157. segment.xmit++;
  1158. segment.rto = rx_rto;
  1159. segment.resendts = current_ + rx_rto + rtomin;
  1160. }
  1161. else if (Itimediff(current_, segment.resendts) >= 0)
  1162. {
  1163. //发送过的, 但是在 RTO 内未收到 ACK 的报文, 需要重传;
  1164. needsend = true;
  1165. segment.xmit++;
  1166. this.xmit++;
  1167. if (nodelay == 0)
  1168. {
  1169. segment.rto += Math.Max(segment.rto, rx_rto);
  1170. }
  1171. else
  1172. {
  1173. var step = nodelay < 2 ? segment.rto : rx_rto;
  1174. segment.rto += step / 2;
  1175. }
  1176. segment.resendts = current_ + segment.rto;
  1177. lost = 1;
  1178. }
  1179. else if (segment.fastack >= resent)
  1180. {
  1181. //发送过的, 但是 ACK 失序若干次的报文, 需要执行快速重传.
  1182. if (segment.xmit <= fastlimit
  1183. || fastlimit <= 0)
  1184. {
  1185. needsend = true;
  1186. segment.xmit++;
  1187. segment.fastack = 0;
  1188. segment.resendts = current_ + segment.rto;
  1189. change++;
  1190. }
  1191. }
  1192. if (needsend)
  1193. {
  1194. segment.ts = current_;
  1195. segment.wnd = wnd_;
  1196. segment.una = rcv_nxt;
  1197. var need = IKCP_OVERHEAD + segment.len;
  1198. if (OutputWriter.UnflushedBytes + need > mtu)
  1199. {
  1200. OutputWriter.Flush();
  1201. }
  1202. segment.Encode(OutputWriter);
  1203. if (CanLog(KcpLogMask.IKCP_LOG_NEED_SEND))
  1204. {
  1205. LogWriteLine($"{segment.ToLogString(true)}", KcpLogMask.IKCP_LOG_NEED_SEND.ToString());
  1206. }
  1207. if (segment.xmit >= dead_link)
  1208. {
  1209. state = -1;
  1210. if (CanLog(KcpLogMask.IKCP_LOG_DEAD_LINK))
  1211. {
  1212. LogWriteLine($"state = -1; xmit:{segment.xmit} >= dead_link:{dead_link}",
  1213. KcpLogMask.IKCP_LOG_DEAD_LINK.ToString());
  1214. }
  1215. }
  1216. }
  1217. }
  1218. }
  1219. // flash remain segments
  1220. if (OutputWriter.UnflushedBytes > 0)
  1221. {
  1222. OutputWriter.Flush();
  1223. }
  1224. #endregion
  1225. #region update ssthresh
  1226. // update ssthresh 根据丢包情况计算 ssthresh 和 cwnd.
  1227. if (change != 0)
  1228. {
  1229. var inflight = snd_nxt - snd_una;
  1230. ssthresh = inflight / 2;
  1231. if (ssthresh < IKCP_THRESH_MIN)
  1232. {
  1233. ssthresh = IKCP_THRESH_MIN;
  1234. }
  1235. cwnd = ssthresh + resent;
  1236. incr = cwnd * mss;
  1237. }
  1238. if (lost != 0)
  1239. {
  1240. ssthresh = cwnd / 2;
  1241. if (ssthresh < IKCP_THRESH_MIN)
  1242. {
  1243. ssthresh = IKCP_THRESH_MIN;
  1244. }
  1245. cwnd = 1;
  1246. incr = mss;
  1247. }
  1248. if (cwnd < 1)
  1249. {
  1250. cwnd = 1;
  1251. incr = mss;
  1252. }
  1253. #endregion
  1254. if (state == -1)
  1255. {
  1256. OnDeadlink();
  1257. }
  1258. }
  1259. /// <summary>
  1260. /// update state (call it repeatedly, every 10ms-100ms), or you can ask
  1261. /// ikcp_check when to call it again (without ikcp_input/_send calling).
  1262. /// </summary>
  1263. /// <param name="time">DateTime.UtcNow</param>
  1264. public void Update(in DateTimeOffset time)
  1265. {
  1266. if (CheckDispose())
  1267. {
  1268. //检查释放
  1269. return;
  1270. }
  1271. current = time.ConvertTime();
  1272. if (updated == 0)
  1273. {
  1274. updated = 1;
  1275. ts_flush = current;
  1276. }
  1277. var slap = Itimediff(current, ts_flush);
  1278. if (slap >= 10000 || slap < -10000)
  1279. {
  1280. ts_flush = current;
  1281. slap = 0;
  1282. }
  1283. if (slap >= 0)
  1284. {
  1285. ts_flush += interval;
  1286. if (Itimediff(current, ts_flush) >= 0)
  1287. {
  1288. ts_flush = current + interval;
  1289. }
  1290. Flush();
  1291. }
  1292. }
  1293. #endregion
  1294. #region 设置控制
  1295. public int SetMtu(int mtu = IKCP_MTU_DEF)
  1296. {
  1297. if (mtu < 50 || mtu < IKCP_OVERHEAD)
  1298. {
  1299. return -1;
  1300. }
  1301. var buffer_ = CreateBuffer(BufferNeedSize);
  1302. if (null == buffer_)
  1303. {
  1304. return -2;
  1305. }
  1306. this.mtu = (uint)mtu;
  1307. mss = this.mtu - IKCP_OVERHEAD;
  1308. buffer.Dispose();
  1309. buffer = buffer_;
  1310. return 0;
  1311. }
  1312. /// <summary>
  1313. ///
  1314. /// </summary>
  1315. /// <param name="interval_"></param>
  1316. /// <returns></returns>
  1317. public int Interval(int interval_)
  1318. {
  1319. if (interval_ > 5000)
  1320. {
  1321. interval_ = 5000;
  1322. }
  1323. else if (interval_ < 0)
  1324. {
  1325. /// 将最小值 10 改为 0;
  1326. ///在特殊形况下允许CPU满负荷运转;
  1327. interval_ = 0;
  1328. }
  1329. interval = (uint)interval_;
  1330. return 0;
  1331. }
  1332. public int NoDelay(int nodelay_, int interval_, int resend_, int nc_)
  1333. {
  1334. if (nodelay_ > 0)
  1335. {
  1336. nodelay = (uint)nodelay_;
  1337. if (nodelay_ != 0)
  1338. {
  1339. rx_minrto = IKCP_RTO_NDL;
  1340. }
  1341. else
  1342. {
  1343. rx_minrto = IKCP_RTO_MIN;
  1344. }
  1345. }
  1346. if (resend_ >= 0)
  1347. {
  1348. fastresend = resend_;
  1349. }
  1350. if (nc_ >= 0)
  1351. {
  1352. nocwnd = nc_;
  1353. }
  1354. return Interval(interval_);
  1355. }
  1356. public int WndSize(int sndwnd = IKCP_WND_SND, int rcvwnd = IKCP_WND_RCV)
  1357. {
  1358. if (sndwnd > 0)
  1359. {
  1360. snd_wnd = (uint)sndwnd;
  1361. }
  1362. if (rcvwnd > 0)
  1363. {
  1364. rcv_wnd = (uint)rcvwnd;
  1365. }
  1366. return 0;
  1367. }
  1368. #endregion
  1369. }
  1370. public partial class KcpCore<Segment> : IKcpSendable
  1371. {
  1372. /// <summary>
  1373. /// user/upper level send, returns below zero for error
  1374. /// </summary>
  1375. /// <param name="span"></param>
  1376. /// <param name="options"></param>
  1377. /// <returns></returns>
  1378. public int Send(ReadOnlySpan<byte> span, object options = null)
  1379. {
  1380. if (CheckDispose())
  1381. {
  1382. //检查释放
  1383. return -4;
  1384. }
  1385. if (mss <= 0)
  1386. {
  1387. throw new InvalidOperationException($" mss <= 0 ");
  1388. }
  1389. if (span.Length == 0)
  1390. {
  1391. return -1;
  1392. }
  1393. var offset = 0;
  1394. int count;
  1395. #region append to previous segment in streaming mode (if possible)
  1396. /// 基于线程安全和数据结构的等原因,移除了追加数据到最后一个包行为。
  1397. #endregion
  1398. #region fragment
  1399. if (span.Length <= mss)
  1400. {
  1401. count = 1;
  1402. }
  1403. else
  1404. {
  1405. count = (int)(span.Length + mss - 1) / (int)mss;
  1406. }
  1407. if (count > IKCP_WND_RCV)
  1408. {
  1409. return -2;
  1410. }
  1411. if (count == 0)
  1412. {
  1413. count = 1;
  1414. }
  1415. lock (snd_queueLock)
  1416. {
  1417. for (var i = 0; i < count; i++)
  1418. {
  1419. int size;
  1420. if (span.Length - offset > mss)
  1421. {
  1422. size = (int)mss;
  1423. }
  1424. else
  1425. {
  1426. size = (int)span.Length - offset;
  1427. }
  1428. var seg = SegmentManager.Alloc(size);
  1429. span.Slice(offset, size).CopyTo(seg.data);
  1430. offset += size;
  1431. seg.frg = (byte)(count - i - 1);
  1432. snd_queue.Enqueue(seg);
  1433. }
  1434. }
  1435. #endregion
  1436. return 0;
  1437. }
  1438. //public int Send(Span<byte> span)
  1439. //{
  1440. // return Send((ReadOnlySpan<byte>)span);
  1441. //}
  1442. public int Send(ReadOnlySequence<byte> span, object options = null)
  1443. {
  1444. if (CheckDispose())
  1445. {
  1446. //检查释放
  1447. return -4;
  1448. }
  1449. if (mss <= 0)
  1450. {
  1451. throw new InvalidOperationException($" mss <= 0 ");
  1452. }
  1453. if (span.Length == 0)
  1454. {
  1455. return -1;
  1456. }
  1457. var offset = 0;
  1458. int count;
  1459. #region append to previous segment in streaming mode (if possible)
  1460. /// 基于线程安全和数据结构的等原因,移除了追加数据到最后一个包行为。
  1461. #endregion
  1462. #region fragment
  1463. if (span.Length <= mss)
  1464. {
  1465. count = 1;
  1466. }
  1467. else
  1468. {
  1469. count = (int)(span.Length + mss - 1) / (int)mss;
  1470. }
  1471. if (count > IKCP_WND_RCV)
  1472. {
  1473. return -2;
  1474. }
  1475. if (count == 0)
  1476. {
  1477. count = 1;
  1478. }
  1479. lock (snd_queueLock)
  1480. {
  1481. for (var i = 0; i < count; i++)
  1482. {
  1483. int size;
  1484. if (span.Length - offset > mss)
  1485. {
  1486. size = (int)mss;
  1487. }
  1488. else
  1489. {
  1490. size = (int)span.Length - offset;
  1491. }
  1492. var seg = SegmentManager.Alloc(size);
  1493. span.Slice(offset, size).CopyTo(seg.data);
  1494. offset += size;
  1495. seg.frg = (byte)(count - i - 1);
  1496. snd_queue.Enqueue(seg);
  1497. }
  1498. }
  1499. #endregion
  1500. return 0;
  1501. }
  1502. }
  1503. public partial class KcpCore<Segment> : IKcpInputable
  1504. {
  1505. /// <summary>
  1506. /// when you received a low level packet (eg. UDP packet), call it
  1507. /// </summary>
  1508. /// <param name="span"></param>
  1509. /// <returns></returns>
  1510. public int Input(ReadOnlySpan<byte> span)
  1511. {
  1512. if (CheckDispose())
  1513. {
  1514. //检查释放
  1515. return -4;
  1516. }
  1517. if (CanLog(KcpLogMask.IKCP_LOG_INPUT))
  1518. {
  1519. LogWriteLine($"[RI] {span.Length} bytes", KcpLogMask.IKCP_LOG_INPUT.ToString());
  1520. }
  1521. if (span.Length < IKCP_OVERHEAD)
  1522. {
  1523. return -1;
  1524. }
  1525. uint prev_una = snd_una;
  1526. var offset = 0;
  1527. int flag = 0;
  1528. uint maxack = 0;
  1529. uint latest_ts = 0;
  1530. while (true)
  1531. {
  1532. uint ts = 0;
  1533. uint sn = 0;
  1534. uint length = 0;
  1535. uint una = 0;
  1536. uint conv_ = 0;
  1537. ushort wnd = 0;
  1538. byte cmd = 0;
  1539. byte frg = 0;
  1540. if (span.Length - offset < IKCP_OVERHEAD)
  1541. {
  1542. break;
  1543. }
  1544. Span<byte> header = stackalloc byte[24];
  1545. span.Slice(offset, 24).CopyTo(header);
  1546. offset += ReadHeader(header,
  1547. ref conv_,
  1548. ref cmd,
  1549. ref frg,
  1550. ref wnd,
  1551. ref ts,
  1552. ref sn,
  1553. ref una,
  1554. ref length);
  1555. if (conv != conv_)
  1556. {
  1557. return -1;
  1558. }
  1559. if (span.Length - offset < length || (int)length < 0)
  1560. {
  1561. return -2;
  1562. }
  1563. switch (cmd)
  1564. {
  1565. case IKCP_CMD_PUSH:
  1566. case IKCP_CMD_ACK:
  1567. case IKCP_CMD_WASK:
  1568. case IKCP_CMD_WINS:
  1569. break;
  1570. default:
  1571. return -3;
  1572. }
  1573. rmt_wnd = wnd;
  1574. Parse_una(una);
  1575. Shrink_buf();
  1576. if (IKCP_CMD_ACK == cmd)
  1577. {
  1578. if (Itimediff(current, ts) >= 0)
  1579. {
  1580. Update_ack(Itimediff(current, ts));
  1581. }
  1582. Parse_ack(sn);
  1583. Shrink_buf();
  1584. if (flag == 0)
  1585. {
  1586. flag = 1;
  1587. maxack = sn;
  1588. latest_ts = ts;
  1589. }
  1590. else if (Itimediff(sn, maxack) > 0)
  1591. {
  1592. #if !IKCP_FASTACK_CONSERVE
  1593. maxack = sn;
  1594. latest_ts = ts;
  1595. #else
  1596. if (Itimediff(ts, latest_ts) > 0)
  1597. {
  1598. maxack = sn;
  1599. latest_ts = ts;
  1600. }
  1601. #endif
  1602. }
  1603. if (CanLog(KcpLogMask.IKCP_LOG_IN_ACK))
  1604. {
  1605. LogWriteLine($"input ack: sn={sn} rtt={Itimediff(current, ts)} rto={rx_rto}",
  1606. KcpLogMask.IKCP_LOG_IN_ACK.ToString());
  1607. }
  1608. }
  1609. else if (IKCP_CMD_PUSH == cmd)
  1610. {
  1611. if (CanLog(KcpLogMask.IKCP_LOG_IN_DATA))
  1612. {
  1613. LogWriteLine($"input psh: sn={sn} ts={ts}", KcpLogMask.IKCP_LOG_IN_DATA.ToString());
  1614. }
  1615. if (Itimediff(sn, rcv_nxt + rcv_wnd) < 0)
  1616. {
  1617. ///instead of ikcp_ack_push
  1618. acklist.Enqueue((sn, ts));
  1619. if (Itimediff(sn, rcv_nxt) >= 0)
  1620. {
  1621. var seg = SegmentManager.Alloc((int)length);
  1622. seg.conv = conv_;
  1623. seg.cmd = cmd;
  1624. seg.frg = frg;
  1625. seg.wnd = wnd;
  1626. seg.ts = ts;
  1627. seg.sn = sn;
  1628. seg.una = una;
  1629. //seg.len = length; 长度在分配时确定,不能改变
  1630. if (length > 0)
  1631. {
  1632. span.Slice(offset, (int)length).CopyTo(seg.data);
  1633. }
  1634. // LogTool.Log($"收到包{sn} {rcv_nxt}");
  1635. Parse_data(seg);
  1636. }
  1637. }
  1638. }
  1639. else if (IKCP_CMD_WASK == cmd)
  1640. {
  1641. // ready to send back IKCP_CMD_WINS in Ikcp_flush
  1642. // tell remote my window size
  1643. probe |= IKCP_ASK_TELL;
  1644. if (CanLog(KcpLogMask.IKCP_LOG_IN_PROBE))
  1645. {
  1646. LogWriteLine($"input probe", KcpLogMask.IKCP_LOG_IN_PROBE.ToString());
  1647. }
  1648. }
  1649. else if (IKCP_CMD_WINS == cmd)
  1650. {
  1651. // do nothing
  1652. if (CanLog(KcpLogMask.IKCP_LOG_IN_WINS))
  1653. {
  1654. LogWriteLine($"input wins: {wnd}", KcpLogMask.IKCP_LOG_IN_WINS.ToString());
  1655. }
  1656. }
  1657. else
  1658. {
  1659. return -3;
  1660. }
  1661. offset += (int)length;
  1662. }
  1663. if (flag != 0)
  1664. {
  1665. Parse_fastack(maxack, latest_ts);
  1666. }
  1667. if (Itimediff(this.snd_una, prev_una) > 0)
  1668. {
  1669. if (cwnd < rmt_wnd)
  1670. {
  1671. if (cwnd < ssthresh)
  1672. {
  1673. cwnd++;
  1674. incr += mss;
  1675. }
  1676. else
  1677. {
  1678. if (incr < mss)
  1679. {
  1680. incr = mss;
  1681. }
  1682. incr += (mss * mss) / incr + (mss / 16);
  1683. if ((cwnd + 1) * mss <= incr)
  1684. {
  1685. #if true
  1686. cwnd = (incr + mss - 1) / ((mss > 0) ? mss : 1);
  1687. #else
  1688. cwnd++;
  1689. #endif
  1690. }
  1691. }
  1692. if (cwnd > rmt_wnd)
  1693. {
  1694. cwnd = rmt_wnd;
  1695. incr = rmt_wnd * mss;
  1696. }
  1697. }
  1698. }
  1699. return 0;
  1700. }
  1701. /// <summary>
  1702. /// <inheritdoc cref="Input(ReadOnlySpan{byte})"/>
  1703. /// </summary>
  1704. /// <param name="span"></param>
  1705. /// <returns></returns>
  1706. public int Input(ReadOnlySequence<byte> span)
  1707. {
  1708. if (CheckDispose())
  1709. {
  1710. //检查释放
  1711. return -4;
  1712. }
  1713. if (CanLog(KcpLogMask.IKCP_LOG_INPUT))
  1714. {
  1715. LogWriteLine($"[RI] {span.Length} bytes", KcpLogMask.IKCP_LOG_INPUT.ToString());
  1716. }
  1717. if (span.Length < IKCP_OVERHEAD)
  1718. {
  1719. return -1;
  1720. }
  1721. uint prev_una = snd_una;
  1722. var offset = 0;
  1723. int flag = 0;
  1724. uint maxack = 0;
  1725. uint latest_ts = 0;
  1726. while (true)
  1727. {
  1728. uint ts = 0;
  1729. uint sn = 0;
  1730. uint length = 0;
  1731. uint una = 0;
  1732. uint conv_ = 0;
  1733. ushort wnd = 0;
  1734. byte cmd = 0;
  1735. byte frg = 0;
  1736. if (span.Length - offset < IKCP_OVERHEAD)
  1737. {
  1738. break;
  1739. }
  1740. Span<byte> header = stackalloc byte[24];
  1741. span.Slice(offset, 24).CopyTo(header);
  1742. offset += ReadHeader(header,
  1743. ref conv_,
  1744. ref cmd,
  1745. ref frg,
  1746. ref wnd,
  1747. ref ts,
  1748. ref sn,
  1749. ref una,
  1750. ref length);
  1751. if (conv != conv_)
  1752. {
  1753. return -1;
  1754. }
  1755. if (span.Length - offset < length || (int)length < 0)
  1756. {
  1757. return -2;
  1758. }
  1759. switch (cmd)
  1760. {
  1761. case IKCP_CMD_PUSH:
  1762. case IKCP_CMD_ACK:
  1763. case IKCP_CMD_WASK:
  1764. case IKCP_CMD_WINS:
  1765. break;
  1766. default:
  1767. return -3;
  1768. }
  1769. rmt_wnd = wnd;
  1770. Parse_una(una);
  1771. Shrink_buf();
  1772. if (IKCP_CMD_ACK == cmd)
  1773. {
  1774. if (Itimediff(current, ts) >= 0)
  1775. {
  1776. Update_ack(Itimediff(current, ts));
  1777. }
  1778. Parse_ack(sn);
  1779. Shrink_buf();
  1780. if (flag == 0)
  1781. {
  1782. flag = 1;
  1783. maxack = sn;
  1784. latest_ts = ts;
  1785. }
  1786. else if (Itimediff(sn, maxack) > 0)
  1787. {
  1788. #if !IKCP_FASTACK_CONSERVE
  1789. maxack = sn;
  1790. latest_ts = ts;
  1791. #else
  1792. if (Itimediff(ts, latest_ts) > 0)
  1793. {
  1794. maxack = sn;
  1795. latest_ts = ts;
  1796. }
  1797. #endif
  1798. }
  1799. if (CanLog(KcpLogMask.IKCP_LOG_IN_ACK))
  1800. {
  1801. LogWriteLine($"input ack: sn={sn} rtt={Itimediff(current, ts)} rto={rx_rto}",
  1802. KcpLogMask.IKCP_LOG_IN_ACK.ToString());
  1803. }
  1804. }
  1805. else if (IKCP_CMD_PUSH == cmd)
  1806. {
  1807. if (CanLog(KcpLogMask.IKCP_LOG_IN_DATA))
  1808. {
  1809. LogWriteLine($"input psh: sn={sn} ts={ts}", KcpLogMask.IKCP_LOG_IN_DATA.ToString());
  1810. }
  1811. if (Itimediff(sn, rcv_nxt + rcv_wnd) < 0)
  1812. {
  1813. ///instead of ikcp_ack_push
  1814. acklist.Enqueue((sn, ts));
  1815. if (Itimediff(sn, rcv_nxt) >= 0)
  1816. {
  1817. var seg = SegmentManager.Alloc((int)length);
  1818. seg.conv = conv_;
  1819. seg.cmd = cmd;
  1820. seg.frg = frg;
  1821. seg.wnd = wnd;
  1822. seg.ts = ts;
  1823. seg.sn = sn;
  1824. seg.una = una;
  1825. //seg.len = length; 长度在分配时确定,不能改变
  1826. if (length > 0)
  1827. {
  1828. span.Slice(offset, (int)length).CopyTo(seg.data);
  1829. }
  1830. Parse_data(seg);
  1831. }
  1832. }
  1833. }
  1834. else if (IKCP_CMD_WASK == cmd)
  1835. {
  1836. // ready to send back IKCP_CMD_WINS in Ikcp_flush
  1837. // tell remote my window size
  1838. probe |= IKCP_ASK_TELL;
  1839. if (CanLog(KcpLogMask.IKCP_LOG_IN_PROBE))
  1840. {
  1841. LogWriteLine($"input probe", KcpLogMask.IKCP_LOG_IN_PROBE.ToString());
  1842. }
  1843. }
  1844. else if (IKCP_CMD_WINS == cmd)
  1845. {
  1846. // do nothing
  1847. if (CanLog(KcpLogMask.IKCP_LOG_IN_WINS))
  1848. {
  1849. LogWriteLine($"input wins: {wnd}", KcpLogMask.IKCP_LOG_IN_WINS.ToString());
  1850. }
  1851. }
  1852. else
  1853. {
  1854. return -3;
  1855. }
  1856. offset += (int)length;
  1857. }
  1858. if (flag != 0)
  1859. {
  1860. Parse_fastack(maxack, latest_ts);
  1861. }
  1862. if (Itimediff(this.snd_una, prev_una) > 0)
  1863. {
  1864. if (cwnd < rmt_wnd)
  1865. {
  1866. if (cwnd < ssthresh)
  1867. {
  1868. cwnd++;
  1869. incr += mss;
  1870. }
  1871. else
  1872. {
  1873. if (incr < mss)
  1874. {
  1875. incr = mss;
  1876. }
  1877. incr += (mss * mss) / incr + (mss / 16);
  1878. if ((cwnd + 1) * mss <= incr)
  1879. {
  1880. #if true
  1881. cwnd = (incr + mss - 1) / ((mss > 0) ? mss : 1);
  1882. #else
  1883. cwnd++;
  1884. #endif
  1885. }
  1886. }
  1887. if (cwnd > rmt_wnd)
  1888. {
  1889. cwnd = rmt_wnd;
  1890. incr = rmt_wnd * mss;
  1891. }
  1892. }
  1893. }
  1894. return 0;
  1895. }
  1896. public static int ReadHeader(ReadOnlySpan<byte> header,
  1897. ref uint conv_,
  1898. ref byte cmd,
  1899. ref byte frg,
  1900. ref ushort wnd,
  1901. ref uint ts,
  1902. ref uint sn,
  1903. ref uint una,
  1904. ref uint length)
  1905. {
  1906. var offset = 0;
  1907. if (IsLittleEndian)
  1908. {
  1909. conv_ = BinaryPrimitives.ReadUInt32LittleEndian(header.Slice(offset));
  1910. offset += 4;
  1911. cmd = header[offset];
  1912. offset += 1;
  1913. frg = header[offset];
  1914. offset += 1;
  1915. wnd = BinaryPrimitives.ReadUInt16LittleEndian(header.Slice(offset));
  1916. offset += 2;
  1917. ts = BinaryPrimitives.ReadUInt32LittleEndian(header.Slice(offset));
  1918. offset += 4;
  1919. sn = BinaryPrimitives.ReadUInt32LittleEndian(header.Slice(offset));
  1920. offset += 4;
  1921. una = BinaryPrimitives.ReadUInt32LittleEndian(header.Slice(offset));
  1922. offset += 4;
  1923. length = BinaryPrimitives.ReadUInt32LittleEndian(header.Slice(offset));
  1924. offset += 4;
  1925. }
  1926. else
  1927. {
  1928. conv_ = BinaryPrimitives.ReadUInt32BigEndian(header.Slice(offset));
  1929. offset += 4;
  1930. cmd = header[offset];
  1931. offset += 1;
  1932. frg = header[offset];
  1933. offset += 1;
  1934. wnd = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(offset));
  1935. offset += 2;
  1936. ts = BinaryPrimitives.ReadUInt32BigEndian(header.Slice(offset));
  1937. offset += 4;
  1938. sn = BinaryPrimitives.ReadUInt32BigEndian(header.Slice(offset));
  1939. offset += 4;
  1940. una = BinaryPrimitives.ReadUInt32BigEndian(header.Slice(offset));
  1941. offset += 4;
  1942. length = BinaryPrimitives.ReadUInt32BigEndian(header.Slice(offset));
  1943. offset += 4;
  1944. }
  1945. return offset;
  1946. }
  1947. }
  1948. }