// KCPClient.cs
  1. using System;
  2. using System.Buffers;
  3. using System.IO;
  4. using System.IO.Compression;
  5. using System.Net;
  6. using System.Net.Sockets;
  7. using System.Net.Sockets.Kcp;
  8. using System.Threading;
  9. using Fort23.Core;
  10. using Fort23.UTool;
  11. namespace Core.KCPTool
  12. {
  /// <summary>
  /// KCP client running over a connected UDP socket.
  /// <para>
  /// <see cref="Connect"/> starts three background threads: one feeds datagrams
  /// read from the socket into KCP, one drives the KCP clock and watches for
  /// heartbeat timeouts, and one drains reassembled KCP messages and handles
  /// them by <see cref="SendDataType"/>.
  /// </para>
  /// </summary>
  public class KCPClient : IKcpCallback, IDisposable, IClientConnection
  {
      /// <summary>Size of the datagram receive buffer, in bytes.</summary>
      private const int ReceiveBufferSize = 2048;

      /// <summary>
      /// Maximum silence (in ticks) tolerated after the last heartbeat before the
      /// connection is declared lost: 7 seconds (70,000,000 ticks).
      /// </summary>
      private const long HeartbeatTimeoutTicks = 7 * TimeSpan.TicksPerSecond;

      /// <summary>Upper bound for waiting on each worker thread in <see cref="Dispose"/>.</summary>
      private const int WorkerJoinTimeoutMs = 100;

      private Socket socket;
      private Thread _udpClientThread;
      private Thread _updateThread;
      private Thread _receiveThread;

      /// <summary>True once a handshake or heartbeat has been received and no timeout has occurred since.</summary>
      public bool IsConnection { get; set; }

      /// <summary>True after a socket failure, a heartbeat timeout, or a failed <see cref="Connect"/>.</summary>
      public bool disconnect { get; set; }

      public long playerId;

      private readonly byte[] receiveBuffer = new byte[ReceiveBufferSize];

      // Only stored by Connect; the code that dispatched data packets to it is currently disabled.
      private IClient client;

      /// <summary>
      /// Time of the last heartbeat (UTC ticks); 0 means "none received yet".
      /// Shared between threads, so it is only accessed through Volatile.Read/Write.
      /// </summary>
      private long lasetGetHandshakeTime = 0;

      public SimpleSegManager.Kcp kcp
      {
          get { return _kcp; }
      }

      public IPEndPoint EndPoint { get; set; }

      private SimpleSegManager.Kcp _kcp;

      // Stop flag polled by all worker loops; volatile so a write from Dispose is observed promptly.
      private volatile bool isUpdate;

      public bool isAwaitReConnect { get; set; }

      // Completed by the receive thread when the handshake reply arrives.
      private CTask _cTask;

      // Guards against completing _cTask more than once if the handshake reply is duplicated.
      private bool _handshakeSignalled;

      public KCPClient()
      {
      }

      /// <summary>
      /// Opens the UDP socket, creates the KCP session, sends the raw player id as
      /// the first datagram and waits until the handshake reply has been received.
      /// </summary>
      /// <param name="endPoint">Server address to connect the UDP socket to.</param>
      /// <param name="playerId">Player id sent to the server and expected back in the handshake.</param>
      /// <param name="client">Client object stored for packet dispatch.</param>
      /// <exception cref="Exception">
      /// Any failure is logged and rethrown; in that case <see cref="disconnect"/> is
      /// set, the worker loops are stopped and the socket is closed.
      /// </exception>
      public async CTask Connect(IPEndPoint endPoint, long playerId, IClient client)
      {
          try
          {
              _cTask = CTask.Create(false);
              _handshakeSignalled = false;
              this.client = client;
              this.playerId = playerId;

              // Reset all shared state BEFORE any worker thread starts. Resetting the
              // heartbeat timestamp afterwards could overwrite a value the receive
              // thread had already stored and trigger an immediate false timeout.
              IsConnection = false;
              disconnect = false;
              Volatile.Write(ref lasetGetHandshakeTime, 0L);
              isUpdate = true;

              socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
              socket.Connect(endPoint);
              _kcp = new SimpleSegManager.Kcp(0, this);
              _kcp.NoDelay(1, 40, 2, 1);
              EndPoint = endPoint;

              _udpClientThread = StartWorker(BeginRecv);

              // The very first datagram is the bare player id, sent outside KCP.
              socket.Send(SocketTool.LongToByte(playerId));

              _updateThread = StartWorker(Update);
              _receiveThread = StartWorker(ReceiveAsyncThread);

              await _cTask;
          }
          catch (Exception e)
          {
              disconnect = true;

              // Stop the workers and release the socket; otherwise the receive
              // thread would stay blocked in Receive on a half-initialised client.
              isUpdate = false;
              Interlocked.Exchange(ref socket, null)?.Dispose();

              LogTool.Exception(e);
              throw;
          }
      }

      /// <summary>
      /// Sends a <see cref="SendDataType.GetGameFrameByte"/> request made of the
      /// player id followed by <paramref name="gameFrame"/> over the existing session.
      /// </summary>
      /// <param name="endPoint">Currently unused; the existing socket is reused.</param>
      /// <param name="gameFrame">Frame number appended after the player id.</param>
      public void ReConnect(IPEndPoint endPoint, int gameFrame)
      {
          // An earlier implementation recreated the socket and resent the handshake;
          // it was disabled in favour of this request on the existing connection.
          byte[] idBytes = SocketTool.LongToByte(playerId);
          byte[] frameBytes = SocketTool.IntToByte(gameFrame);

          byte[] payload = new byte[idBytes.Length + frameBytes.Length];
          Array.Copy(idBytes, 0, payload, 0, idBytes.Length);
          Array.Copy(frameBytes, 0, payload, idBytes.Length, frameBytes.Length);

          SendToServer(SendDataType.GetGameFrameByte, payload);
      }

      /// <summary>
      /// KCP output callback: writes one KCP-produced datagram to the socket.
      /// </summary>
      /// <param name="buffer">Buffer handed over by KCP; it is always disposed here.</param>
      /// <param name="avalidLength">Number of valid bytes at the start of <paramref name="buffer"/>.</param>
      public void Output(IMemoryOwner<byte> buffer, int avalidLength)
      {
          // Ownership of the buffer is transferred to this callback, so it must be
          // released on every path, including the early exits below.
          using (buffer)
          {
              // Snapshot the field: another thread may null it between check and use.
              Socket current = socket;
              if (EndPoint == null || disconnect || current == null)
              {
                  return;
              }

              byte[] datagram = buffer.Memory.Span.Slice(0, avalidLength).ToArray();
              try
              {
                  current.Send(datagram);
              }
              catch (Exception e)
              {
                  LogTool.Exception(e);

                  // Drop the broken socket; the receive loop flags the disconnect
                  // when its own Receive call fails.
                  CloseSocket(current);
              }
          }
      }

      /// <summary>
      /// Queues a message for reliable delivery: one type byte followed by <paramref name="buffer"/>.
      /// </summary>
      /// <param name="sendDataType">Message type written as the first byte.</param>
      /// <param name="buffer">Message body.</param>
      public void SendToServer(SendDataType sendDataType, byte[] buffer)
      {
          byte[] framed = new byte[buffer.Length + 1];
          framed[0] = (byte)sendDataType;
          Array.Copy(buffer, 0, framed, 1, buffer.Length);
          kcp.Send(framed.AsSpan());
      }

      public void Finish()
      {
      }

      /// <summary>
      /// Blocks (polling every 5 ms) until KCP has a complete message and returns a copy of it.
      /// Despite the name this method is synchronous.
      /// </summary>
      /// <returns>The message bytes, or null once the client has been disposed.</returns>
      public byte[] ReceiveAsync()
      {
          while (true)
          {
              SimpleSegManager.Kcp session = _kcp;
              if (session == null)
              {
                  return null;
              }

              var (owner, length) = session.TryRecv();
              if (owner != null)
              {
                  // The message is copied out, so the buffer obtained from KCP
                  // is released right away instead of being leaked.
                  using (owner)
                  {
                      return owner.Memory.Span.Slice(0, length).ToArray();
                  }
              }

              Thread.Sleep(5);
              if (_udpClientThread == null)
              {
                  return null;
              }
          }
      }

      /// <summary>
      /// Socket receive loop: reads datagrams and feeds them into KCP until the client is stopped.
      /// </summary>
      private void BeginRecv()
      {
          while (isUpdate)
          {
              Socket current = socket;
              if (current == null)
              {
                  Thread.Sleep(3);
                  continue;
              }

              try
              {
                  // Receive blocks until a datagram arrives, so no peek or
                  // per-packet sleep is needed.
                  int count = current.Receive(receiveBuffer, SocketFlags.None);
                  if (count > 0)
                  {
                      FeedKcp(count);
                  }
              }
              catch (Exception e)
              {
                  // A failure caused by Dispose closing the socket is expected; only
                  // report the exception when the client is still supposed to run.
                  if (isUpdate)
                  {
                      LogTool.Exception(e);
                  }

                  LogTool.Log("链接断开");
                  CloseSocket(current);
                  disconnect = true;
                  IsConnection = false;
              }
          }

          LogTool.Log("执行完成kcp_BeginRecv");
      }

      /// <summary>Copies the first <paramref name="count"/> received bytes and hands them to KCP.</summary>
      private void FeedKcp(int count)
      {
          try
          {
              byte[] data = new byte[count];
              Array.Copy(receiveBuffer, 0, data, 0, count);
              _kcp?.Input(data);
          }
          catch (Exception e)
          {
              LogTool.Exception(e);
          }
      }

      /// <summary>
      /// KCP clock loop: updates KCP roughly every 5 ms and detects heartbeat timeouts.
      /// </summary>
      private void Update()
      {
          while (isUpdate)
          {
              try
              {
                  _kcp?.Update(DateTimeOffset.UtcNow);

                  if (IsConnection)
                  {
                      long silence = DateTime.UtcNow.Ticks - Volatile.Read(ref lasetGetHandshakeTime);
                      if (silence > HeartbeatTimeoutTicks) // connection lost
                      {
                          disconnect = true;
                          IsConnection = false;

                          // The update loop ends here; the other workers keep
                          // running until Dispose is called.
                          return;
                      }
                  }
              }
              catch (Exception e)
              {
                  LogTool.Error(e);
              }

              // Sleep outside the try block so a repeatedly failing update cannot
              // turn into a busy loop.
              Thread.Sleep(5);
          }

          LogTool.Log("执行完成kcp_Update");
      }

      /// <summary>
      /// Message loop: drains complete KCP messages and handles them; errors are logged and the loop continues.
      /// </summary>
      private void ReceiveAsyncThread()
      {
          while (isUpdate)
          {
              try
              {
                  byte[] packet = ReceiveAsync();
                  if (packet == null)
                  {
                      Thread.Sleep(10);
                      continue;
                  }

                  HandlePacket(packet);
              }
              catch (Exception e)
              {
                  LogTool.Error(e);
              }
          }
      }

      /// <summary>Handles one message according to the type stored in its first byte.</summary>
      private void HandlePacket(byte[] packet)
      {
          SendDataType sendDataType = (SendDataType)packet[0];
          switch (sendDataType)
          {
              case SendDataType.ShakeHands:
                  HandleHandshake(packet);
                  break;

              case SendDataType.Heartbeat:
                  if (!IsConnection)
                  {
                      LogTool.Log("链接成功");
                  }

                  isAwaitReConnect = false;

                  // Store the timestamp before raising IsConnection so the update
                  // loop never pairs IsConnection == true with a stale timestamp.
                  Volatile.Write(ref lasetGetHandshakeTime, DateTime.UtcNow.Ticks);
                  IsConnection = true;
                  break;

              case SendDataType.Data:
                  // The body is still decompressed (a corrupt body is reported via
                  // the caller's error log), but the result is currently dropped.
                  // TODO: dispatch is disabled; it used to parse the payload as a
                  // CombatSynchronizeResponsePack, mark the connection as alive and
                  // pass it to client.AddCombatSynchronizeResponse.
                  Decompress(packet);
                  break;
          }
      }

      /// <summary>
      /// Handles the handshake reply: checks the echoed player id (bytes 1..8),
      /// marks the connection as established and completes the pending Connect once.
      /// </summary>
      private void HandleHandshake(byte[] packet)
      {
          byte[] idBytes = new byte[8];
          Array.Copy(packet, 1, idBytes, 0, idBytes.Length);

          // NOTE(review): eight bytes are decoded with ByteToInt; confirm that this
          // helper really decodes a 64-bit value, otherwise large ids cannot match.
          long echoedId = SocketTool.ByteToInt(idBytes);
          if (echoedId != playerId)
          {
              LogTool.Error("握手消息不对");
          }

          // Timestamp first, IsConnection second (see the heartbeat handler).
          if (Volatile.Read(ref lasetGetHandshakeTime) == 0)
          {
              Volatile.Write(ref lasetGetHandshakeTime, DateTime.UtcNow.Ticks);
          }

          isAwaitReConnect = false;
          IsConnection = true;

          // A duplicated handshake reply must not complete the task a second time.
          if (!_handshakeSignalled)
          {
              _handshakeSignalled = true;
              _cTask.SetResult();
          }
      }

      /// <summary>GZip-decompresses the message body, i.e. everything after the leading type byte.</summary>
      private static byte[] Decompress(byte[] packet)
      {
          using (MemoryStream compressed = new MemoryStream(packet, 1, packet.Length - 1, false))
          using (GZipStream gzip = new GZipStream(compressed, CompressionMode.Decompress))
          using (MemoryStream inflated = new MemoryStream())
          {
              gzip.CopyTo(inflated);
              return inflated.ToArray();
          }
      }

      /// <summary>Creates and starts a background thread, so a forgotten Dispose cannot keep the process alive.</summary>
      private static Thread StartWorker(ThreadStart body)
      {
          Thread worker = new Thread(body) { IsBackground = true };
          worker.Start();
          return worker;
      }

      /// <summary>Waits briefly for a worker to finish; never waits on the calling thread itself.</summary>
      private static void JoinWorker(Thread worker)
      {
          if (worker != null && worker != Thread.CurrentThread && worker.IsAlive)
          {
              worker.Join(WorkerJoinTimeoutMs);
          }
      }

      /// <summary>Disposes a failed socket and clears the field if it still refers to that socket.</summary>
      private void CloseSocket(Socket failed)
      {
          failed.Dispose();
          Interlocked.CompareExchange(ref socket, null, failed);
      }

      /// <summary>
      /// Stops the worker threads, closes the socket and releases the KCP session.
      /// Safe to call more than once.
      /// </summary>
      public void Dispose()
      {
          isUpdate = false;

          // Clearing this field is also the exit signal for ReceiveAsync.
          Thread socketWorker = _udpClientThread;
          _udpClientThread = null;

          // Closing the socket makes the blocked Receive call in BeginRecv fail.
          Interlocked.Exchange(ref socket, null)?.Dispose();

          // Give the workers a bounded amount of time to leave KCP before it is disposed.
          JoinWorker(_updateThread);
          JoinWorker(_receiveThread);
          JoinWorker(socketWorker);
          _updateThread = null;
          _receiveThread = null;

          SimpleSegManager.Kcp session = _kcp;
          _kcp = null;
          session?.Dispose();
      }
  }
  347. }