using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using Fort23.UTool;
using static System.Math;
using BufferOwner = System.Buffers.IMemoryOwner<byte>;

namespace System.Net.Sockets.Kcp
{
    /// <summary>
    /// Protocol constants shared by the KCP implementation in this namespace.
    /// </summary>
    public abstract class KcpConst
    {
        /// <summary>
        /// Byte order used when decoding segment headers (see ReadHeader).
        /// Follows the C version and defaults to little endian.
        /// https://github.com/skywind3000/kcp/issues/53
        /// </summary>
        public static bool IsLittleEndian = true;

        // To keep the code easy to compare with the C version, variable names
        // follow the C implementation wherever possible.
        /*
        conv        session id
        mtu         maximum transmission unit
        mss         maximum segment size
        state       connection state (0xFFFFFFFF means disconnected)
        snd_una     first unacknowledged packet
        snd_nxt     sequence number of the next packet to send
        rcv_nxt     sequence number of the next message to receive
        ssthresh    congestion window threshold
        rx_rttvar   rtt variation measured from acks
        rx_srtt     smoothed rtt measured from acks
        rx_rto      retransmission timeout computed from the ack delay
        rx_minrto   minimum retransmission timeout
        snd_wnd     send window size
        rcv_wnd     receive window size
        rmt_wnd     remote receive window size
        cwnd        congestion window size
        probe       probe flags: IKCP_ASK_TELL = tell the remote our window size,
                    IKCP_ASK_SEND = ask the remote for its window size
        interval    internal flush interval
        ts_flush    timestamp of the next flush
        nodelay     whether no-delay mode is enabled
        updated     whether update has been called at least once
        ts_probe    timestamp of the next window probe
        probe_wait  time to wait before probing the window
        dead_link   maximum number of retransmissions
        incr        maximum amount of data that may be sent
        fastresend  number of duplicate acks that triggers fast retransmit
        nocwnd      disable congestion control
        stream      whether stream mode is used
        snd_queue   queue of messages waiting to be sent
        rcv_queue   queue of received messages
        snd_buf     buffer of messages being sent
        rcv_buf     buffer of messages being received
        acklist     list of acks waiting to be sent
        buffer      memory holding the encoded byte stream
        output      callback that sends the bytes over udp
        */

        #region Const

        public const int IKCP_RTO_NDL = 30; // no delay min rto
        public const int IKCP_RTO_MIN = 100; // normal min rto
        public const int IKCP_RTO_DEF = 200;
        public const int IKCP_RTO_MAX = 60000;

        /// <summary>Data segment.</summary>
        public const int IKCP_CMD_PUSH = 81; // cmd: push data

        /// <summary>Acknowledgement segment.</summary>
        public const int IKCP_CMD_ACK = 82; // cmd: ack

        /// <summary>Window probe segment: asks the peer for its remaining receive window.</summary>
        public const int IKCP_CMD_WASK = 83; // cmd: window probe (ask)

        /// <summary>Window notification segment: tells the peer our remaining receive window.</summary>
        public const int IKCP_CMD_WINS = 84; // cmd: window size (tell)

        /// <summary>
        /// Probe flag: a IKCP_CMD_WASK must be sent on the next flush.
        /// This is a bit flag stored in <c>probe</c>; it never goes on the wire and
        /// MUST differ from <see cref="IKCP_ASK_TELL"/>, otherwise the two requests
        /// cannot be told apart and every probe triggers both commands.
        /// </summary>
        public const int IKCP_ASK_SEND = 1; // need to send IKCP_CMD_WASK

        /// <summary>
        /// Probe flag: a IKCP_CMD_WINS must be sent on the next flush.
        /// Bit flag stored in <c>probe</c>; distinct from <see cref="IKCP_ASK_SEND"/>.
        /// </summary>
        public const int IKCP_ASK_TELL = 2; // need to send IKCP_CMD_WINS

        public const int IKCP_WND_SND = 256;

        /// <summary>Default receive window. Must be at least the maximum fragment count.</summary>
        public const int IKCP_WND_RCV = 512; // must >= max fragment size

        /// <summary>
        /// Default maximum transmission unit. Common router values are 1492 and 1480;
        /// the default of 1400 is chosen so packets are not fragmented at the routing layer.
        /// </summary>
        public const int IKCP_MTU_DEF = 1400;

        public const int IKCP_ACK_FAST = 3;
        public const int IKCP_INTERVAL = 5;
        public const int IKCP_OVERHEAD = 24;
        public const int IKCP_DEADLINK = 20;
        public const int IKCP_THRESH_INIT = 2;
        public const int IKCP_THRESH_MIN = 2;

        /// <summary>Initial cool-down between window probes.</summary>
        public const int IKCP_PROBE_INIT = 7000; // 7 secs to probe window size
        public const int IKCP_PROBE_LIMIT = 120000; // up to 120 secs to probe window
        public const int IKCP_FASTACK_LIMIT = 5; // max times to trigger fastack

        #endregion
    }

    /// <summary>
    /// Core KCP state machine.
    /// Data path: external buffer --split and copy--> wait queue --move--> send list
    /// --copy--> send buffer --> output.
    /// https://luyuhuang.tech/2020/12/09/kcp.html
    /// https://github.com/skywind3000/kcp/wiki/Network-Layer
    /// https://github.com/skywind3000/kcp/issues/118#issuecomment-338133930
    /// </summary>
    public partial class KcpCore<Segment> : KcpConst, IKcpSetting, IKcpUpdate, IDisposable
        where Segment : IKcpSegment
    {
        /// <summary>Receives the encoded buffers produced by Flush.</summary>
        internal protected IKcpCallback callbackHandle;

        /// <summary>Alternative output sink used by Flush2.</summary>
        internal protected IKcpOutputWriter OutputWriter;

        /// <summary>
        /// Creates a KCP instance for the given conversation id and initialises the
        /// windows, mtu/mss, timers and thresholds from the IKCP_* defaults.
        /// </summary>
        /// <param name="conv_">Conversation (session) id.</param>
        public KcpCore(uint conv_)
        {
            conv = conv_;

            snd_wnd = IKCP_WND_SND;
            rcv_wnd = IKCP_WND_RCV;
            rmt_wnd = IKCP_WND_RCV;
            mtu = IKCP_MTU_DEF;
            mss = mtu - IKCP_OVERHEAD;
            // BufferNeedSize depends on mtu, so the buffer is created after mtu is set.
            buffer = CreateBuffer(BufferNeedSize);

            rx_rto = IKCP_RTO_DEF;
            rx_minrto = IKCP_RTO_MIN;
            interval = IKCP_INTERVAL;
            ts_flush = IKCP_INTERVAL;
            ssthresh = IKCP_THRESH_INIT;
            fastlimit = IKCP_FASTACK_LIMIT;
            dead_link = IKCP_DEADLINK;
        }

        /// <summary>Allocator used to obtain and release segments.</summary>
        public ISegmentManager<Segment> SegmentManager { get; set; }

        /// <summary>Clamps <paramref name="middle"/> into [lower, upper].</summary>
        protected static uint Ibound(uint lower, uint middle, uint upper)
        {
            return Min(Max(lower, middle), upper);
        }
/// <summary>Signed difference <c>later - earlier</c>; wrap-around of the 32-bit counters yields a small signed value.</summary>
        protected static int Itimediff(uint later, uint earlier)
        {
            return ((int)(later - earlier));
        }

        /// <summary>
        /// Creates the buffer that Flush encodes segments into. Override to supply pooled memory.
        /// </summary>
        /// <param name="needSize">Minimum number of bytes required.</param>
        internal protected virtual BufferOwner CreateBuffer(int needSize)
        {
            return new KcpInnerBuffer(needSize);
        }

        /// <summary>
        /// Default buffer owner backed by a plain byte array. After Dispose, reading
        /// <see cref="Memory"/> throws <see cref="ObjectDisposedException"/>.
        /// </summary>
        internal protected class KcpInnerBuffer : BufferOwner
        {
            private readonly Memory<byte> _memory;
            bool alreadyDisposed = false;

            public KcpInnerBuffer(int size)
            {
                _memory = new Memory<byte>(new byte[size]);
            }

            public Memory<byte> Memory
            {
                get
                {
                    if (alreadyDisposed)
                    {
                        throw new ObjectDisposedException(nameof(KcpInnerBuffer));
                    }

                    return _memory;
                }
            }

            public void Dispose()
            {
                alreadyDisposed = true;
            }
        }

        #region kcp members

        /// <summary>Conversation (channel) id.</summary>
        public uint conv { get; protected set; }

        /// <summary>Maximum transmission unit (MTU).</summary>
        protected uint mtu;

        /// <summary>Minimum size of the output buffer; currently equal to <see cref="mtu"/>.</summary>
        protected int BufferNeedSize
        {
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            get { return (int)((mtu /* + IKCP_OVERHEAD*/) /** 3*/); }
        }

        /// <summary>Maximum segment size (mtu minus the header overhead).</summary>
        protected uint mss;

        /// <summary>Connection state (0xFFFFFFFF, i.e. -1, means disconnected).</summary>
        protected int state;

        /// <summary>First unacknowledged packet.</summary>
        protected uint snd_una;

        /// <summary>Sequence number of the next packet to send.</summary>
        protected uint snd_nxt;

        /// <summary>Sequence number of the next message expected from the peer.</summary>
        protected uint rcv_nxt;

        protected uint ts_recent;
        protected uint ts_lastack;

        /// <summary>Congestion window threshold.</summary>
        protected uint ssthresh;

        /// <summary>RTT variation measured from acks.</summary>
        protected uint rx_rttval;

        /// <summary>Smoothed RTT measured from acks.</summary>
        protected uint rx_srtt;

        /// <summary>Retransmission timeout (RTO) computed from the ack delay.</summary>
        protected uint rx_rto;

        /// <summary>Minimum retransmission timeout.</summary>
        protected uint rx_minrto;

        /// <summary>Send window size.</summary>
        protected uint snd_wnd;

        /// <summary>Receive window size.</summary>
        protected uint rcv_wnd;

        /// <summary>Remote receive window size.</summary>
        protected uint rmt_wnd;

        /// <summary>Congestion window size.</summary>
        protected uint cwnd;

        /// <summary>
        /// Probe flags. IKCP_ASK_TELL: tell the remote our window size.
        /// IKCP_ASK_SEND: ask the remote for its window size.
        /// </summary>
        protected uint probe;

        protected uint current;

        /// <summary>Internal flush interval.</summary>
        protected uint interval;

        /// <summary>Timestamp of the next flush.</summary>
        protected uint ts_flush;

        protected uint xmit;

        /// <summary>Whether no-delay mode is enabled.</summary>
        protected uint nodelay;

        /// <summary>Non-zero once Update has been called.</summary>
        protected uint updated;

        /// <summary>Timestamp of the next window probe.</summary>
        protected uint ts_probe;

        /// <summary>Time to wait before probing the window.</summary>
        protected uint probe_wait;

        /// <summary>Maximum number of retransmissions before the link is considered dead.</summary>
        protected uint dead_link;

        /// <summary>Maximum amount of data that may be sent.</summary>
        protected uint incr;

        /// <summary>Number of duplicate acks that triggers a fast retransmit.</summary>
        public int fastresend;

        public int fastlimit;

        /// <summary>Non-zero disables congestion control.</summary>
        protected int nocwnd;

        protected int logmask;

        /// <summary>Whether stream mode is used.</summary>
        public int stream;

        protected BufferOwner buffer;

        #endregion

        #region Locks and containers

        /// <summary>
        /// Lock that keeps sending thread-safe; without it the fragments of two
        /// messages could be enqueued interleaved.
        /// Use case: a normal send and a broadcast may call Send from several threads at once.
        /// </summary>
        protected readonly object snd_queueLock = new object();

        protected readonly object snd_bufLock = new object();
        protected readonly object rcv_bufLock = new object();
        protected readonly object rcv_queueLock = new object();

        /// <summary>Acks waiting to be sent.</summary>
        protected ConcurrentQueue<(uint sn, uint ts)> acklist = new ConcurrentQueue<(uint sn, uint ts)>();

        /// <summary>Segments waiting to be sent.</summary>
        internal ConcurrentQueue<Segment> snd_queue = new ConcurrentQueue<Segment>();

        /// <summary>Segments currently in flight.</summary>
        internal LinkedList<Segment> snd_buf = new LinkedList<Segment>();

        /// <summary>
        /// Segments waiting to be handed to the receive callback.
        /// Operations needed: add, iterate, remove.
        /// </summary>
        internal List<Segment> rcv_queue = new List<Segment>();

        /// <summary>
        /// Segments waiting to be reassembled.
        /// Operations needed: add, insert, iterate, remove.
        /// </summary>
        internal LinkedList<Segment> rcv_buf = new LinkedList<Segment>();

        /// <summary>
        /// get how many packet is waiting to be sent.
        /// Returns 0 for a container that Dispose has already released (set to null).
        /// </summary>
        public int WaitSnd => (snd_buf?.Count ?? 0) + (snd_queue?.Count ?? 0);

        #endregion

        #region IDisposable Support

        private bool disposedValue = false; // detects redundant calls

        /// <summary>True while Dispose is running.</summary>
        private bool m_disposing = false;
/// <summary>
        /// Guard used at the top of public entry points.
        /// </summary>
        /// <returns>
        /// True while a Dispose call is in progress (the caller should bail out),
        /// false when the instance is usable.
        /// </returns>
        /// <exception cref="ObjectDisposedException">The instance has already been disposed.</exception>
        protected bool CheckDispose()
        {
            if (m_disposing)
            {
                return true;
            }

            if (disposedValue)
            {
                throw new ObjectDisposedException(
                    $"{nameof(Kcp)} [conv:{conv}]");
            }

            return false;
        }

        /// <summary>
        /// Releases the output buffer and returns every queued segment to
        /// <c>SegmentManager</c>, then nulls the containers.
        /// </summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/>, false from the finalizer.</param>
        protected virtual void Dispose(bool disposing)
        {
            try
            {
                m_disposing = true;
                if (!disposedValue)
                {
                    if (disposing)
                    {
                        // Release managed state.
                        callbackHandle = null;
                        acklist = null;
                        // The field always holds a buffer that has not been handed to the
                        // output callback yet, so this instance owns it and must release it
                        // (matters for pooled IMemoryOwner implementations).
                        buffer?.Dispose();
                        buffer = null;
                    }

                    // Frees one segment; a failure here is unexpected and is only logged
                    // so the remaining segments are still released.
                    void FreeSegment(Segment item)
                    {
                        try
                        {
                            SegmentManager.Free(item);
                        }
                        catch (Exception)
                        {
                            // In theory no exception can occur here.
                            LogFail($"此处绝不应该出现异常。 Dispose 时出现预计外异常,联系作者");
                        }
                    }

                    void FreeCollection(IEnumerable<Segment> collection)
                    {
                        if (collection == null)
                        {
                            return;
                        }

                        foreach (var item in collection)
                        {
                            FreeSegment(item);
                        }
                    }

                    lock (snd_queueLock)
                    {
                        if (snd_queue != null)
                        {
                            // Only free segments that were actually dequeued; a failed
                            // TryDequeue yields default(Segment), which must not be freed.
                            while (snd_queue.TryDequeue(out var segment))
                            {
                                FreeSegment(segment);
                            }
                        }

                        snd_queue = null;
                    }

                    lock (snd_bufLock)
                    {
                        FreeCollection(snd_buf);
                        snd_buf?.Clear();
                        snd_buf = null;
                    }

                    lock (rcv_bufLock)
                    {
                        FreeCollection(rcv_buf);
                        rcv_buf?.Clear();
                        rcv_buf = null;
                    }

                    lock (rcv_queueLock)
                    {
                        FreeCollection(rcv_queue);
                        rcv_queue?.Clear();
                        rcv_queue = null;
                    }

                    disposedValue = true;
                }
            }
            finally
            {
                m_disposing = false;
            }
        }

        // The finalizer exists because Dispose(bool) releases segments that may wrap unmanaged memory.
        ~KcpCore()
        {
            // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
            Dispose(false);
        }

        /// <summary>
        /// Disposing is not strictly thread-safe; call it from the same thread as Update
        /// whenever possible, or let the finalizer release the instance.
        /// </summary>
        public void Dispose()
        {
            // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        #endregion

        #region Core logic

        // Helper functions

        /// Determine when should you invoke ikcp_update:
        /// returns when you should invoke ikcp_update in millisec, if there
        /// is no ikcp_input/_send calling. you can call ikcp_update in that
        /// time, instead of call update repeatedly.
        ///
        /// Important to reduce unnecessary ikcp_update invoking. use it to
        /// schedule ikcp_update (eg.
/// implementing an epoll-like mechanism,
        /// or optimize ikcp_update when handling massive kcp connections)
        /// <param name="time">Current time.</param>
        /// <returns>
        /// The time at which Update should next be called; <paramref name="time"/> itself
        /// when an update is already due, or <c>default</c> while disposing.
        /// </returns>
        public DateTimeOffset Check(in DateTimeOffset time)
        {
            if (CheckDispose())
            {
                // Being disposed.
                return default;
            }

            if (updated == 0)
            {
                return time;
            }

            var current_ = time.ConvertTime();

            var ts_flush_ = ts_flush;
            var tm_flush_ = 0x7fffffff;
            var tm_packet = 0x7fffffff;
            var minimal = 0;

            // A clock jump of more than 10 seconds in either direction resets the flush time.
            if (Itimediff(current_, ts_flush_) >= 10000 || Itimediff(current_, ts_flush_) < -10000)
            {
                ts_flush_ = current_;
            }

            if (Itimediff(current_, ts_flush_) >= 0)
            {
                return time;
            }

            tm_flush_ = Itimediff(ts_flush_, current_);

            lock (snd_bufLock)
            {
                foreach (var seg in snd_buf)
                {
                    var diff = Itimediff(seg.resendts, current_);
                    if (diff <= 0)
                    {
                        // A retransmission is already due.
                        return time;
                    }

                    if (diff < tm_packet)
                    {
                        tm_packet = diff;
                    }
                }
            }

            minimal = tm_packet < tm_flush_ ? tm_packet : tm_flush_;
            if (minimal >= interval) minimal = (int)interval;

            return time + TimeSpan.FromMilliseconds(minimal);
        }

        /// <summary>
        /// move available data from rcv_buf -> rcv_queue:
        /// moves in-order segments while the head matches rcv_nxt and rcv_queue is below rcv_wnd.
        /// </summary>
        protected void Move_Rcv_buf_2_Rcv_queue()
        {
            lock (rcv_bufLock)
            {
                while (rcv_buf.Count > 0)
                {
                    var seg = rcv_buf.First.Value;
                    if (seg.sn == rcv_nxt && rcv_queue.Count < rcv_wnd)
                    {
                        rcv_buf.RemoveFirst();
                        lock (rcv_queueLock)
                        {
                            rcv_queue.Add(seg);
                        }

                        rcv_nxt++;
                    }
                    else
                    {
                        break;
                    }
                }
            }
        }

        /// <summary>
        /// update ack: folds a new RTT sample into rx_srtt / rx_rttval and recomputes rx_rto,
        /// clamped to [rx_minrto, IKCP_RTO_MAX].
        /// </summary>
        /// <param name="rtt">Measured round-trip time.</param>
        protected void Update_ack(int rtt)
        {
            if (rx_srtt == 0)
            {
                rx_srtt = (uint)rtt;
                rx_rttval = (uint)rtt / 2;
            }
            else
            {
                int delta = (int)((uint)rtt - rx_srtt);

                if (delta < 0)
                {
                    delta = -delta;
                }

                rx_rttval = (3 * rx_rttval + (uint)delta) / 4;
                rx_srtt = (uint)((7 * rx_srtt + rtt) / 8);

                if (rx_srtt < 1)
                {
                    rx_srtt = 1;
                }
            }

            var rto = rx_srtt + Max(interval, 4 * rx_rttval);

            rx_rto = Ibound(rx_minrto, rto, IKCP_RTO_MAX);
        }

        /// <summary>Recomputes snd_una from the head of snd_buf (or snd_nxt when it is empty).</summary>
        protected void Shrink_buf()
        {
            lock (snd_bufLock)
            {
                snd_una = snd_buf.Count > 0 ? snd_buf.First.Value.sn : snd_nxt;
            }
        }

        /// <summary>Removes and frees the in-flight segment acknowledged by <paramref name="sn"/>, if any.</summary>
        protected void Parse_ack(uint sn)
        {
            if (Itimediff(sn, snd_una) < 0 || Itimediff(sn, snd_nxt) >= 0)
            {
                return;
            }

            lock (snd_bufLock)
            {
                for (var p = snd_buf.First; p != null; p = p.Next)
                {
                    var seg = p.Value;
                    if (sn == seg.sn)
                    {
                        snd_buf.Remove(p);
                        SegmentManager.Free(seg);
                        break;
                    }

                    // snd_buf is ordered by sn, so nothing further can match.
                    if (Itimediff(sn, seg.sn) < 0)
                    {
                        break;
                    }
                }
            }
        }

        /// <summary>Removes and frees every in-flight segment whose sn is before <paramref name="una"/>.</summary>
        protected void Parse_una(uint una)
        {
            lock (snd_bufLock)
            {
                while (snd_buf.First != null)
                {
                    var seg = snd_buf.First.Value;
                    if (Itimediff(una, seg.sn) > 0)
                    {
                        snd_buf.RemoveFirst();
                        SegmentManager.Free(seg);
                    }
                    else
                    {
                        break;
                    }
                }
            }
        }

        /// <summary>
        /// Increments the fastack counter of every in-flight segment that precedes
        /// <paramref name="sn"/> (segments skipped over by this ack).
        /// </summary>
        protected void Parse_fastack(uint sn, uint ts)
        {
            if (Itimediff(sn, snd_una) < 0 || Itimediff(sn, snd_nxt) >= 0)
            {
                return;
            }

            lock (snd_bufLock)
            {
                foreach (var item in snd_buf)
                {
                    var seg = item;
                    if (Itimediff(sn, seg.sn) < 0)
                    {
                        break;
                    }
                    else if (sn != seg.sn)
                    {
#if !IKCP_FASTACK_CONSERVE
                        seg.fastack++;
#else
                        if (Itimediff(ts, seg.ts) >= 0)
                        {
                            seg.fastack++;
                        }
#endif
                    }
                }
            }
        }

        /// <summary>
        /// Handles a data segment received from the lower network layer: drops it when it
        /// is outside the receive window or a duplicate, otherwise inserts it into rcv_buf
        /// in sn order and then moves deliverable segments to rcv_queue.
        /// </summary>
        /// <exception cref="NotSupportedException">
        /// The segment was inserted at the head of rcv_buf and its fragment count exceeds rcv_wnd.
        /// </exception>
        internal virtual void Parse_data(Segment newseg)
        {
            var sn = newseg.sn;

            lock (rcv_bufLock)
            {
                if (Itimediff(sn, rcv_nxt + rcv_wnd) >= 0 || Itimediff(sn, rcv_nxt) < 0)
                {
                    // A data segment whose sn is >= rcv_nxt + rcv_wnd or < rcv_nxt is discarded.
                    SegmentManager.Free(newseg);
                    return;
                }

                var repeat = false;

                // Scan from the tail to detect duplicates and find the insert position.
                LinkedListNode<Segment> p;
                for (p = rcv_buf.Last; p != null; p = p.Previous)
                {
                    var seg = p.Value;
                    if (seg.sn == sn)
                    {
                        repeat = true;
                        break;
                    }

                    if (Itimediff(sn, seg.sn) > 0)
                    {
                        break;
                    }
                }

                if (!repeat)
                {
                    if (CanLog(KcpLogMask.IKCP_LOG_PARSE_DATA))
                    {
                        LogWriteLine($"{newseg.ToLogString()}", KcpLogMask.IKCP_LOG_PARSE_DATA.ToString());
                    }

                    if (p == null)
                    {
                        rcv_buf.AddFirst(newseg);
                        if (newseg.frg + 1 > rcv_wnd)
                        {
                            // More fragments than the receive window can hold would block
                            // kcp forever; logging is pointless, so fail loudly.
                            throw new NotSupportedException(
                                $"分片数大于接收窗口,造成kcp阻塞冻结。frgCount:{newseg.frg + 1} rcv_wnd:{rcv_wnd}");
                        }
                    }
                    else
                    {
                        rcv_buf.AddAfter(p, newseg);
                    }
                }
                else
                {
                    SegmentManager.Free(newseg);
                }
            }

            Move_Rcv_buf_2_Rcv_queue();
        }

        /// <summary>
        /// Number of free slots in the receive window (rcv_wnd minus rcv_queue.Count),
        /// capped at ushort.MaxValue; 0 when the queue is full.
        /// </summary>
        protected ushort Wnd_unused()
        {
            // No lock is taken here, so read the count once into a local; otherwise the
            // value tested and the value used could differ.
            int waitCount = rcv_queue.Count;

            if (waitCount < rcv_wnd)
            {
                // Q: why subtract rcv_queue? A huge message with more fragments than rcv_wnd
                //    would keep the advertised window at 0 and block everything.
                // A: the sender rejects messages with too many fragments, and the receiver
                //    throws when (frg + 1) > rcv_wnd, so the situation is at least reported.
                // fix https://github.com/skywind3000/kcp/issues/126
                // In practice rcv_wnd should not exceed 65535.
                var count = rcv_wnd - waitCount;
                return (ushort)Min(count, ushort.MaxValue);
            }

            return 0;
        }

        /// <summary>
        /// Window-probe bookkeeping shared by Flush and Flush2: while the remote window is
        /// zero, schedules probes with a growing wait (capped at IKCP_PROBE_LIMIT) and sets
        /// IKCP_ASK_SEND when a probe is due; otherwise clears the probe timer.
        /// </summary>
        private void UpdateWindowProbe()
        {
            if (rmt_wnd == 0)
            {
                if (probe_wait == 0)
                {
                    probe_wait = IKCP_PROBE_INIT;
                    ts_probe = current + probe_wait;
                }
                else
                {
                    if (Itimediff(current, ts_probe) >= 0)
                    {
                        if (probe_wait < IKCP_PROBE_INIT)
                        {
                            probe_wait = IKCP_PROBE_INIT;
                        }

                        probe_wait += probe_wait / 2;

                        if (probe_wait > IKCP_PROBE_LIMIT)
                        {
                            probe_wait = IKCP_PROBE_LIMIT;
                        }

                        ts_probe = current + probe_wait;
                        probe |= IKCP_ASK_SEND;
                    }
                }
            }
            else
            {
                ts_probe = 0;
                probe_wait = 0;
            }
        }

        /// <summary>
        /// Moves segments from snd_queue to snd_buf while the effective window
        /// (min of snd_wnd, rmt_wnd and, unless nocwnd is set, cwnd) has room,
        /// stamping each with its header fields.
        /// </summary>
        private void MoveSendQueueToSendBuffer(uint current_, ushort wnd_)
        {
            // calculate window size
            var cwnd_ = Min(snd_wnd, rmt_wnd);
            if (nocwnd == 0)
            {
                cwnd_ = Min(cwnd, cwnd_);
            }

            while (Itimediff(snd_nxt, snd_una + cwnd_) < 0)
            {
                if (snd_queue.TryDequeue(out var newseg))
                {
                    newseg.conv = conv;
                    newseg.cmd = IKCP_CMD_PUSH;
                    newseg.wnd = wnd_;
                    newseg.ts = current_;
                    newseg.sn = snd_nxt;
                    snd_nxt++;
                    newseg.una = rcv_nxt;
                    newseg.resendts = current_;
                    newseg.rto = rx_rto;
                    newseg.fastack = 0;
                    newseg.xmit = 0;
                    lock (snd_bufLock)
                    {
                        snd_buf.AddLast(newseg);
                    }
                }
                else
                {
                    break;
                }
            }
        }

        /// <summary>
        /// Recomputes ssthresh, cwnd and incr after a flush, based on whether fast
        /// retransmits (<paramref name="change"/>) or timeouts (<paramref name="lost"/>) occurred.
        /// </summary>
        private void UpdateCongestionAfterFlush(int change, int lost, uint resent)
        {
            if (change != 0)
            {
                var inflight = snd_nxt - snd_una;
                ssthresh = inflight / 2;
                if (ssthresh < IKCP_THRESH_MIN)
                {
                    ssthresh = IKCP_THRESH_MIN;
                }

                cwnd = ssthresh + resent;
                incr = cwnd * mss;
            }

            if (lost != 0)
            {
                ssthresh = cwnd / 2;
                if (ssthresh < IKCP_THRESH_MIN)
                {
                    ssthresh = IKCP_THRESH_MIN;
                }

                cwnd = 1;
                incr = mss;
            }

            if (cwnd < 1)
            {
                cwnd = 1;
                incr = mss;
            }
        }

        /// <summary>
        /// flush pending data: encodes pending acks, window probe commands and due data
        /// segments into <c>buffer</c> and hands each full buffer to
        /// <c>callbackHandle.Output</c>; a fresh buffer is created after every hand-over.
        /// Does nothing until Update has been called once.
        /// </summary>
        protected void Flush()
        {
            var current_ = current;
            var change = 0;
            var lost = 0;
            var offset = 0;

            if (updated == 0)
            {
                return;
            }

            ushort wnd_ = Wnd_unused();

            unsafe
            {
                // This segment lives on the stack; it is used and discarded immediately
                // and is never stored.
                const int len = KcpSegment.LocalOffset + KcpSegment.HeadOffset;
                var ptr = stackalloc byte[len];
                KcpSegment seg = new KcpSegment(ptr, 0);

                seg.conv = conv;
                seg.cmd = IKCP_CMD_ACK;
                seg.wnd = wnd_;
                seg.una = rcv_nxt;

                #region flush acknowledges

                if (CheckDispose())
                {
                    // Being disposed.
                    return;
                }

                while (acklist.TryDequeue(out var temp))
                {
                    if (offset + IKCP_OVERHEAD > mtu)
                    {
                        callbackHandle.Output(buffer, offset);
                        offset = 0;
                        buffer = CreateBuffer(BufferNeedSize);
                    }

                    seg.sn = temp.sn;
                    seg.ts = temp.ts;
                    offset += seg.Encode(buffer.Memory.Span.Slice(offset));
                }

                #endregion

                // probe window size (if remote window size equals zero)
                UpdateWindowProbe();

                #region flush window probing commands

                if ((probe & IKCP_ASK_SEND) != 0)
                {
                    seg.cmd = IKCP_CMD_WASK;
                    if (offset + IKCP_OVERHEAD > (int)mtu)
                    {
                        callbackHandle.Output(buffer, offset);
                        offset = 0;
                        buffer = CreateBuffer(BufferNeedSize);
                    }

                    offset += seg.Encode(buffer.Memory.Span.Slice(offset));
                }

                if ((probe & IKCP_ASK_TELL) != 0)
                {
                    seg.cmd = IKCP_CMD_WINS;
                    if (offset + IKCP_OVERHEAD > (int)mtu)
                    {
                        callbackHandle.Output(buffer, offset);
                        offset = 0;
                        buffer = CreateBuffer(BufferNeedSize);
                    }

                    offset += seg.Encode(buffer.Memory.Span.Slice(offset));
                }

                probe = 0;

                #endregion
            }

            // Move the wait queue into the send list.
            MoveSendQueueToSendBuffer(current_, wnd_);

            #region Flush the send list through Output

            // calculate resent
            var resent = fastresend > 0 ? (uint)fastresend : 0xffffffff;
            var rtomin = nodelay == 0 ? (rx_rto >> 3) : 0;

            lock (snd_bufLock)
            {
                // flush data segments
                foreach (var item in snd_buf)
                {
                    var segment = item;
                    var needsend = false;
                    if (segment.xmit == 0)
                    {
                        // Newly added to snd_buf and never sent: send immediately.
                        needsend = true;
                        segment.xmit++;
                        segment.rto = rx_rto;
                        segment.resendts = current_ + rx_rto + rtomin;
                    }
                    else if (Itimediff(current_, segment.resendts) >= 0)
                    {
                        // Sent before but not acked within the RTO: retransmit.
                        needsend = true;
                        segment.xmit++;
                        this.xmit++;
                        if (nodelay == 0)
                        {
                            segment.rto += Math.Max(segment.rto, rx_rto);
                        }
                        else
                        {
                            var step = nodelay < 2 ? segment.rto : rx_rto;
                            segment.rto += step / 2;
                        }

                        segment.resendts = current_ + segment.rto;
                        lost = 1;
                    }
                    else if (segment.fastack >= resent)
                    {
                        // Sent before but skipped by several acks: fast retransmit.
                        if (segment.xmit <= fastlimit
                            || fastlimit <= 0)
                        {
                            needsend = true;
                            segment.xmit++;
                            segment.fastack = 0;
                            segment.resendts = current_ + segment.rto;
                            change++;
                        }
                    }

                    if (needsend)
                    {
                        segment.ts = current_;
                        segment.wnd = wnd_;
                        segment.una = rcv_nxt;

                        var need = IKCP_OVERHEAD + segment.len;
                        if (offset + need > mtu)
                        {
                            callbackHandle.Output(buffer, offset);
                            offset = 0;
                            buffer = CreateBuffer(BufferNeedSize);
                        }

                        offset += segment.Encode(buffer.Memory.Span.Slice(offset));

                        if (CanLog(KcpLogMask.IKCP_LOG_NEED_SEND))
                        {
                            LogWriteLine($"{segment.ToLogString(true)}", KcpLogMask.IKCP_LOG_NEED_SEND.ToString());
                        }

                        if (segment.xmit >= dead_link)
                        {
                            state = -1;

                            if (CanLog(KcpLogMask.IKCP_LOG_DEAD_LINK))
                            {
                                LogWriteLine($"state = -1; xmit:{segment.xmit} >= dead_link:{dead_link}", KcpLogMask.IKCP_LOG_DEAD_LINK.ToString());
                            }
                        }
                    }
                }
            }

            // flush remaining segments
            if (offset > 0)
            {
                callbackHandle.Output(buffer, offset);
                offset = 0;
                buffer = CreateBuffer(BufferNeedSize);
            }

            #endregion

            // update ssthresh: derive ssthresh and cwnd from the observed loss.
            UpdateCongestionAfterFlush(change, lost, resent);

            if (state == -1)
            {
                OnDeadlink();
            }
        }

        /// <summary>Called at the end of a flush when state is -1 (a segment reached dead_link transmissions).</summary>
        protected virtual void OnDeadlink()
        {
        }

        /// <summary>
        /// Test OutputWriter: same procedure as <see cref="Flush"/>, but segments are
        /// encoded into <c>OutputWriter</c> and sent with <c>OutputWriter.Flush()</c>.
        /// </summary>
        protected void Flush2()
        {
            var current_ = current;
            var change = 0;
            var lost = 0;

            if (updated == 0)
            {
                return;
            }

            ushort wnd_ = Wnd_unused();

            unsafe
            {
                // This segment lives on the stack; it is used and discarded immediately
                // and is never stored.
                const int len = KcpSegment.LocalOffset + KcpSegment.HeadOffset;
                var ptr = stackalloc byte[len];
                KcpSegment seg = new KcpSegment(ptr, 0);

                seg.conv = conv;
                seg.cmd = IKCP_CMD_ACK;
                seg.wnd = wnd_;
                seg.una = rcv_nxt;

                #region flush acknowledges

                if (CheckDispose())
                {
                    // Being disposed.
                    return;
                }

                while (acklist.TryDequeue(out var temp))
                {
                    if (OutputWriter.UnflushedBytes + IKCP_OVERHEAD > mtu)
                    {
                        OutputWriter.Flush();
                    }

                    seg.sn = temp.sn;
                    seg.ts = temp.ts;
                    seg.Encode(OutputWriter);
                }

                #endregion

                // probe window size (if remote window size equals zero)
                UpdateWindowProbe();

                #region flush window probing commands

                if ((probe & IKCP_ASK_SEND) != 0)
                {
                    seg.cmd = IKCP_CMD_WASK;
                    if (OutputWriter.UnflushedBytes + IKCP_OVERHEAD > (int)mtu)
                    {
                        OutputWriter.Flush();
                    }

                    seg.Encode(OutputWriter);
                }

                if ((probe & IKCP_ASK_TELL) != 0)
                {
                    seg.cmd = IKCP_CMD_WINS;
                    if (OutputWriter.UnflushedBytes + IKCP_OVERHEAD > (int)mtu)
                    {
                        OutputWriter.Flush();
                    }

                    seg.Encode(OutputWriter);
                }

                probe = 0;

                #endregion
            }

            // Move the wait queue into the send list.
            MoveSendQueueToSendBuffer(current_, wnd_);

            #region Flush the send list through OutputWriter

            // calculate resent
            var resent = fastresend > 0 ? (uint)fastresend : 0xffffffff;
            var rtomin = nodelay == 0 ? (rx_rto >> 3) : 0;

            lock (snd_bufLock)
            {
                // flush data segments
                foreach (var item in snd_buf)
                {
                    var segment = item;
                    var needsend = false;
                    if (segment.xmit == 0)
                    {
                        // Newly added to snd_buf and never sent: send immediately.
                        needsend = true;
                        segment.xmit++;
                        segment.rto = rx_rto;
                        segment.resendts = current_ + rx_rto + rtomin;
                    }
                    else if (Itimediff(current_, segment.resendts) >= 0)
                    {
                        // Sent before but not acked within the RTO: retransmit.
                        needsend = true;
                        segment.xmit++;
                        this.xmit++;
                        if (nodelay == 0)
                        {
                            segment.rto += Math.Max(segment.rto, rx_rto);
                        }
                        else
                        {
                            var step = nodelay < 2 ? segment.rto : rx_rto;
                            segment.rto += step / 2;
                        }

                        segment.resendts = current_ + segment.rto;
                        lost = 1;
                    }
                    else if (segment.fastack >= resent)
                    {
                        // Sent before but skipped by several acks: fast retransmit.
                        if (segment.xmit <= fastlimit
                            || fastlimit <= 0)
                        {
                            needsend = true;
                            segment.xmit++;
                            segment.fastack = 0;
                            segment.resendts = current_ + segment.rto;
                            change++;
                        }
                    }

                    if (needsend)
                    {
                        segment.ts = current_;
                        segment.wnd = wnd_;
                        segment.una = rcv_nxt;

                        var need = IKCP_OVERHEAD + segment.len;
                        if (OutputWriter.UnflushedBytes + need > mtu)
                        {
                            OutputWriter.Flush();
                        }

                        segment.Encode(OutputWriter);

                        if (CanLog(KcpLogMask.IKCP_LOG_NEED_SEND))
                        {
                            LogWriteLine($"{segment.ToLogString(true)}", KcpLogMask.IKCP_LOG_NEED_SEND.ToString());
                        }

                        if (segment.xmit >= dead_link)
                        {
                            state = -1;

                            if (CanLog(KcpLogMask.IKCP_LOG_DEAD_LINK))
                            {
                                LogWriteLine($"state = -1; xmit:{segment.xmit} >= dead_link:{dead_link}", KcpLogMask.IKCP_LOG_DEAD_LINK.ToString());
                            }
                        }
                    }
                }
            }

            // flush remaining segments
            if (OutputWriter.UnflushedBytes > 0)
            {
                OutputWriter.Flush();
            }

            #endregion

            // update ssthresh: derive ssthresh and cwnd from the observed loss.
            UpdateCongestionAfterFlush(change, lost, resent);

            if (state == -1)
            {
                OnDeadlink();
            }
        }

        /// update state (call it repeatedly, every 10ms-100ms), or you can ask
        /// ikcp_check when to call it again (without ikcp_input/_send calling).
/// <param name="time">Current time; the original comment suggests DateTime.UtcNow.</param>
        public void Update(in DateTimeOffset time)
        {
            if (CheckDispose())
            {
                // Being disposed.
                return;
            }

            current = time.ConvertTime();

            if (updated == 0)
            {
                updated = 1;
                ts_flush = current;
            }

            var slap = Itimediff(current, ts_flush);

            // A clock jump of more than 10 seconds in either direction resets the flush time.
            if (slap >= 10000 || slap < -10000)
            {
                ts_flush = current;
                slap = 0;
            }

            if (slap >= 0)
            {
                ts_flush += interval;
                if (Itimediff(current, ts_flush) >= 0)
                {
                    ts_flush = current + interval;
                }

                Flush();
            }
        }

        #endregion

        #region Settings

        /// <summary>
        /// Changes the maximum transmission unit and replaces the output buffer.
        /// </summary>
        /// <param name="mtu">New MTU; must be at least 50 and at least IKCP_OVERHEAD.</param>
        /// <returns>0 on success, -1 when <paramref name="mtu"/> is too small, -2 when no buffer could be created.</returns>
        public int SetMtu(int mtu = IKCP_MTU_DEF)
        {
            if (mtu < 50 || mtu < IKCP_OVERHEAD)
            {
                return -1;
            }

            // BufferNeedSize is derived from this.mtu, so the new value must be in place
            // BEFORE the buffer is created; otherwise the buffer is sized for the old mtu
            // and is too small after an increase. Roll back if creation fails.
            var previousMtu = this.mtu;
            this.mtu = (uint)mtu;

            BufferOwner buffer_;
            try
            {
                buffer_ = CreateBuffer(BufferNeedSize);
            }
            catch
            {
                this.mtu = previousMtu;
                throw;
            }

            if (null == buffer_)
            {
                this.mtu = previousMtu;
                return -2;
            }

            mss = this.mtu - IKCP_OVERHEAD;
            buffer?.Dispose();
            buffer = buffer_;
            return 0;
        }

        /// <summary>
        /// Sets the internal flush interval, clamped to [0, 5000].
        /// The lower bound is 0 instead of the usual 10 so that, in special cases, the CPU
        /// may be run at full load.
        /// </summary>
        /// <returns>Always 0.</returns>
        public int Interval(int interval_)
        {
            if (interval_ > 5000)
            {
                interval_ = 5000;
            }
            else if (interval_ < 0)
            {
                interval_ = 0;
            }

            interval = (uint)interval_;
            return 0;
        }

        /// <summary>
        /// Configures no-delay mode, flush interval, fast resend and congestion control.
        /// A negative <paramref name="nodelay_"/>, <paramref name="resend_"/> or
        /// <paramref name="nc_"/> leaves the corresponding setting unchanged.
        /// </summary>
        /// <param name="nodelay_">0 disables no-delay (min RTO = IKCP_RTO_MIN); a positive value enables it (min RTO = IKCP_RTO_NDL).</param>
        /// <param name="interval_">Flush interval, see <see cref="Interval"/>.</param>
        /// <param name="resend_">Fast resend trigger count.</param>
        /// <param name="nc_">Non-zero disables congestion control.</param>
        /// <returns>The result of <see cref="Interval"/>.</returns>
        public int NoDelay(int nodelay_, int interval_, int resend_, int nc_)
        {
            // ">= 0" (as in the C reference): with "> 0" the disable branch below was
            // unreachable and no-delay mode could never be switched off again.
            if (nodelay_ >= 0)
            {
                nodelay = (uint)nodelay_;
                if (nodelay_ != 0)
                {
                    rx_minrto = IKCP_RTO_NDL;
                }
                else
                {
                    rx_minrto = IKCP_RTO_MIN;
                }
            }

            if (resend_ >= 0)
            {
                fastresend = resend_;
            }

            if (nc_ >= 0)
            {
                nocwnd = nc_;
            }

            return Interval(interval_);
        }

        /// <summary>Sets the send and receive window sizes; non-positive values are ignored.</summary>
        /// <returns>Always 0.</returns>
        public int WndSize(int sndwnd = IKCP_WND_SND, int rcvwnd = IKCP_WND_RCV)
        {
            if (sndwnd > 0)
            {
                snd_wnd = (uint)sndwnd;
            }

            if (rcvwnd > 0)
            {
                rcv_wnd = (uint)rcvwnd;
            }

            return 0;
        }

        #endregion
    }

    public partial class KcpCore<Segment> : IKcpSendable
    {
        /// <summary>
        /// Largest number of fragments one message may be split into. The fragment index
        /// is stored in a single byte (<c>frg</c>), so it can never exceed 256 even when
        /// IKCP_WND_RCV is larger.
        /// </summary>
        private const int MaxFragmentCount = IKCP_WND_RCV < byte.MaxValue + 1 ? IKCP_WND_RCV : byte.MaxValue + 1;

        /// <summary>
        /// user/upper level send, returns below zero for error.
        /// The payload is split into mss-sized fragments that are enqueued atomically.
        /// Appending to the previous segment in stream mode was removed for thread-safety
        /// and data-structure reasons.
        /// </summary>
        /// <param name="span">Payload to send.</param>
        /// <param name="options">Not used by this implementation.</param>
        /// <returns>
        /// 0 on success, -1 for an empty payload, -2 when the payload needs too many
        /// fragments, -4 while disposing.
        /// </returns>
        /// <exception cref="InvalidOperationException">mss is zero.</exception>
        public int Send(ReadOnlySpan<byte> span, object options = null)
        {
            if (CheckDispose())
            {
                // Being disposed.
                return -4;
            }

            if (mss <= 0)
            {
                throw new InvalidOperationException($" mss <= 0 ");
            }

            if (span.Length == 0)
            {
                return -1;
            }

            // Computed in 64 bits so a very large payload cannot overflow into a
            // non-positive count and be silently dropped.
            long needed = ((long)span.Length + mss - 1) / mss;
            if (needed > MaxFragmentCount)
            {
                return -2;
            }

            var count = (int)needed;
            var offset = 0;

            lock (snd_queueLock)
            {
                for (var i = 0; i < count; i++)
                {
                    var size = (int)Math.Min((long)mss, (long)span.Length - offset);

                    var seg = SegmentManager.Alloc(size);
                    span.Slice(offset, size).CopyTo(seg.data);
                    offset += size;
                    // frg counts down: the last fragment of a message carries 0.
                    seg.frg = (byte)(count - i - 1);
                    snd_queue.Enqueue(seg);
                }
            }

            return 0;
        }

        /// <summary>
        /// user/upper level send for a (possibly multi-segment) sequence; same contract as
        /// the span overload.
        /// </summary>
        /// <param name="span">Payload to send.</param>
        /// <param name="options">Not used by this implementation.</param>
        /// <returns>
        /// 0 on success, -1 for an empty payload, -2 when the payload needs too many
        /// fragments, -4 while disposing.
        /// </returns>
        /// <exception cref="InvalidOperationException">mss is zero.</exception>
        public int Send(ReadOnlySequence<byte> span, object options = null)
        {
            if (CheckDispose())
            {
                // Being disposed.
                return -4;
            }

            if (mss <= 0)
            {
                throw new InvalidOperationException($" mss <= 0 ");
            }

            if (span.Length == 0)
            {
                return -1;
            }

            // span.Length is a long; keep the arithmetic in 64 bits (see the span overload).
            long needed = (span.Length + mss - 1) / mss;
            if (needed > MaxFragmentCount)
            {
                return -2;
            }

            var count = (int)needed;
            var offset = 0;

            lock (snd_queueLock)
            {
                for (var i = 0; i < count; i++)
                {
                    var size = (int)Math.Min((long)mss, span.Length - offset);

                    var seg = SegmentManager.Alloc(size);
                    span.Slice(offset, size).CopyTo(seg.data);
                    offset += size;
                    // frg counts down: the last fragment of a message carries 0.
                    seg.frg = (byte)(count - i - 1);
                    snd_queue.Enqueue(seg);
                }
            }

            return 0;
        }
    }

    public partial class KcpCore<Segment> : IKcpInputable
    {
        /// when you received a low level packet (eg.
/// UDP packet), call it
        /// <param name="span">Raw bytes of one low-level packet; may contain several KCP segments.</param>
        /// <returns>
        /// 0 on success, -1 when the packet is shorter than a header or the conversation id
        /// does not match, -2 when a declared payload length is invalid, -3 for an unknown
        /// command, -4 while disposing.
        /// </returns>
        public int Input(ReadOnlySpan<byte> span)
        {
            if (CheckDispose())
            {
                // Being disposed.
                return -4;
            }

            if (CanLog(KcpLogMask.IKCP_LOG_INPUT))
            {
                LogWriteLine($"[RI] {span.Length} bytes", KcpLogMask.IKCP_LOG_INPUT.ToString());
            }

            if (span.Length < IKCP_OVERHEAD)
            {
                return -1;
            }

            uint prev_una = snd_una;
            var offset = 0;
            int flag = 0;
            uint maxack = 0;
            uint latest_ts = 0;
            while (true)
            {
                uint ts = 0;
                uint sn = 0;
                uint length = 0;
                uint una = 0;
                uint conv_ = 0;
                ushort wnd = 0;
                byte cmd = 0;
                byte frg = 0;

                if (span.Length - offset < IKCP_OVERHEAD)
                {
                    break;
                }

                // The header is read in place; no stack copy is needed for a contiguous span
                // (and a stackalloc inside this loop would grow the stack per segment).
                offset += ReadHeader(span.Slice(offset, IKCP_OVERHEAD),
                    ref conv_,
                    ref cmd,
                    ref frg,
                    ref wnd,
                    ref ts,
                    ref sn,
                    ref una,
                    ref length);

                if (conv != conv_)
                {
                    return -1;
                }

                if (span.Length - offset < length || (int)length < 0)
                {
                    return -2;
                }

                switch (cmd)
                {
                    case IKCP_CMD_PUSH:
                    case IKCP_CMD_ACK:
                    case IKCP_CMD_WASK:
                    case IKCP_CMD_WINS:
                        break;
                    default:
                        return -3;
                }

                rmt_wnd = wnd;
                Parse_una(una);
                Shrink_buf();

                if (IKCP_CMD_ACK == cmd)
                {
                    if (Itimediff(current, ts) >= 0)
                    {
                        Update_ack(Itimediff(current, ts));
                    }

                    Parse_ack(sn);
                    Shrink_buf();

                    // Track the largest acked sn of this packet for fast-ack accounting.
                    if (flag == 0)
                    {
                        flag = 1;
                        maxack = sn;
                        latest_ts = ts;
                    }
                    else if (Itimediff(sn, maxack) > 0)
                    {
#if !IKCP_FASTACK_CONSERVE
                        maxack = sn;
                        latest_ts = ts;
#else
                        if (Itimediff(ts, latest_ts) > 0)
                        {
                            maxack = sn;
                            latest_ts = ts;
                        }
#endif
                    }

                    if (CanLog(KcpLogMask.IKCP_LOG_IN_ACK))
                    {
                        LogWriteLine($"input ack: sn={sn} rtt={Itimediff(current, ts)} rto={rx_rto}", KcpLogMask.IKCP_LOG_IN_ACK.ToString());
                    }
                }
                else if (IKCP_CMD_PUSH == cmd)
                {
                    if (CanLog(KcpLogMask.IKCP_LOG_IN_DATA))
                    {
                        LogWriteLine($"input psh: sn={sn} ts={ts}", KcpLogMask.IKCP_LOG_IN_DATA.ToString());
                    }

                    if (Itimediff(sn, rcv_nxt + rcv_wnd) < 0)
                    {
                        // instead of ikcp_ack_push
                        acklist.Enqueue((sn, ts));

                        if (Itimediff(sn, rcv_nxt) >= 0)
                        {
                            var seg = SegmentManager.Alloc((int)length);
                            seg.conv = conv_;
                            seg.cmd = cmd;
                            seg.frg = frg;
                            seg.wnd = wnd;
                            seg.ts = ts;
                            seg.sn = sn;
                            seg.una = una;
                            // seg.len is fixed at allocation time and cannot be changed.

                            if (length > 0)
                            {
                                span.Slice(offset, (int)length).CopyTo(seg.data);
                            }

                            Parse_data(seg);
                        }
                    }
                }
                else if (IKCP_CMD_WASK == cmd)
                {
                    // ready to send back IKCP_CMD_WINS in Ikcp_flush
                    // tell remote my window size
                    probe |= IKCP_ASK_TELL;

                    if (CanLog(KcpLogMask.IKCP_LOG_IN_PROBE))
                    {
                        LogWriteLine($"input probe", KcpLogMask.IKCP_LOG_IN_PROBE.ToString());
                    }
                }
                else if (IKCP_CMD_WINS == cmd)
                {
                    // do nothing
                    if (CanLog(KcpLogMask.IKCP_LOG_IN_WINS))
                    {
                        LogWriteLine($"input wins: {wnd}", KcpLogMask.IKCP_LOG_IN_WINS.ToString());
                    }
                }
                else
                {
                    return -3;
                }

                offset += (int)length;
            }

            if (flag != 0)
            {
                Parse_fastack(maxack, latest_ts);
            }

            // snd_una advanced: grow the congestion window.
            if (Itimediff(this.snd_una, prev_una) > 0)
            {
                if (cwnd < rmt_wnd)
                {
                    if (cwnd < ssthresh)
                    {
                        cwnd++;
                        incr += mss;
                    }
                    else
                    {
                        if (incr < mss)
                        {
                            incr = mss;
                        }

                        incr += (mss * mss) / incr + (mss / 16);
                        if ((cwnd + 1) * mss <= incr)
                        {
#if true
                            cwnd = (incr + mss - 1) / ((mss > 0) ? mss : 1);
#else
                            cwnd++;
#endif
                        }
                    }

                    if (cwnd > rmt_wnd)
                    {
                        cwnd = rmt_wnd;
                        incr = rmt_wnd * mss;
                    }
                }
            }

            return 0;
        }

        /// <summary>
        /// Sequence overload of <see cref="Input(ReadOnlySpan{byte})"/>; same processing and
        /// return codes, for input that may span several memory segments.
        /// </summary>
        /// <param name="span">Raw bytes of one low-level packet; may contain several KCP segments.</param>
        /// <returns>
        /// 0 on success, -1 when the packet is shorter than a header or the conversation id
        /// does not match, -2 when a declared payload length is invalid, -3 for an unknown
        /// command, -4 while disposing.
        /// </returns>
        public int Input(ReadOnlySequence<byte> span)
        {
            if (CheckDispose())
            {
                // Being disposed.
                return -4;
            }

            if (CanLog(KcpLogMask.IKCP_LOG_INPUT))
            {
                LogWriteLine($"[RI] {span.Length} bytes", KcpLogMask.IKCP_LOG_INPUT.ToString());
            }

            if (span.Length < IKCP_OVERHEAD)
            {
                return -1;
            }

            uint prev_una = snd_una;
            var offset = 0;
            int flag = 0;
            uint maxack = 0;
            uint latest_ts = 0;

            // One scratch header for the whole call: stackalloc inside the loop would
            // consume additional stack for every segment in the packet.
            Span<byte> header = stackalloc byte[IKCP_OVERHEAD];

            while (true)
            {
                uint ts = 0;
                uint sn = 0;
                uint length = 0;
                uint una = 0;
                uint conv_ = 0;
                ushort wnd = 0;
                byte cmd = 0;
                byte frg = 0;

                if (span.Length - offset < IKCP_OVERHEAD)
                {
                    break;
                }

                // The header may straddle sequence segments, so copy it to contiguous memory.
                span.Slice(offset, IKCP_OVERHEAD).CopyTo(header);
                offset += ReadHeader(header,
                    ref conv_,
                    ref cmd,
                    ref frg,
                    ref wnd,
                    ref ts,
                    ref sn,
                    ref una,
                    ref length);

                if (conv != conv_)
                {
                    return -1;
                }

                if (span.Length - offset < length || (int)length < 0)
                {
                    return -2;
                }

                switch (cmd)
                {
                    case IKCP_CMD_PUSH:
                    case IKCP_CMD_ACK:
                    case IKCP_CMD_WASK:
                    case IKCP_CMD_WINS:
                        break;
                    default:
                        return -3;
                }

                rmt_wnd = wnd;
                Parse_una(una);
                Shrink_buf();

                if (IKCP_CMD_ACK == cmd)
                {
                    if (Itimediff(current, ts) >= 0)
                    {
                        Update_ack(Itimediff(current, ts));
                    }

                    Parse_ack(sn);
                    Shrink_buf();

                    // Track the largest acked sn of this packet for fast-ack accounting.
                    if (flag == 0)
                    {
                        flag = 1;
                        maxack = sn;
                        latest_ts = ts;
                    }
                    else if (Itimediff(sn, maxack) > 0)
                    {
#if !IKCP_FASTACK_CONSERVE
                        maxack = sn;
                        latest_ts = ts;
#else
                        if (Itimediff(ts, latest_ts) > 0)
                        {
                            maxack = sn;
                            latest_ts = ts;
                        }
#endif
                    }

                    if (CanLog(KcpLogMask.IKCP_LOG_IN_ACK))
                    {
                        LogWriteLine($"input ack: sn={sn} rtt={Itimediff(current, ts)} rto={rx_rto}", KcpLogMask.IKCP_LOG_IN_ACK.ToString());
                    }
                }
                else if (IKCP_CMD_PUSH == cmd)
                {
                    if (CanLog(KcpLogMask.IKCP_LOG_IN_DATA))
                    {
                        LogWriteLine($"input psh: sn={sn} ts={ts}", KcpLogMask.IKCP_LOG_IN_DATA.ToString());
                    }

                    if (Itimediff(sn, rcv_nxt + rcv_wnd) < 0)
                    {
                        // instead of ikcp_ack_push
                        acklist.Enqueue((sn, ts));

                        if (Itimediff(sn, rcv_nxt) >= 0)
                        {
                            var seg = SegmentManager.Alloc((int)length);
                            seg.conv = conv_;
                            seg.cmd = cmd;
                            seg.frg = frg;
                            seg.wnd = wnd;
                            seg.ts = ts;
                            seg.sn = sn;
                            seg.una = una;
                            // seg.len is fixed at allocation time and cannot be changed.

                            if (length > 0)
                            {
                                span.Slice(offset, (int)length).CopyTo(seg.data);
                            }

                            Parse_data(seg);
                        }
                    }
                }
                else if (IKCP_CMD_WASK == cmd)
                {
                    // ready to send back IKCP_CMD_WINS in Ikcp_flush
                    // tell remote my window size
                    probe |= IKCP_ASK_TELL;

                    if (CanLog(KcpLogMask.IKCP_LOG_IN_PROBE))
                    {
                        LogWriteLine($"input probe", KcpLogMask.IKCP_LOG_IN_PROBE.ToString());
                    }
                }
                else if (IKCP_CMD_WINS == cmd)
                {
                    // do nothing
                    if (CanLog(KcpLogMask.IKCP_LOG_IN_WINS))
                    {
                        LogWriteLine($"input wins: {wnd}", KcpLogMask.IKCP_LOG_IN_WINS.ToString());
                    }
                }
                else
                {
                    return -3;
                }

                offset += (int)length;
            }

            if (flag != 0)
            {
                Parse_fastack(maxack, latest_ts);
            }

            // snd_una advanced: grow the congestion window.
            if (Itimediff(this.snd_una, prev_una) > 0)
            {
                if (cwnd < rmt_wnd)
                {
                    if (cwnd < ssthresh)
                    {
                        cwnd++;
                        incr += mss;
                    }
                    else
                    {
                        if (incr < mss)
                        {
                            incr = mss;
                        }

                        incr += (mss * mss) / incr + (mss / 16);
                        if ((cwnd + 1) * mss <= incr)
                        {
#if true
                            cwnd = (incr + mss - 1) / ((mss > 0) ? mss : 1);
#else
                            cwnd++;
#endif
                        }
                    }

                    if (cwnd > rmt_wnd)
                    {
                        cwnd = rmt_wnd;
                        incr = rmt_wnd * mss;
                    }
                }
            }

            return 0;
        }

        /// <summary>
        /// Decodes a 24-byte KCP segment header in the byte order selected by
        /// <c>IsLittleEndian</c>. Field layout: conv(4) cmd(1) frg(1) wnd(2) ts(4) sn(4)
        /// una(4) len(4).
        /// </summary>
        /// <param name="header">At least 24 bytes; shorter input makes the slicing/reads throw.</param>
        /// <returns>Number of bytes consumed (24).</returns>
        public static int ReadHeader(ReadOnlySpan<byte> header,
            ref uint conv_,
            ref byte cmd,
            ref byte frg,
            ref ushort wnd,
            ref uint ts,
            ref uint sn,
            ref uint una,
            ref uint length)
        {
            var offset = 0;
            if (IsLittleEndian)
            {
                conv_ = BinaryPrimitives.ReadUInt32LittleEndian(header.Slice(offset));
                offset += 4;

                cmd = header[offset];
                offset += 1;
                frg = header[offset];
                offset += 1;
                wnd = BinaryPrimitives.ReadUInt16LittleEndian(header.Slice(offset));
                offset += 2;

                ts = BinaryPrimitives.ReadUInt32LittleEndian(header.Slice(offset));
                offset += 4;
                sn = BinaryPrimitives.ReadUInt32LittleEndian(header.Slice(offset));
                offset += 4;
                una = BinaryPrimitives.ReadUInt32LittleEndian(header.Slice(offset));
                offset += 4;
                length = BinaryPrimitives.ReadUInt32LittleEndian(header.Slice(offset));
                offset += 4;
            }
            else
            {
                conv_ = BinaryPrimitives.ReadUInt32BigEndian(header.Slice(offset));
                offset += 4;

                cmd = header[offset];
                offset += 1;
                frg = header[offset];
                offset += 1;
                wnd = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(offset));
                offset += 2;

                ts = BinaryPrimitives.ReadUInt32BigEndian(header.Slice(offset));
                offset += 4;
                sn = BinaryPrimitives.ReadUInt32BigEndian(header.Slice(offset));
                offset += 4;
                una = BinaryPrimitives.ReadUInt32BigEndian(header.Slice(offset));
                offset += 4;
                length = BinaryPrimitives.ReadUInt32BigEndian(header.Slice(offset));
                offset += 4;
            }

            return offset;
        }
    }
}