123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513 |
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using System.Net.Sockets;
- using System.Threading;
- using tool;
- namespace chutian_parking_terminal.communication_base
- {
- public class Communication_socket_tcp_base
- {
- public enum Communication_socket_tcp_statu
- {
- COMMUNICATION_TCP_UNKNOW = 0, //通信状态 未知
- COMMUNICATION_TCP_READY = 1, //通信状态 正常
- COMMUNICATION_TCP_FAULT = 2, //通信状态 错误
- };
- public Communication_socket_tcp_base()
- {
- m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_UNKNOW;//通信状态
- m_headsize = 4; //包头长度(固定为4)
- m_surplusBuffer = null; //缓冲区(存放不完整消息)
- m_haveread = 0; //数据包读取下标
- m_totalLen = 0; //数据包总长
- m_receive_data_list = new List<byte[]>();
- m_send_data_list = new List<byte[]>();
-
- m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);//通信socket
- m_encapsulate_cycle_time = 100;
- //接受线程默认循环
- m_receive_data_condition = true;
- m_receive_data_thread = new Thread(receive_data_thread);
- //解析线程默认循环
- m_analysis_data_condition = true;
- m_analysis_data_thread = new Thread(analysis_data_thread);
- //发送线程默认循环
- m_send_data_condition = true;
- m_send_data_thread = new Thread(send_data_thread);
- //封装线程默认循环
- m_encapsulate_data_condition = true;
- m_encapsulate_data_thread = new Thread(encapsulate_data_thread);
- //重连线程默认循环
- m_reconnection_condition = true;
- m_reconnection_thread = new Thread(reconnection_thread);
- m_event_exit = new EventWaitHandle(false, EventResetMode.ManualReset);
- json_file_operation.Instance.json_file_operation_init("../../settings.json");
- }
- //通信初始化
- public void communication_socket_tcp_init()
- {
- //获取单片机IP和端口
- this.m_ip = json_file_operation.Instance.read_json_string("singleChipAddress");
- this.m_port = int.Parse(json_file_operation.Instance.read_json_string("singleChipPort"));
- }
- public void communication_socket_tcp_connect()
- {
- try
- {
- m_socket.Connect(m_ip, m_port);
- m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_READY;
- }
- catch (Exception ex)
- {
- m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_FAULT;
- }
- //启动通信
- communication_socket_tcp_run();
- }
- //反初始化
- public void communication_socket_tcp_uninit()
- {
- m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_UNKNOW;
- //关闭5个线程
- m_receive_data_condition = false;
- m_analysis_data_condition = false;
- m_send_data_condition = false;
- m_encapsulate_data_condition = false;
- m_reconnection_condition = false;
- //回收5个线程的资源
- if (m_receive_data_thread != null)
- {
- m_receive_data_thread.Abort();
- }
- if (m_analysis_data_thread != null)
- {
- m_analysis_data_thread.Abort();
- }
- if (m_send_data_thread != null)
- {
- m_send_data_thread.Abort();
- }
- if (m_encapsulate_data_thread != null)
- {
- m_encapsulate_data_thread.Abort();
- }
- if (m_reconnection_thread != null)
- {
- m_reconnection_thread.Abort();
- }
- m_socket?.Close();
- //清空list
- lock (m_receive_list_lock)
- {
- m_receive_data_list.Clear();
- }
- lock (m_send_list_lock)
- {
- m_send_data_list.Clear();
- }
- }
- public void communication_socket_tcp_run()
- {
- //启动5个线程
- m_reconnection_thread.Start();
- m_receive_data_thread.Start();
- m_send_data_thread.Start();
- m_analysis_data_thread.Start();
- m_encapsulate_data_thread.Start();
- }
- public void receive_data_thread()
- {
- while (m_receive_data_condition && !m_event_exit.WaitOne(1))
- {
- if (m_receive_data_condition && m_communication_status == Communication_socket_tcp_statu.COMMUNICATION_TCP_READY)
- {
- byte[] buffer = new byte[2048];
- int length = 0;
- //接收消息
- try
- {
- length = m_socket.Receive(buffer);
- }
- catch (Exception ex)
- {
- //m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_FAULT;
- //Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, "单片机连接中断" + ex.Message);
- }
- if (length > 0)//长度大于0属于有效数据
- {
- //有效消息就放入链表
- lock(m_receive_list_lock)
- {
- m_receive_data_list.Add(buffer);
- }
-
- }
- }
- }
- }
- public void analysis_data_thread()
- {
- //通信解析线程, 负责巡检m_receive_data_list, 并解析和处理消息
- while (m_analysis_data_condition && !m_event_exit.WaitOne(1))
- {
- if (m_analysis_data_condition && m_communication_status == Communication_socket_tcp_statu.COMMUNICATION_TCP_READY)
- {
- lock (m_receive_list_lock)
- {
- //处理链表信息
- if (m_receive_data_list.Count != 0)
- {
- for (int i=0;i<m_receive_data_list.Count;)
- {
- if (m_receive_data_list[i] == null)
- {
- //删除等于空的节点
- m_receive_data_list.Remove(m_receive_data_list[i]);
- }
- else
- {
- //消除buffer中多余的0
- string jsonStr = Encoding.Default.GetString(m_receive_data_list[i]);
- jsonStr = jsonStr.Trim('\0');
- byte[] t_buffer = Encoding.Default.GetBytes(jsonStr);
- //解决粘包问题
- string msg = OnReceive(t_buffer);
- //如果返回的是空,说明发生了粘包问题 当此读取没有读取到完整数据包
- if (msg != null)
- {
- //如果没有返回null说明是处理过后返回的完整数据包 可用于处理
- //处理消息
- execute_msg(msg);
- //处理完成后删除消息
- m_receive_data_list.Remove(m_receive_data_list[i]);
- continue;
- }
- }
- ++i;
- }
- }
- }
- }
- }
-
- }
- public void send_data_thread()
- {
- //通信发送线程, 负责巡检m_send_data_list, 并发送消息
- while (m_send_data_condition && !m_event_exit.WaitOne(1))
- {
- if (m_send_data_condition && m_communication_status == Communication_socket_tcp_statu.COMMUNICATION_TCP_READY)
- {
- lock (m_send_list_lock)
- {
- if (m_send_data_list.Count != 0)
- {
- for (int i = 0; i < m_send_data_list.Count;)
- {
- if (m_send_data_list[i] != null)
- {
- try
- {
- m_socket.Send(m_send_data_list[i]);
- }
- catch (Exception ex)
- {
- }
- m_send_data_list.Remove(m_send_data_list[i]);
- continue;
- }
- ++i;
- }
- }
- }
-
- }
- }
- }
- public void encapsulate_data_thread()
- {
- //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list
- while (m_encapsulate_data_condition && !m_event_exit.WaitOne(m_encapsulate_cycle_time))
- {
- if (m_encapsulate_data_condition && m_communication_status == Communication_socket_tcp_statu.COMMUNICATION_TCP_READY)
- {
- encapsulate_send_data();
- }
- }
- }
- public void reconnection_thread()
- {
-
- //如果线程存活
- while (m_reconnection_condition && !m_event_exit.WaitOne(1))
- {
- //如果处于未连接状态需要进行连接
- if (!IsSocketConnected())
- {
- try
- {
- //重新分配socket
- m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
- m_socket.Connect(m_ip, m_port);
- Console.WriteLine("连接成功");
- m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_READY;
- }
- catch (Exception ex)
- {
- Console.WriteLine("连接失败");
- m_communication_status = Communication_socket_tcp_statu.COMMUNICATION_TCP_FAULT;
- Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, "单片机未启动,或者初始设置的单片机连接端口不对" + ex.Message);
- }
- }
- }
- }
- //处理消息, 需要子类重载
- public virtual void execute_msg(string message)
- {
- //处理消息必须子类重载,本模块只负责通信
- }
- //封装消息, 需要子类重载
- public virtual void encapsulate_msg(string message)
- {
- byte[] t_byte = System.Text.Encoding.Unicode.GetBytes(message);
- lock (m_send_list_lock)
- {
- m_send_data_list.Add(t_byte);
- }
-
- }
- //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
- public virtual void encapsulate_send_data()
- {
- }
- //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
- public virtual bool check_statu()
- {
- if (m_communication_status == Communication_socket_tcp_statu.COMMUNICATION_TCP_READY)
- {
- return true;
- }
- else
- {
- return false;
- }
- }
- //处理粘包问题
- private string OnReceive(byte[] receivedBytes)
- {
- //解决粘包问题
- try
- {
- //bytes 为系统缓冲区数据
- //bytesRead为系统缓冲区长度
- int bytesRead = receivedBytes.Length;
- if (bytesRead > 0)
- {
- //判断是不是第一次接收,为空则说明是第一次
- if (m_surplusBuffer == null)
- //把系统缓冲区数据放在自定义缓冲区里面
- m_surplusBuffer = receivedBytes;
- else
- //拼接上一次剩余的包
- m_surplusBuffer = m_surplusBuffer.Concat(receivedBytes).ToArray();
- //已经完成读取每个数据包长度
- m_haveread = 0;
- //这里m_totalLen的长度有可能大于缓冲区大小的(因为 这里的m_surplusBuffer 是系统缓冲区+不完整的数据包)
- m_totalLen = m_surplusBuffer.Length;
- while (m_haveread <= m_totalLen)
- {
- //如果在N此拆解后剩余的数据包连一个包头的长度都不够
- //说明是上次读取N个完整数据包后,剩下的最后一个非完整的数据包
- if (m_totalLen - m_haveread < m_headsize)
- {
- //定义一个数组长度为剩余字节长度的字节数组
- byte[] byteSub = new byte[m_totalLen - m_haveread];
- //把剩下不够一个完整的数据包存到byteSub
- Buffer.BlockCopy(m_surplusBuffer, m_haveread, byteSub, 0, m_totalLen - m_haveread);
- //把剩下不够一个完整的数据包存到m_surplusBuffer
- m_surplusBuffer = byteSub;
- m_totalLen = 0;
- return null;
- }
- //剩余的字节数>=包头数据
- else
- {
- //如果够了一个完整包头的长度,则读取包头的数据
- byte[] headByte = new byte[m_headsize - 1];
- //从缓冲区里读取包头的字节
- Buffer.BlockCopy(m_surplusBuffer, m_haveread + 1, headByte, 0, m_headsize - 1);
- //从包头里面分析出包体的长度
- int bodySize = Convert.ToInt32(Encoding.Default.GetString(headByte));
- //这里的 m_haveread=等于N个数据包的长度 从0开始;0,1,2,3....N
- //如果自定义缓冲区拆解N个包后的长度 大于 总长度,说最后一段数据不够一个完整的包了,拆出来保存
- if (m_haveread + m_headsize + bodySize > m_totalLen)
- {
- byte[] byteSub = new byte[m_totalLen - m_haveread];
- Buffer.BlockCopy(m_surplusBuffer, m_haveread, byteSub, 0, m_totalLen - m_haveread);
- m_surplusBuffer = byteSub;
- return null;
- }
- else
- {
- //挨个分解每个包,解析成实际文字
- String json = Encoding.UTF8.GetString(m_surplusBuffer, m_haveread + m_headsize, bodySize);
- //处理解读出来的消息,当消息不同于原先的值时,才对消息进行处理
- //if (sensorMsg == null || !sensorMsg.Equals(new JsonParser(JsonParser.Settings.Default).Parse<SingleChip.Sensor_state_msg>(json)))
- //{
- // addressReceivedMessage(json);
- //}
- // addressReceivedMessage(json);
- //AddMsg(string.Format(" > [OnReceive] -> {0}", strc));
- //依次累加当前的数据包的长度
- m_haveread = m_haveread + m_headsize + bodySize;
- //如果当前接收的数据包长度正好等于缓冲区长度,则待拼接的不规则数据长度归0
- if (m_headsize + bodySize == bytesRead)
- {
- //设置空 回到原始状态
- m_surplusBuffer = null;
- //清0
- m_totalLen = 0;
- }
- return json;
- }
- }
- }
- }
- return null;
- }
- //当因为收到错误数据而抛出异常时,丢弃当前的错误包
- //解决丢包问题(上面的try代码只能读取格式正确的粘包数据)
- //对于包头正常,但不够一个包的丢包数据包不会当时抛出异常,会等到下一波接收数据,集齐了一个完整包再抛出异常
- catch (Exception e)
- {
- int n = m_haveread;
- string jsonTest = Encoding.Default.GetString(m_surplusBuffer, m_haveread + m_headsize, m_totalLen - m_haveread - m_headsize);
- //至少有一个完整包的数据包丢包
- if (jsonTest.Contains('@'))
- {
- int i = 0;
- for (i = m_haveread + m_headsize; i < m_totalLen; i++)
- {
- //没有丢包,i - m_headsize-1为实测的包体长度
- if (m_surplusBuffer[i] == '@' && m_surplusBuffer[i - 1] == '}')
- {
- jsonTest = Encoding.Default.GetString(m_surplusBuffer, m_haveread, m_totalLen - m_haveread);
- Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, jsonTest);
- //丢掉该部分
- m_haveread = i;
- jsonTest = Encoding.Default.GetString(m_surplusBuffer, m_haveread, m_totalLen - m_haveread);
- Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, jsonTest);
- byte[] byteSub = new byte[m_totalLen - m_haveread];
- Buffer.BlockCopy(m_surplusBuffer, m_haveread, byteSub, 0, m_totalLen - m_haveread);
- m_surplusBuffer = byteSub;
- //捕捉到错误数据之后,要等待下一次接收到数据才能继续读取错误数据包之后的包
- Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, "收到错误数据");
- break;
- }
- else
- {
- continue;
- }
- }
- return null;
- }
- //最后一批(完整或不完整)数据的头丢包
- else
- {
- jsonTest = Encoding.Default.GetString(m_surplusBuffer, m_haveread, m_totalLen - m_haveread);
- Log.Instance.WriteLog(LogType.PROCESS, LogFile.LOG, jsonTest + e.Message);
- byte[] byteSub = new byte[m_totalLen - m_haveread];
- Buffer.BlockCopy(m_surplusBuffer, m_haveread, byteSub, 0, m_totalLen - m_haveread);
- m_surplusBuffer = byteSub;
- return null;
- }
- }
- }
- //检查socket是否连接中
- public bool IsSocketConnected()
- {
- bool blockingState = m_socket.Blocking;
- try
- {
- byte[] tmp = new byte[1];
- m_socket.Blocking = false;
- m_socket.Send(tmp, 0, 0);
- return true;
- }
- catch (SocketException e)
- {
- // 产生 10035 == WSAEWOULDBLOCK 错误,说明被阻止了,但是还是连接的
- if (e.NativeErrorCode.Equals(10035))
- return true;
- else
- return false;
- }
- finally
- {
- m_socket.Blocking = blockingState; // 恢复状态
- }
- }
- //通信模块
- protected Socket m_socket; //网络编程接口
- protected string m_ip; //IP
- protected int m_port; //端口
- protected Communication_socket_tcp_statu m_communication_status; //通信状态
- //数据锁
- //作用:将会锁住代码块的内容,并阻止其他线程进入该代码块,直到该代码块运行完成,释放该锁。
- //注意:定义的锁对象应该是 私有的,静态的,只读的,引用类型的对象,这样可以防止外部改变锁对象
- private static readonly object m_receive_list_lock = new object();
- private static readonly object m_send_list_lock = new object();
- private static readonly object m_communication_lock = new object();
- protected Thread m_reconnection_thread; //重连线程
- protected bool m_reconnection_condition; //重连线程条件变量
- protected Thread m_analysis_data_thread; //解析线程
- protected bool m_analysis_data_condition; //解析线程条件变量
- protected Thread m_receive_data_thread; //接收线程
- protected bool m_receive_data_condition; //接收线程条件变量
- protected Thread m_send_data_thread; //发送线程
- protected bool m_send_data_condition; //发送线程条件变量
- protected Thread m_encapsulate_data_thread; //封装的线程
- protected bool m_encapsulate_data_condition; //接受的条件变量
- protected int m_encapsulate_cycle_time; //自动封装时间
- protected List<byte[]> m_receive_data_list; //接收list 存放接收的消息
- protected List<byte[]> m_send_data_list; //发送list 发送存放的消息
- protected EventWaitHandle m_event_exit; //防止CPU卡死
- //数据解析模块
- protected byte[] m_surplusBuffer; //不完整的数据包,即用户自定义缓冲区
- protected int m_headsize; //包头长度 固定4
- protected int m_haveread; //数据包读取位置
- protected int m_totalLen; //数据包总长
- }
- }
|