communication_socket_base.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using NNanomsg.Protocols;
  7. using System.Threading;
  8. using Newtonsoft.Json.Linq;
  9. using Google.Protobuf;
  10. using tool;
  11. namespace chutian_parking_terminal.communication_base
  12. {
  13. class Communication_socket_base
  14. {
  15. //通信状态
  16. public enum Communication_statu
  17. {
  18. COMMUNICATION_UNKNOW = 0, //通信状态 未知
  19. COMMUNICATION_READY = 1, //通信状态 正常
  20. COMMUNICATION_FAULT = 2, //通信状态 错误
  21. };
  22. public Communication_socket_base()
  23. {
  24. m_communication_statu = Communication_statu.COMMUNICATION_UNKNOW;
  25. mp_receive_data_thread = null;
  26. mp_analysis_data_thread = null;
  27. mp_send_data_thread = null;
  28. mp_encapsulate_data_thread = null;
  29. m_analysis_cycle_time = 1000;//默认1000ms,就自动解析(接受list)
  30. m_encapsulate_cycle_time = 1000;//默认1000ms,就自动发送一次状态信息
  31. m_socket = new BusSocket();
  32. m_receive_data_list = new List<byte[]>();
  33. m_send_data_list = new List<byte[]>();
  34. //接受线程默认循环
  35. m_receive_data_condition = true;
  36. mp_receive_data_thread = new Thread(receive_data_thread);
  37. //解析线程默认等待, 需要接受线程去唤醒, 超时1ms, 超时后主动遍历m_receive_data_list
  38. m_analysis_data_condition = true;
  39. mp_analysis_data_thread = new Thread(analysis_data_thread);
  40. //发送线程默认循环
  41. m_send_data_condition = true;
  42. mp_send_data_thread = new Thread(send_data_thread);
  43. //封装线程默认循环
  44. m_encapsulate_data_condition = true;
  45. mp_encapsulate_data_thread = new Thread(encapsulate_data_thread);
  46. m_event_exit = new EventWaitHandle(false, EventResetMode.ManualReset);
  47. json_file_operation.Instance.json_file_operation_init("../../settings.json");
  48. }
  49. //初始化通信模块--默认从json文件读取
  50. public virtual void communication_init()
  51. {
  52. //绑定
  53. string bind_string = json_file_operation.Instance.read_json_string("communication_bind");
  54. communication_bind(bind_string);
  55. //连接
  56. JArray connect_string = json_file_operation.Instance.read_json_jarray("communication_connect");
  57. foreach (var connect in connect_string)
  58. {
  59. communication_connect(connect.ToString());
  60. }
  61. //启动通信, run thread
  62. communication_run();
  63. }
  64. //绑定
  65. public virtual void communication_bind(string bind_string)
  66. {
  67. m_socket.Bind(bind_string);
  68. }
  69. //连接
  70. public virtual void communication_connect(string connect_string)
  71. {
  72. m_socket.Connect(connect_string);
  73. }
  74. //启动通信
  75. public virtual void communication_run()
  76. {
  77. m_communication_statu = Communication_statu.COMMUNICATION_READY;
  78. //启动4个线程。
  79. mp_receive_data_thread.Start();
  80. mp_analysis_data_thread.Start();
  81. mp_send_data_thread.Start();
  82. mp_encapsulate_data_thread.Start();
  83. }
  84. //反初始化
  85. public virtual void communication_uninit()
  86. {
  87. //关闭几个线程
  88. m_receive_data_condition = false;
  89. m_analysis_data_condition = false;
  90. m_send_data_condition = false;
  91. m_encapsulate_data_condition = false;
  92. //回收4个线程的资源
  93. if (mp_receive_data_thread != null )
  94. {
  95. mp_receive_data_thread.Abort();
  96. }
  97. if (mp_analysis_data_thread != null )
  98. {
  99. mp_analysis_data_thread.Abort();
  100. }
  101. if (mp_send_data_thread != null )
  102. {
  103. mp_send_data_thread.Abort();
  104. }
  105. if (mp_encapsulate_data_thread != null )
  106. {
  107. mp_encapsulate_data_thread.Abort();
  108. }
  109. //清空list
  110. lock (m_receive_list_lock)
  111. {
  112. m_receive_data_list.Clear();
  113. }
  114. lock (m_send_list_lock)
  115. {
  116. m_send_data_list.Clear();
  117. }
  118. m_communication_statu = Communication_statu.COMMUNICATION_UNKNOW;
  119. }
  120. //设置解析时间
  121. public void set_analysis_cycle_time(int analysis_cycle_time)
  122. {
  123. m_analysis_cycle_time = analysis_cycle_time;
  124. }
  125. //设置封装时间
  126. public void set_encapsulate_cycle_time(int encapsulate_cycle_time)
  127. {
  128. m_encapsulate_cycle_time = encapsulate_cycle_time;
  129. }
  130. //mp_receive_data_thread 接受线程执行函数,
  131. //receive_data_thread 内部线程负责接受消息
  132. protected void receive_data_thread()
  133. {
  134. Console.WriteLine(" Communication_socket_base::receive_data_thread start " );
  135. //通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list
  136. while (m_receive_data_condition && !m_event_exit.WaitOne(1) )
  137. {
  138. if (m_receive_data_condition)
  139. {
  140. //非阻塞接受消息, 如果接收到消息, data!=null
  141. byte[] data = m_socket.ReceiveImmediate();
  142. if (data != null)
  143. {
  144. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  145. if (check_msg(data))
  146. {
  147. lock (m_receive_list_lock)
  148. {
  149. m_receive_data_list.Add(data);
  150. }
  151. }
  152. else
  153. {
  154. continue;
  155. }
  156. }
  157. }
  158. }
  159. Console.WriteLine(" Communication_socket_base::receive_data_thread end " );
  160. }
  161. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  162. //子类必须重载, 增加自己模块的判断逻辑
  163. public virtual bool check_msg(byte[] message)
  164. {
  165. // Message.Base_info base_Info = Message.Base_info.Parser.ParseJson(message);
  166. Message.Base_info base_Info = Message.Base_msg.Parser.ParseFrom(message).BaseInfo;
  167. if (base_Info.Receiver == Message.Communicator.EMain)
  168. {
  169. return true;
  170. }
  171. else
  172. {
  173. //无效的消息
  174. return true;
  175. }
  176. }
  177. //mp_analysis_data_thread 解析线程执行函数,
  178. //analysis_data_thread 内部线程负责解析消息
  179. protected void analysis_data_thread()
  180. {
  181. Console.WriteLine(" Communication_socket_base::analysis_data_thread start " );
  182. //通信解析线程, 负责巡检m_receive_data_list, 并解析和处理消息
  183. while (m_analysis_data_condition && !m_event_exit.WaitOne(1) )
  184. {
  185. if (m_analysis_data_condition)
  186. {
  187. //处理链表信息
  188. lock (m_receive_list_lock)
  189. {
  190. analysis_receive_list();
  191. }
  192. }
  193. }
  194. Console.WriteLine(" Communication_socket_base::analysis_data_thread end " );
  195. }
  196. //遍历接受链表, 解析消息,
  197. protected void analysis_receive_list()
  198. {
  199. if (m_receive_data_list.Count == 0 )
  200. {
  201. return ;
  202. }
  203. else
  204. {
  205. for(int i=0;i< m_receive_data_list.Count;)
  206. {
  207. if (m_receive_data_list[i] == null )
  208. {
  209. //删除等于空字符串的节点
  210. m_receive_data_list.Remove(m_receive_data_list[i]);
  211. continue;
  212. }
  213. else
  214. {
  215. //检查消息是否可以被处理
  216. if (check_executer(m_receive_data_list[i]))
  217. {
  218. //处理消息
  219. execute_msg(m_receive_data_list[i]);
  220. m_receive_data_list.Remove(m_receive_data_list[i]);
  221. continue;
  222. }
  223. //若当前状态不能处理 则跳过该消息
  224. else
  225. {
  226. }
  227. }
  228. ++i;
  229. }
  230. }
  231. }
  232. //检查执行者的状态, 判断能否处理这条消息, 需要子类重载
  233. public virtual bool check_executer(byte[] message)
  234. {
  235. //检查对应模块的状态, 判断是否可以处理这条消息
  236. return true;
  237. }
  238. //处理消息, 需要子类重载
  239. public virtual void execute_msg(byte[] message)
  240. {
  241. //处理消息必须子类重载,本模块只负责通信
  242. }
  243. //mp_send_data_thread 发送线程执行函数,
  244. //send_data_thread 内部线程负责发送消息
  245. protected void send_data_thread()
  246. {
  247. Console.WriteLine(" Communication_socket_base::send_data_thread start " );
  248. //通信发送线程, 负责巡检m_send_data_list, 并发送消息
  249. while (m_send_data_condition && !m_event_exit.WaitOne(1))
  250. {
  251. if ( m_send_data_condition )
  252. {
  253. lock (m_send_list_lock)
  254. {
  255. if (m_send_data_list.Count != 0)
  256. {
  257. for (int i=0;i< m_send_data_list.Count;)
  258. {
  259. if (m_send_data_list[i] != null)
  260. {
  261. m_socket.Send(m_send_data_list[i]);
  262. m_send_data_list.Remove(m_send_data_list[i]);
  263. continue;
  264. }
  265. ++i;
  266. }
  267. }
  268. }
  269. }
  270. }
  271. Console.WriteLine(" Communication_socket_base::send_data_thread end " );
  272. return;
  273. }
  274. //mp_encapsulate_data_thread 封装线程执行函数,
  275. //encapsulate_data_thread 内部线程负责封装消息
  276. protected void encapsulate_data_thread()
  277. {
  278. Console.WriteLine(" Communication_socket_base::encapsulate_data_thread start " );
  279. //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list
  280. while (m_encapsulate_data_condition && !m_event_exit.WaitOne(m_encapsulate_cycle_time))
  281. {
  282. if (m_encapsulate_data_condition)
  283. {
  284. encapsulate_send_data();
  285. }
  286. }
  287. Console.WriteLine(" Communication_socket_base::encapsulate_data_thread end ");
  288. }
  289. //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
  290. public virtual void encapsulate_send_data()
  291. {
  292. return;
  293. Message.Base_msg t_base_msg;
  294. Message.Store_command_request_msg msg;
  295. t_base_msg.BaseInfo.MsgType = Message.Message_type.EBaseMsg;
  296. t_base_msg.BaseInfo.TimeoutMs = 5000;
  297. t_base_msg.BaseInfo.Sender = Message.Communicator.EMain;
  298. t_base_msg.BaseInfo.Receiver = Message.Communicator.EMain;
  299. m_send_data_list.Add(t_base_msg.ToByteArray());
  300. }
  301. //封装消息, 需要子类重载
  302. public virtual void encapsulate_msg(byte[] message)
  303. {
  304. lock (m_send_list_lock)
  305. {
  306. m_send_data_list.Add(message);
  307. }
  308. }
  309. //通用的网络编程接口, 默认使用总线模式, (网状结构)
  310. protected BusSocket m_socket;
  311. //数据锁
  312. //作用:将会锁住代码块的内容,并阻止其他线程进入该代码块,直到该代码块运行完成,释放该锁。
  313. //注意:定义的锁对象应该是 私有的,静态的,只读的,引用类型的对象,这样可以防止外部改变锁对象
  314. private static readonly object m_receive_list_lock = new object();
  315. private static readonly object m_send_list_lock = new object();
  316. //通信状态
  317. protected Communication_statu m_communication_statu; //通信状态
  318. //接受模块,
  319. private List<byte[]> m_receive_data_list; //接受的list容器
  320. protected Thread mp_receive_data_thread; //接受的线程
  321. protected bool m_receive_data_condition; //接受的条件变量
  322. protected Thread mp_analysis_data_thread; //解析的线程
  323. protected bool m_analysis_data_condition; //解析的条件变量
  324. protected int m_analysis_cycle_time; //自动解析的时间周期
  325. //发送模块,
  326. private List<byte[]> m_send_data_list; //发送的list容器
  327. protected Thread mp_send_data_thread; //发送的线程
  328. protected bool m_send_data_condition; //发送的条件变量
  329. protected Thread mp_encapsulate_data_thread; //封装的线程
  330. protected bool m_encapsulate_data_condition; //封装的条件变量
  331. protected int m_encapsulate_cycle_time; //自动封装的时间周期
  332. protected EventWaitHandle m_event_exit; //防止CPU卡死
  333. }
  334. }