using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Net.Sockets; using System.Threading; using tool; namespace chutian_parking_terminal.communication_base { public class Communication_socket_tcp_base { public enum Communication_socket_tcp_statu { COMMUNICATION_TCP_UNKNOW = 0, //通信状态 未知 COMMUNICATION_TCP_READY = 1, //通信状态 正常 COMMUNICATION_TCP_FAULT = 2, //通信状态 错误 }; public Communication_socket_tcp_base() { m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_UNKNOW;//通信状态 m_headsize = 4; //包头长度(固定为4) m_surplusBuffer = null; //缓冲区(存放不完整消息) m_haveread = 0; //数据包读取下标 m_totalLen = 0; //数据包总长 m_receive_data_list = new List(); m_send_data_list = new List(); m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);//通信socket m_encapsulate_cycle_time = 100; //接受线程默认循环 m_receive_data_condition = true; m_receive_data_thread = new Thread(receive_data_thread); //解析线程默认循环 m_analysis_data_condition = true; m_analysis_data_thread = new Thread(analysis_data_thread); //发送线程默认循环 m_send_data_condition = true; m_send_data_thread = new Thread(send_data_thread); //封装线程默认循环 m_encapsulate_data_condition = true; m_encapsulate_data_thread = new Thread(encapsulate_data_thread); //重连线程默认循环 m_reconnection_condition = true; m_reconnection_thread = new Thread(reconnection_thread); m_event_exit = new EventWaitHandle(false, EventResetMode.ManualReset); json_file_operation.Instance.json_file_operation_init("../../settings.json"); } //通信初始化 public void communication_socket_tcp_init() { //获取单片机IP和端口 this.m_ip = json_file_operation.Instance.read_json_string("singleChipAddress"); this.m_port = int.Parse(json_file_operation.Instance.read_json_string("singleChipPort")); } public void communication_socket_tcp_connect() { try { m_socket.Connect(m_ip, m_port); m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_READY; } catch (Exception ex) { m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_FAULT; } //启动通信 communication_socket_tcp_run(); } //反初始化 public void communication_socket_tcp_uninit() { m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_UNKNOW; //关闭5个线程 m_receive_data_condition = false; m_analysis_data_condition = false; m_send_data_condition = false; m_encapsulate_data_condition = false; m_reconnection_condition = false; //回收5个线程的资源 if (m_receive_data_thread != null) { m_receive_data_thread.Abort(); } if (m_analysis_data_thread != null) { m_analysis_data_thread.Abort(); } if (m_send_data_thread != null) { m_send_data_thread.Abort(); } if (m_encapsulate_data_thread != null) { m_encapsulate_data_thread.Abort(); } if (m_reconnection_thread != null) { m_reconnection_thread.Abort(); } m_socket?.Close(); //清空list lock (m_receive_list_lock) { m_receive_data_list.Clear(); } lock (m_send_list_lock) { m_send_data_list.Clear(); } } public void communication_socket_tcp_run() { //启动5个线程 m_reconnection_thread.Start(); m_receive_data_thread.Start(); m_send_data_thread.Start(); m_analysis_data_thread.Start(); m_encapsulate_data_thread.Start(); } public void receive_data_thread() { while (m_receive_data_condition && !m_event_exit.WaitOne(1)) { if (m_receive_data_condition && m_communication_status == Communication_socket_tcp_statu.COMMUNICATION_TCP_READY) { byte[] buffer = new byte[2048]; int length = 0; //接收消息 try { length = m_socket.Receive(buffer); } catch (Exception ex) { //m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_FAULT; //Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, "单片机连接中断" + ex.Message); } if (length > 0)//长度大于0属于有效数据 { //有效消息就放入链表 lock(m_receive_list_lock) { m_receive_data_list.Add(buffer); } } } } } public void analysis_data_thread() { //通信解析线程, 负责巡检m_receive_data_list, 并解析和处理消息 while (m_analysis_data_condition && !m_event_exit.WaitOne(1)) { if (m_analysis_data_condition && m_communication_status == Communication_socket_tcp_statu.COMMUNICATION_TCP_READY) { lock (m_receive_list_lock) { //处理链表信息 if (m_receive_data_list.Count != 0) { for (int i=0;i 0) { //判断是不是第一次接收,为空则说明是第一次 if (m_surplusBuffer == null) //把系统缓冲区数据放在自定义缓冲区里面 m_surplusBuffer = receivedBytes; else //拼接上一次剩余的包 m_surplusBuffer = m_surplusBuffer.Concat(receivedBytes).ToArray(); //已经完成读取每个数据包长度 m_haveread = 0; //这里m_totalLen的长度有可能大于缓冲区大小的(因为 这里的m_surplusBuffer 是系统缓冲区+不完整的数据包) m_totalLen = m_surplusBuffer.Length; while (m_haveread <= m_totalLen) { //如果在N此拆解后剩余的数据包连一个包头的长度都不够 //说明是上次读取N个完整数据包后,剩下的最后一个非完整的数据包 if (m_totalLen - m_haveread < m_headsize) { //定义一个数组长度为剩余字节长度的字节数组 byte[] byteSub = new byte[m_totalLen - m_haveread]; //把剩下不够一个完整的数据包存到byteSub Buffer.BlockCopy(m_surplusBuffer, m_haveread, byteSub, 0, m_totalLen - m_haveread); //把剩下不够一个完整的数据包存到m_surplusBuffer m_surplusBuffer = byteSub; m_totalLen = 0; return null; } //剩余的字节数>=包头数据 else { //如果够了一个完整包头的长度,则读取包头的数据 byte[] headByte = new byte[m_headsize - 1]; //从缓冲区里读取包头的字节 Buffer.BlockCopy(m_surplusBuffer, m_haveread + 1, headByte, 0, m_headsize - 1); //从包头里面分析出包体的长度 int bodySize = Convert.ToInt32(Encoding.Default.GetString(headByte)); //这里的 m_haveread=等于N个数据包的长度 从0开始;0,1,2,3....N //如果自定义缓冲区拆解N个包后的长度 大于 总长度,说最后一段数据不够一个完整的包了,拆出来保存 if (m_haveread + m_headsize + bodySize > m_totalLen) { byte[] byteSub = new byte[m_totalLen - m_haveread]; Buffer.BlockCopy(m_surplusBuffer, m_haveread, byteSub, 0, m_totalLen - m_haveread); m_surplusBuffer = byteSub; return null; } else { //挨个分解每个包,解析成实际文字 String json = Encoding.UTF8.GetString(m_surplusBuffer, m_haveread + m_headsize, bodySize); //处理解读出来的消息,当消息不同于原先的值时,才对消息进行处理 //if (sensorMsg == null || !sensorMsg.Equals(new JsonParser(JsonParser.Settings.Default).Parse(json))) //{ // addressReceivedMessage(json); //} // addressReceivedMessage(json); //AddMsg(string.Format(" > [OnReceive] -> {0}", strc)); //依次累加当前的数据包的长度 m_haveread = m_haveread + m_headsize + bodySize; //如果当前接收的数据包长度正好等于缓冲区长度,则待拼接的不规则数据长度归0 if (m_headsize + bodySize == bytesRead) { //设置空 回到原始状态 m_surplusBuffer = null; //清0 m_totalLen = 0; } return json; } } } } return null; } //当因为收到错误数据而抛出异常时,丢弃当前的错误包 //解决丢包问题(上面的try代码只能读取格式正确的粘包数据) //对于包头正常,但不够一个包的丢包数据包不会当时抛出异常,会等到下一波接收数据,集齐了一个完整包再抛出异常 catch (Exception e) { int n = m_haveread; string jsonTest = Encoding.Default.GetString(m_surplusBuffer, m_haveread + m_headsize, m_totalLen - m_haveread - m_headsize); //至少有一个完整包的数据包丢包 if (jsonTest.Contains('@')) { int i = 0; for (i = m_haveread + m_headsize; i < m_totalLen; i++) { //没有丢包,i - m_headsize-1为实测的包体长度 if (m_surplusBuffer[i] == '@' && m_surplusBuffer[i - 1] == '}') { jsonTest = Encoding.Default.GetString(m_surplusBuffer, m_haveread, m_totalLen - m_haveread); Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, jsonTest); //丢掉该部分 m_haveread = i; jsonTest = Encoding.Default.GetString(m_surplusBuffer, m_haveread, m_totalLen - m_haveread); Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, jsonTest); byte[] byteSub = new byte[m_totalLen - m_haveread]; Buffer.BlockCopy(m_surplusBuffer, m_haveread, byteSub, 0, m_totalLen - m_haveread); m_surplusBuffer = byteSub; //捕捉到错误数据之后,要等待下一次接收到数据才能继续读取错误数据包之后的包 Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, "收到错误数据"); break; } else { continue; } } return null; } //最后一批(完整或不完整)数据的头丢包 else { jsonTest = Encoding.Default.GetString(m_surplusBuffer, m_haveread, m_totalLen - m_haveread); Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, jsonTest + e.Message); byte[] byteSub = new byte[m_totalLen - m_haveread]; Buffer.BlockCopy(m_surplusBuffer, m_haveread, byteSub, 0, m_totalLen - m_haveread); m_surplusBuffer = byteSub; return null; } } } //检查socket是否连接中 public bool IsSocketConnected() { bool blockingState = m_socket.Blocking; try { byte[] tmp = new byte[1]; m_socket.Blocking = false; m_socket.Send(tmp, 0, 0); return true; } catch (SocketException e) { // 产生 10035 == WSAEWOULDBLOCK 错误,说明被阻止了,但是还是连接的 if (e.NativeErrorCode.Equals(10035)) return true; else return false; } finally { m_socket.Blocking = blockingState; // 恢复状态 } } //通信模块 protected Socket m_socket; //网络编程接口 protected string m_ip; //IP protected int m_port; //端口 protected Communication_socket_tcp_statu m_communication_status; //通信状态 //数据锁 //作用:将会锁住代码块的内容,并阻止其他线程进入该代码块,直到该代码块运行完成,释放该锁。 //注意:定义的锁对象应该是 私有的,静态的,只读的,引用类型的对象,这样可以防止外部改变锁对象 private static readonly object m_receive_list_lock = new object(); private static readonly object m_send_list_lock = new object(); private static readonly object m_communication_lock = new object(); protected Thread m_reconnection_thread; //重连线程 protected bool m_reconnection_condition; //重连线程条件变量 protected Thread m_analysis_data_thread; //解析线程 protected bool m_analysis_data_condition; //解析线程条件变量 protected Thread m_receive_data_thread; //接收线程 protected bool m_receive_data_condition; //接收线程条件变量 protected Thread m_send_data_thread; //发送线程 protected bool m_send_data_condition; //发送线程条件变量 protected Thread m_encapsulate_data_thread; //封装的线程 protected bool m_encapsulate_data_condition; //接受的条件变量 protected int m_encapsulate_cycle_time; //自动封装时间 protected List m_receive_data_list; //接收list 存放接收的消息 protected List m_send_data_list; //发送list 发送存放的消息 protected EventWaitHandle m_event_exit; //防止CPU卡死 //数据解析模块 protected byte[] m_surplusBuffer; //不完整的数据包,即用户自定义缓冲区 protected int m_headsize; //包头长度 固定4 protected int m_haveread; //数据包读取位置 protected int m_totalLen; //数据包总长 } }