communication_socket_tcp_base.cs 23 KB


  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using System.Net.Sockets;
  7. using System.Threading;
  8. using tool;
  9. namespace chutian_parking_terminal.communication_base
  10. {
  11. public class Communication_socket_tcp_base
  12. {
  13. public enum Communication_socket_tcp_statu
  14. {
  15. COMMUNICATION_TCP_UNKNOW = 0, //通信状态 未知
  16. COMMUNICATION_TCP_READY = 1, //通信状态 正常
  17. COMMUNICATION_TCP_FAULT = 2, //通信状态 错误
  18. };
  19. public Communication_socket_tcp_base()
  20. {
  21. m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_UNKNOW;//通信状态
  22. m_headsize = 4; //包头长度(固定为4)
  23. m_surplusBuffer = null; //缓冲区(存放不完整消息)
  24. m_haveread = 0; //数据包读取下标
  25. m_totalLen = 0; //数据包总长
  26. m_receive_data_list = new List<byte[]>();
  27. m_send_data_list = new List<byte[]>();
  28. m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);//通信socket
  29. m_encapsulate_cycle_time = 100;
  30. //接受线程默认循环
  31. m_receive_data_condition = true;
  32. m_receive_data_thread = new Thread(receive_data_thread);
  33. //解析线程默认循环
  34. m_analysis_data_condition = true;
  35. m_analysis_data_thread = new Thread(analysis_data_thread);
  36. //发送线程默认循环
  37. m_send_data_condition = true;
  38. m_send_data_thread = new Thread(send_data_thread);
  39. //封装线程默认循环
  40. m_encapsulate_data_condition = true;
  41. m_encapsulate_data_thread = new Thread(encapsulate_data_thread);
  42. //重连线程默认循环
  43. m_reconnection_condition = true;
  44. m_reconnection_thread = new Thread(reconnection_thread);
  45. m_event_exit = new EventWaitHandle(false, EventResetMode.ManualReset);
  46. json_file_operation.Instance.json_file_operation_init("../../settings.json");
  47. }
  48. //通信初始化
  49. public void communication_socket_tcp_init()
  50. {
  51. //获取单片机IP和端口
  52. this.m_ip = json_file_operation.Instance.read_json_string("singleChipAddress");
  53. this.m_port = int.Parse(json_file_operation.Instance.read_json_string("singleChipPort"));
  54. }
  55. public void communication_socket_tcp_connect()
  56. {
  57. try
  58. {
  59. m_socket.Connect(m_ip, m_port);
  60. m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_READY;
  61. }
  62. catch (Exception ex)
  63. {
  64. m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_FAULT;
  65. }
  66. //启动通信
  67. communication_socket_tcp_run();
  68. }
  69. //反初始化
  70. public void communication_socket_tcp_uninit()
  71. {
  72. m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_UNKNOW;
  73. //关闭5个线程
  74. m_receive_data_condition = false;
  75. m_analysis_data_condition = false;
  76. m_send_data_condition = false;
  77. m_encapsulate_data_condition = false;
  78. m_reconnection_condition = false;
  79. //回收5个线程的资源
  80. if (m_receive_data_thread != null)
  81. {
  82. m_receive_data_thread.Abort();
  83. }
  84. if (m_analysis_data_thread != null)
  85. {
  86. m_analysis_data_thread.Abort();
  87. }
  88. if (m_send_data_thread != null)
  89. {
  90. m_send_data_thread.Abort();
  91. }
  92. if (m_encapsulate_data_thread != null)
  93. {
  94. m_encapsulate_data_thread.Abort();
  95. }
  96. if (m_reconnection_thread != null)
  97. {
  98. m_reconnection_thread.Abort();
  99. }
  100. m_socket?.Close();
  101. //清空list
  102. lock (m_receive_list_lock)
  103. {
  104. m_receive_data_list.Clear();
  105. }
  106. lock (m_send_list_lock)
  107. {
  108. m_send_data_list.Clear();
  109. }
  110. }
  111. public void communication_socket_tcp_run()
  112. {
  113. //启动5个线程
  114. m_reconnection_thread.Start();
  115. m_receive_data_thread.Start();
  116. m_send_data_thread.Start();
  117. m_analysis_data_thread.Start();
  118. m_encapsulate_data_thread.Start();
  119. }
  120. public void receive_data_thread()
  121. {
  122. while (m_receive_data_condition && !m_event_exit.WaitOne(1))
  123. {
  124. if (m_receive_data_condition && m_communication_status == Communication_socket_tcp_statu.COMMUNICATION_TCP_READY)
  125. {
  126. byte[] buffer = new byte[2048];
  127. int length = 0;
  128. //接收消息
  129. try
  130. {
  131. length = m_socket.Receive(buffer);
  132. }
  133. catch (Exception ex)
  134. {
  135. //m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_FAULT;
  136. //Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, "单片机连接中断" + ex.Message);
  137. }
  138. if (length > 0)//长度大于0属于有效数据
  139. {
  140. //有效消息就放入链表
  141. lock(m_receive_list_lock)
  142. {
  143. m_receive_data_list.Add(buffer);
  144. }
  145. }
  146. }
  147. }
  148. }
  149. public void analysis_data_thread()
  150. {
  151. //通信解析线程, 负责巡检m_receive_data_list, 并解析和处理消息
  152. while (m_analysis_data_condition && !m_event_exit.WaitOne(1))
  153. {
  154. if (m_analysis_data_condition && m_communication_status == Communication_socket_tcp_statu.COMMUNICATION_TCP_READY)
  155. {
  156. lock (m_receive_list_lock)
  157. {
  158. //处理链表信息
  159. if (m_receive_data_list.Count != 0)
  160. {
  161. for (int i=0;i<m_receive_data_list.Count;)
  162. {
  163. if (m_receive_data_list[i] == null)
  164. {
  165. //删除等于空的节点
  166. m_receive_data_list.Remove(m_receive_data_list[i]);
  167. }
  168. else
  169. {
  170. //消除buffer中多余的0
  171. string jsonStr = Encoding.Default.GetString(m_receive_data_list[i]);
  172. jsonStr = jsonStr.Trim('\0');
  173. byte[] t_buffer = Encoding.Default.GetBytes(jsonStr);
  174. //解决粘包问题
  175. string msg = OnReceive(t_buffer);
  176. //如果返回的是空,说明发生了粘包问题 当此读取没有读取到完整数据包
  177. if (msg != null)
  178. {
  179. //如果没有返回null说明是处理过后返回的完整数据包 可用于处理
  180. //处理消息
  181. execute_msg(msg);
  182. //处理完成后删除消息
  183. m_receive_data_list.Remove(m_receive_data_list[i]);
  184. continue;
  185. }
  186. }
  187. ++i;
  188. }
  189. }
  190. }
  191. }
  192. }
  193. }
  194. public void send_data_thread()
  195. {
  196. //通信发送线程, 负责巡检m_send_data_list, 并发送消息
  197. while (m_send_data_condition && !m_event_exit.WaitOne(1))
  198. {
  199. if (m_send_data_condition && m_communication_status == Communication_socket_tcp_statu.COMMUNICATION_TCP_READY)
  200. {
  201. lock (m_send_list_lock)
  202. {
  203. if (m_send_data_list.Count != 0)
  204. {
  205. for (int i = 0; i < m_send_data_list.Count;)
  206. {
  207. if (m_send_data_list[i] != null)
  208. {
  209. try
  210. {
  211. m_socket.Send(m_send_data_list[i]);
  212. }
  213. catch (Exception ex)
  214. {
  215. }
  216. m_send_data_list.Remove(m_send_data_list[i]);
  217. continue;
  218. }
  219. ++i;
  220. }
  221. }
  222. }
  223. }
  224. }
  225. }
  226. public void encapsulate_data_thread()
  227. {
  228. //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list
  229. while (m_encapsulate_data_condition && !m_event_exit.WaitOne(m_encapsulate_cycle_time))
  230. {
  231. if (m_encapsulate_data_condition && m_communication_status == Communication_socket_tcp_statu.COMMUNICATION_TCP_READY)
  232. {
  233. encapsulate_send_data();
  234. }
  235. }
  236. }
  237. public void reconnection_thread()
  238. {
  239. //如果线程存活
  240. while (m_reconnection_condition && !m_event_exit.WaitOne(1))
  241. {
  242. //如果处于未连接状态需要进行连接
  243. if (!IsSocketConnected())
  244. {
  245. try
  246. {
  247. //重新分配socket
  248. m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  249. m_socket.Connect(m_ip, m_port);
  250. Console.WriteLine("连接成功");
  251. m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_READY;
  252. }
  253. catch (Exception ex)
  254. {
  255. Console.WriteLine("连接失败");
  256. m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_FAULT;
  257. Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, "单片机未启动,或者初始设置的单片机连接端口不对" + ex.Message);
  258. }
  259. }
  260. }
  261. }
  262. //处理消息, 需要子类重载
  263. public virtual void execute_msg(string message)
  264. {
  265. //处理消息必须子类重载,本模块只负责通信
  266. }
  267. //封装消息, 需要子类重载
  268. public virtual void encapsulate_msg(string message)
  269. {
  270. byte[] t_byte = System.Text.Encoding.Unicode.GetBytes(message);
  271. lock (m_send_list_lock)
  272. {
  273. m_send_data_list.Add(t_byte);
  274. }
  275. }
  276. //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
  277. public virtual void encapsulate_send_data()
  278. {
  279. }
  280. //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
  281. public virtual bool check_statu()
  282. {
  283. if (m_communication_status == Communication_socket_tcp_statu.COMMUNICATION_TCP_READY)
  284. {
  285. return true;
  286. }
  287. else
  288. {
  289. return false;
  290. }
  291. }
  292. //处理粘包问题
  293. private string OnReceive(byte[] receivedBytes)
  294. {
  295. //解决粘包问题
  296. try
  297. {
  298. //bytes 为系统缓冲区数据
  299. //bytesRead为系统缓冲区长度
  300. int bytesRead = receivedBytes.Length;
  301. if (bytesRead > 0)
  302. {
  303. //判断是不是第一次接收,为空则说明是第一次
  304. if (m_surplusBuffer == null)
  305. //把系统缓冲区数据放在自定义缓冲区里面
  306. m_surplusBuffer = receivedBytes;
  307. else
  308. //拼接上一次剩余的包
  309. m_surplusBuffer = m_surplusBuffer.Concat(receivedBytes).ToArray();
  310. //已经完成读取每个数据包长度
  311. m_haveread = 0;
  312. //这里m_totalLen的长度有可能大于缓冲区大小的(因为 这里的m_surplusBuffer 是系统缓冲区+不完整的数据包)
  313. m_totalLen = m_surplusBuffer.Length;
  314. while (m_haveread <= m_totalLen)
  315. {
  316. //如果在N此拆解后剩余的数据包连一个包头的长度都不够
  317. //说明是上次读取N个完整数据包后,剩下的最后一个非完整的数据包
  318. if (m_totalLen - m_haveread < m_headsize)
  319. {
  320. //定义一个数组长度为剩余字节长度的字节数组
  321. byte[] byteSub = new byte[m_totalLen - m_haveread];
  322. //把剩下不够一个完整的数据包存到byteSub
  323. Buffer.BlockCopy(m_surplusBuffer, m_haveread, byteSub, 0, m_totalLen - m_haveread);
  324. //把剩下不够一个完整的数据包存到m_surplusBuffer
  325. m_surplusBuffer = byteSub;
  326. m_totalLen = 0;
  327. return null;
  328. }
  329. //剩余的字节数>=包头数据
  330. else
  331. {
  332. //如果够了一个完整包头的长度,则读取包头的数据
  333. byte[] headByte = new byte[m_headsize - 1];
  334. //从缓冲区里读取包头的字节
  335. Buffer.BlockCopy(m_surplusBuffer, m_haveread + 1, headByte, 0, m_headsize - 1);
  336. //从包头里面分析出包体的长度
  337. int bodySize = Convert.ToInt32(Encoding.Default.GetString(headByte));
  338. //这里的 m_haveread=等于N个数据包的长度 从0开始;0,1,2,3....N
  339. //如果自定义缓冲区拆解N个包后的长度 大于 总长度,说最后一段数据不够一个完整的包了,拆出来保存
  340. if (m_haveread + m_headsize + bodySize > m_totalLen)
  341. {
  342. byte[] byteSub = new byte[m_totalLen - m_haveread];
  343. Buffer.BlockCopy(m_surplusBuffer, m_haveread, byteSub, 0, m_totalLen - m_haveread);
  344. m_surplusBuffer = byteSub;
  345. return null;
  346. }
  347. else
  348. {
  349. //挨个分解每个包,解析成实际文字
  350. String json = Encoding.UTF8.GetString(m_surplusBuffer, m_haveread + m_headsize, bodySize);
  351. //处理解读出来的消息,当消息不同于原先的值时,才对消息进行处理
  352. //if (sensorMsg == null || !sensorMsg.Equals(new JsonParser(JsonParser.Settings.Default).Parse<SingleChip.Sensor_state_msg>(json)))
  353. //{
  354. // addressReceivedMessage(json);
  355. //}
  356. // addressReceivedMessage(json);
  357. //AddMsg(string.Format(" > [OnReceive] -> {0}", strc));
  358. //依次累加当前的数据包的长度
  359. m_haveread = m_haveread + m_headsize + bodySize;
  360. //如果当前接收的数据包长度正好等于缓冲区长度,则待拼接的不规则数据长度归0
  361. if (m_headsize + bodySize == bytesRead)
  362. {
  363. //设置空 回到原始状态
  364. m_surplusBuffer = null;
  365. //清0
  366. m_totalLen = 0;
  367. }
  368. return json;
  369. }
  370. }
  371. }
  372. }
  373. return null;
  374. }
  375. //当因为收到错误数据而抛出异常时,丢弃当前的错误包
  376. //解决丢包问题(上面的try代码只能读取格式正确的粘包数据)
  377. //对于包头正常,但不够一个包的丢包数据包不会当时抛出异常,会等到下一波接收数据,集齐了一个完整包再抛出异常
  378. catch (Exception e)
  379. {
  380. int n = m_haveread;
  381. string jsonTest = Encoding.Default.GetString(m_surplusBuffer, m_haveread + m_headsize, m_totalLen - m_haveread - m_headsize);
  382. //至少有一个完整包的数据包丢包
  383. if (jsonTest.Contains('@'))
  384. {
  385. int i = 0;
  386. for (i = m_haveread + m_headsize; i < m_totalLen; i++)
  387. {
  388. //没有丢包,i - m_headsize-1为实测的包体长度
  389. if (m_surplusBuffer[i] == '@' && m_surplusBuffer[i - 1] == '}')
  390. {
  391. jsonTest = Encoding.Default.GetString(m_surplusBuffer, m_haveread, m_totalLen - m_haveread);
  392. Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, jsonTest);
  393. //丢掉该部分
  394. m_haveread = i;
  395. jsonTest = Encoding.Default.GetString(m_surplusBuffer, m_haveread, m_totalLen - m_haveread);
  396. Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, jsonTest);
  397. byte[] byteSub = new byte[m_totalLen - m_haveread];
  398. Buffer.BlockCopy(m_surplusBuffer, m_haveread, byteSub, 0, m_totalLen - m_haveread);
  399. m_surplusBuffer = byteSub;
  400. //捕捉到错误数据之后,要等待下一次接收到数据才能继续读取错误数据包之后的包
  401. Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, "收到错误数据");
  402. break;
  403. }
  404. else
  405. {
  406. continue;
  407. }
  408. }
  409. return null;
  410. }
  411. //最后一批(完整或不完整)数据的头丢包
  412. else
  413. {
  414. jsonTest = Encoding.Default.GetString(m_surplusBuffer, m_haveread, m_totalLen - m_haveread);
  415. Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, jsonTest + e.Message);
  416. byte[] byteSub = new byte[m_totalLen - m_haveread];
  417. Buffer.BlockCopy(m_surplusBuffer, m_haveread, byteSub, 0, m_totalLen - m_haveread);
  418. m_surplusBuffer = byteSub;
  419. return null;
  420. }
  421. }
  422. }
  423. //检查socket是否连接中
  424. public bool IsSocketConnected()
  425. {
  426. bool blockingState = m_socket.Blocking;
  427. try
  428. {
  429. byte[] tmp = new byte[1];
  430. m_socket.Blocking = false;
  431. m_socket.Send(tmp, 0, 0);
  432. return true;
  433. }
  434. catch (SocketException e)
  435. {
  436. // 产生 10035 == WSAEWOULDBLOCK 错误,说明被阻止了,但是还是连接的
  437. if (e.NativeErrorCode.Equals(10035))
  438. return true;
  439. else
  440. return false;
  441. }
  442. finally
  443. {
  444. m_socket.Blocking = blockingState; // 恢复状态
  445. }
  446. }
  447. //通信模块
  448. protected Socket m_socket; //网络编程接口
  449. protected string m_ip; //IP
  450. protected int m_port; //端口
  451. protected Communication_socket_tcp_statu m_communication_status; //通信状态
  452. //数据锁
  453. //作用:将会锁住代码块的内容,并阻止其他线程进入该代码块,直到该代码块运行完成,释放该锁。
  454. //注意:定义的锁对象应该是 私有的,静态的,只读的,引用类型的对象,这样可以防止外部改变锁对象
  455. private static readonly object m_receive_list_lock = new object();
  456. private static readonly object m_send_list_lock = new object();
  457. private static readonly object m_communication_lock = new object();
  458. protected Thread m_reconnection_thread; //重连线程
  459. protected bool m_reconnection_condition; //重连线程条件变量
  460. protected Thread m_analysis_data_thread; //解析线程
  461. protected bool m_analysis_data_condition; //解析线程条件变量
  462. protected Thread m_receive_data_thread; //接收线程
  463. protected bool m_receive_data_condition; //接收线程条件变量
  464. protected Thread m_send_data_thread; //发送线程
  465. protected bool m_send_data_condition; //发送线程条件变量
  466. protected Thread m_encapsulate_data_thread; //封装的线程
  467. protected bool m_encapsulate_data_condition; //接受的条件变量
  468. protected int m_encapsulate_cycle_time; //自动封装时间
  469. protected List<byte[]> m_receive_data_list; //接收list 存放接收的消息
  470. protected List<byte[]> m_send_data_list; //发送list 发送存放的消息
  471. protected EventWaitHandle m_event_exit; //防止CPU卡死
  472. //数据解析模块
  473. protected byte[] m_surplusBuffer; //不完整的数据包,即用户自定义缓冲区
  474. protected int m_headsize; //包头长度 固定4
  475. protected int m_haveread; //数据包读取位置
  476. protected int m_totalLen; //数据包总长
  477. }
  478. }