communication_socket_base.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. 
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading.Tasks;
  7. using NNanomsg.Protocols;
  8. using System.Threading;
  9. using Newtonsoft.Json.Linq;
  10. using Google.Protobuf;
  11. using tool;
  12. using System.Runtime.InteropServices;
  13. using System.Windows.Forms;
  14. namespace communication_base
  15. {
  16. class Communication_socket_base
  17. {
  18. //[DllImport("nnxx_socket_lib.dll")]
  19. //private static extern int nnxx_init();
  20. //[DllImport("nnxx_socket_lib.dll")]
  21. //private static extern int nnxx_uninit();
  22. //[DllImport("nnxx_socket_lib.dll")]
  23. //private static extern int nnxx_bind(string addr);
  24. //[DllImport("nnxx_socket_lib.dll")]
  25. //private static extern int nnxx_connect(string addr);
  26. //[DllImport("nnxx_socket_lib.dll")]
  27. //private static extern int nnxx_recv(byte[] addr, uint len, int flags = 0);
  28. //[DllImport("nnxx_socket_lib.dll")]
  29. //private static extern int nnxx_send(byte[] addr, uint len, int flags = 0);
  30. //[DllImport("nnxx_socket_lib.dll")]
  31. //private static extern void nnxx_close();
  32. [DllImport("nanomsg_nng_lib.dll")]
  33. private static extern void nanomsg_nng_init();
  34. [DllImport("nanomsg_nng_lib.dll")]
  35. private static extern void nanomsg_nng_uninit();
  36. [DllImport("nanomsg_nng_lib.dll")]
  37. private static extern int nanomsg_nng_bind(string addr);
  38. [DllImport("nanomsg_nng_lib.dll")]
  39. private static extern int nanomsg_nng_connect(string addr);
  40. [DllImport("nanomsg_nng_lib.dll")]
  41. private static extern int nanomsg_nng_recv(byte[] addr, uint len);
  42. [DllImport("nanomsg_nng_lib.dll")]
  43. private static extern int nanomsg_nng_send(byte[] addr, uint len);
  44. //通信状态
  45. public enum Communication_statu
  46. {
  47. COMMUNICATION_UNKNOW = 0, //通信状态 未知
  48. COMMUNICATION_READY = 1, //通信状态 正常
  49. COMMUNICATION_FAULT = 2, //通信状态 错误
  50. };
  51. public Communication_socket_base()
  52. {
  53. m_communication_statu = Communication_statu.COMMUNICATION_UNKNOW;
  54. mp_receive_data_thread = null;
  55. mp_analysis_data_thread = null;
  56. mp_send_data_thread = null;
  57. mp_encapsulate_data_thread = null;
  58. }
  59. //初始化通信模块--默认从json文件读取
  60. public virtual void communication_init()
  61. {
  62. //m_socket = new BusSocket();
  63. //nnxx_init();
  64. nanomsg_nng_init();
  65. m_receive_data_queue = new Queue<ByteString>();
  66. m_send_data_queue = new Queue<ByteString>();
  67. //接受线程默认循环
  68. m_receive_data_condition = true;
  69. mp_receive_data_thread = new Thread(receive_data_thread);
  70. //解析线程默认等待, 需要接受线程去唤醒, 超时1ms, 超时后主动遍历m_receive_data_queue
  71. m_analysis_data_condition = true;
  72. mp_analysis_data_thread = new Thread(analysis_data_thread);
  73. //发送线程默认循环
  74. m_send_data_condition = true;
  75. mp_send_data_thread = new Thread(send_data_thread);
  76. //封装线程默认循环
  77. m_encapsulate_data_condition = true;
  78. mp_encapsulate_data_thread = new Thread(encapsulate_data_thread);
  79. m_event_exit = new EventWaitHandle(false, EventResetMode.ManualReset);
  80. m_analysis_cycle_time = 1000;//默认1000ms,就自动解析(接受list)
  81. m_encapsulate_cycle_time = 1000;//默认1000ms,就自动发送一次状态信息
  82. //绑定
  83. string bind_string = json_file_operation.Instance.read_json_string("communication_bind");
  84. communication_bind(bind_string);
  85. //连接
  86. JArray connect_string = json_file_operation.Instance.read_json_jarray("communication_connect");
  87. foreach (var connect in connect_string)
  88. {
  89. communication_connect(connect.ToString());
  90. }
  91. //启动通信, run thread
  92. communication_run();
  93. }
  94. //绑定
  95. public virtual void communication_bind(string bind_string)
  96. {
  97. //m_socket.Bind(bind_string);
  98. //nnxx_bind(bind_string);
  99. int results = nanomsg_nng_bind(bind_string);
  100. if (results == 0)
  101. {
  102. Console.WriteLine("communication_bind:" + bind_string + " success !");
  103. }
  104. else
  105. {
  106. Console.WriteLine("communication_bind:" + bind_string + " fail !!!!");
  107. MessageBox.Show("communication_bind:" + bind_string + " fail !!!!");
  108. }
  109. }
  110. //连接
  111. public virtual void communication_connect(string connect_string)
  112. {
  113. //m_socket.Connect(connect_string);
  114. //nnxx_connect(connect_string);
  115. int results = nanomsg_nng_connect(connect_string);
  116. if (results == 0)
  117. {
  118. Console.WriteLine("communication_connect:" + connect_string + " success !");
  119. }
  120. else
  121. {
  122. Console.WriteLine("communication_connect:" + connect_string + " fail !!!!");
  123. MessageBox.Show("communication_connect:" + connect_string + " fail !!!!");
  124. }
  125. }
  126. //启动通信
  127. public virtual void communication_run()
  128. {
  129. m_communication_statu = Communication_statu.COMMUNICATION_READY;
  130. //启动4个线程。
  131. mp_receive_data_thread.Start();
  132. mp_analysis_data_thread.Start();
  133. mp_send_data_thread.Start();
  134. mp_encapsulate_data_thread.Start();
  135. }
  136. //反初始化
  137. public virtual void communication_uninit()
  138. {
  139. //关闭几个线程
  140. m_receive_data_condition = false;
  141. m_analysis_data_condition = false;
  142. m_send_data_condition = false;
  143. m_encapsulate_data_condition = false;
  144. //回收4个线程的资源
  145. if (mp_receive_data_thread != null)
  146. {
  147. mp_receive_data_thread.Join();
  148. }
  149. if (mp_analysis_data_thread != null)
  150. {
  151. mp_analysis_data_thread.Join();
  152. }
  153. if (mp_send_data_thread != null)
  154. {
  155. mp_send_data_thread.Join();
  156. }
  157. if (mp_encapsulate_data_thread != null)
  158. {
  159. mp_encapsulate_data_thread.Join();
  160. }
  161. //清空list
  162. lock (m_receive_list_lock)
  163. {
  164. m_receive_data_queue.Clear();
  165. }
  166. lock (m_send_list_lock)
  167. {
  168. m_send_data_queue.Clear();
  169. }
  170. //nnxx_close();
  171. //nnxx_uninit();
  172. nanomsg_nng_uninit();
  173. m_communication_statu = Communication_statu.COMMUNICATION_UNKNOW;
  174. }
  175. //设置解析时间
  176. public void set_analysis_cycle_time(int analysis_cycle_time)
  177. {
  178. m_analysis_cycle_time = analysis_cycle_time;
  179. }
  180. //设置封装时间
  181. public void set_encapsulate_cycle_time(int encapsulate_cycle_time)
  182. {
  183. m_encapsulate_cycle_time = encapsulate_cycle_time;
  184. }
  185. //mp_receive_data_thread 接受线程执行函数,
  186. //receive_data_thread 内部线程负责接受消息
  187. protected void receive_data_thread()
  188. {
  189. Console.WriteLine(" Communication_socket_base::receive_data_thread start ");
  190. //通信接受线程, 负责接受socket消息, 并存入 m_receive_data_queue
  191. while (m_receive_data_condition && !m_event_exit.WaitOne(1))
  192. {
  193. if (m_receive_data_condition)
  194. {
  195. //非阻塞接受消息, 如果接收到消息, data!=null
  196. //byte[] data = m_socket.ReceiveImmediate();
  197. byte[] data = new byte[65536];
  198. int length = nanomsg_nng_recv(data, 65536);
  199. if (length > 0)
  200. //if (data != null)
  201. {
  202. //截取实际长度
  203. ByteString message = ByteString.CopyFrom(data.Skip(0).Take(length).ToArray());
  204. //ByteString message = ByteString.CopyFrom(data.ToArray());
  205. Message.Base_info base_Info = Message.Base_msg.Parser.ParseFrom(message).BaseInfo;
  206. //Console.WriteLine(base_Info.ToString());
  207. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  208. if (check_msg(message))
  209. {
  210. lock (m_receive_list_lock)
  211. {
  212. //添加入队列
  213. m_receive_data_queue.Enqueue(message);
  214. }
  215. }
  216. }
  217. }
  218. }
  219. Console.WriteLine(" Communication_socket_base::receive_data_thread end ");
  220. }
  221. //mp_analysis_data_thread 解析线程执行函数,
  222. //analysis_data_thread 内部线程负责解析消息
  223. protected void analysis_data_thread()
  224. {
  225. Console.WriteLine(" Communication_socket_base::analysis_data_thread start ");
  226. //通信解析线程, 负责巡检m_receive_data_queue, 并解析和处理消息
  227. while (m_analysis_data_condition && !m_event_exit.WaitOne(1))
  228. {
  229. if (m_analysis_data_condition)
  230. {
  231. //处理队列信息
  232. lock (m_receive_list_lock)
  233. {
  234. if (m_receive_data_queue.Count > 0)
  235. {
  236. //取出消息
  237. ByteString message = m_receive_data_queue.Dequeue();
  238. //检查消息是否可以被处理
  239. if (check_executer())
  240. {
  241. //处理消息
  242. execute_msg(message);
  243. }
  244. else
  245. {
  246. //不能处理就放到末尾 等能处理时处理
  247. m_receive_data_queue.Enqueue(message);
  248. }
  249. }
  250. //没有消息直接跳过
  251. }
  252. }
  253. }
  254. Console.WriteLine(" Communication_socket_base::analysis_data_thread end ");
  255. }
  256. //mp_send_data_thread 发送线程执行函数,
  257. //send_data_thread 内部线程负责发送消息
  258. protected void send_data_thread()
  259. {
  260. Console.WriteLine(" Communication_socket_base::send_data_thread start ");
  261. //通信发送线程, 负责巡检m_send_data_queue, 并发送消息
  262. while (m_send_data_condition && !m_event_exit.WaitOne(1))
  263. {
  264. if (m_send_data_condition)
  265. {
  266. lock (m_send_list_lock)
  267. {
  268. if (m_send_data_queue.Count > 0)
  269. {
  270. ByteString message = m_send_data_queue.Dequeue();
  271. //m_socket.Send(message.ToByteArray());
  272. //nnxx_send(message.ToByteArray(), (uint)message.ToByteArray().Length, 1);
  273. nanomsg_nng_send(message.ToByteArray(), (uint)message.ToByteArray().Length);
  274. }
  275. }
  276. }
  277. }
  278. Console.WriteLine(" Communication_socket_base::send_data_thread end ");
  279. return;
  280. }
  281. //mp_encapsulate_data_thread 封装线程执行函数,
  282. //encapsulate_data_thread 内部线程负责封装消息
  283. protected void encapsulate_data_thread()
  284. {
  285. Console.WriteLine(" Communication_socket_base::encapsulate_data_thread start ");
  286. //通信封装线程, 负责定时封装消息, 并存入 m_send_data_queue
  287. while (m_encapsulate_data_condition && !m_event_exit.WaitOne(m_encapsulate_cycle_time))
  288. {
  289. if (m_encapsulate_data_condition)
  290. {
  291. encapsulate_send_data();
  292. }
  293. }
  294. Console.WriteLine(" Communication_socket_base::encapsulate_data_thread end ");
  295. }
  296. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  297. //子类必须重载, 增加自己模块的判断逻辑
  298. public virtual bool check_msg(ByteString message)
  299. {
  300. // Message.Base_info base_Info = Message.Base_info.Parser.ParseJson(message);
  301. Message.Base_info base_Info = Message.Base_msg.Parser.ParseFrom(message).BaseInfo;
  302. if (base_Info.Receiver == Message.Communicator.EMain)
  303. {
  304. return true;
  305. }
  306. else
  307. {
  308. //无效的消息
  309. return true;
  310. }
  311. }
  312. //检查执行者的状态, 判断能否处理这条消息, 需要子类重载
  313. public virtual bool check_executer()
  314. {
  315. //检查对应模块的状态, 判断是否可以处理这条消息
  316. if (m_communication_statu == Communication_statu.COMMUNICATION_READY)
  317. return true;
  318. return false;
  319. }
  320. //处理消息, 需要子类重载
  321. public virtual void execute_msg(ByteString message)
  322. {
  323. //处理消息必须子类重载,本模块只负责通信
  324. }
  325. //封装消息, 需要子类重载
  326. public virtual void encapsulate_msg(ByteString message)
  327. {
  328. lock (m_send_list_lock)
  329. {
  330. m_send_data_queue.Enqueue(message);
  331. }
  332. }
  333. //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
  334. public virtual void encapsulate_send_data()
  335. {
  336. return;
  337. Message.Base_msg t_base_msg;
  338. Message.Store_command_request_msg msg;
  339. t_base_msg.BaseInfo.MsgType = Message.Message_type.EBaseMsg;
  340. t_base_msg.BaseInfo.TimeoutMs = 5000;
  341. t_base_msg.BaseInfo.Sender = Message.Communicator.EMain;
  342. t_base_msg.BaseInfo.Receiver = Message.Communicator.EMain;
  343. m_send_data_queue.Enqueue(t_base_msg.ToByteString());
  344. }
  345. //通用的网络编程接口, 默认使用总线模式, (网状结构)
  346. protected BusSocket m_socket;
  347. //数据锁
  348. //作用:将会锁住代码块的内容,并阻止其他线程进入该代码块,直到该代码块运行完成,释放该锁。
  349. //注意:定义的锁对象应该是 私有的,静态的,只读的,引用类型的对象,这样可以防止外部改变锁对象
  350. private static readonly object m_receive_list_lock = new object();
  351. private static readonly object m_send_list_lock = new object();
  352. //通信状态
  353. protected Communication_statu m_communication_statu; //通信状态
  354. //接受模块,
  355. private Queue<ByteString> m_receive_data_queue; //接受的队列容器
  356. protected Thread mp_receive_data_thread; //接受的线程
  357. protected bool m_receive_data_condition; //接受的条件变量
  358. protected Thread mp_analysis_data_thread; //解析的线程
  359. protected bool m_analysis_data_condition; //解析的条件变量
  360. protected int m_analysis_cycle_time; //自动解析的时间周期
  361. //发送模块,
  362. private Queue<ByteString> m_send_data_queue; //发送的队列容器
  363. protected Thread mp_send_data_thread; //发送的线程
  364. protected bool m_send_data_condition; //发送的条件变量
  365. protected Thread mp_encapsulate_data_thread; //封装的线程
  366. protected bool m_encapsulate_data_condition; //封装的条件变量
  367. protected int m_encapsulate_cycle_time; //自动封装的时间周期
  368. protected EventWaitHandle m_event_exit; //防止CPU卡死
  369. }
  370. }