// Communicator.cs (15 KB)
  1. using NNanomsg;
  2. using NNanomsg.Protocols;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using Google.Protobuf;
  10. using Message;
  11. namespace Communication
  12. {
  13. class MsgStamped
  14. {
  15. public DateTime receive_time;
  16. public ByteString msg;
  17. public Base_info header;
  18. public MsgStamped()
  19. {
  20. receive_time = DateTime.Now;
  21. msg = ByteString.Empty;
  22. }
  23. public MsgStamped(byte[] bytes)
  24. {
  25. receive_time = DateTime.Now;
  26. msg = ByteString.CopyFrom(bytes);
  27. }
  28. public MsgStamped(ByteString msg)
  29. {
  30. receive_time = DateTime.Now;
  31. this.msg = msg;
  32. }
  33. public MsgStamped(ByteString msg, Base_info header)
  34. {
  35. receive_time = DateTime.Now;
  36. this.msg = msg;
  37. this.header = header;
  38. if(this.header.TimeoutMs <= 0)
  39. {
  40. this.header.TimeoutMs = 5000;
  41. }
  42. }
  43. }
  44. enum CommunicatorStatus
  45. {
  46. COMMUNICATION_UNKNOW = 0, //通信状态 未知
  47. COMMUNICATION_READY = 1, //通信状态 正常
  48. COMMUNICATION_FAULT = 3, //通信状态 错误
  49. }
  50. enum CheckExecuterReturn
  51. {
  52. MSG_TIMEOUT = 0, // 消息超时
  53. EXECUTER_READY = 1, // 执行器就绪
  54. EXECUTER_BUSY = 2, // 执行器正忙
  55. EXECUTER_FAULT = 3, // 执行器错误
  56. }
  57. class Communicator
  58. {
  59. /// <summary>
  60. /// 实例退出标记
  61. /// </summary>
  62. public bool mb_exit;
  63. /// <summary>
  64. /// 初始化标记
  65. /// </summary>
  66. public bool mb_initialized;
  67. /// <summary>
  68. /// 消息超时时间
  69. /// </summary>
  70. public int m_timeout_milli = 2000;
  71. /// <summary>
  72. /// 通信类当前状态
  73. /// </summary>
  74. public CommunicatorStatus m_status;
  75. /// <summary>
  76. /// 单例锁对象
  77. /// </summary>
  78. protected readonly static object lockObj = new object();
  79. /// <summary>
  80. /// 单例
  81. /// </summary>
  82. protected static Communicator instance = null;
  83. /// <summary>
  84. /// 接收解析锁
  85. /// </summary>
  86. protected object m_receive_lock;
  87. /// <summary>
  88. /// 发送锁
  89. /// </summary>
  90. protected object m_send_lock;
  91. /// <summary>
  92. /// 发送队列
  93. /// </summary>
  94. protected Queue<MsgStamped> m_send_queue;
  95. /// <summary>
  96. /// 接收队列
  97. /// </summary>
  98. protected Queue<MsgStamped> m_receive_queue;
  99. /// <summary>
  100. /// 发送线程
  101. /// </summary>
  102. protected Thread m_thread_send;
  103. /// <summary>
  104. /// 接收线程
  105. /// </summary>
  106. protected Thread m_thread_receive;
  107. /// <summary>
  108. /// 解析接收string到protobuf消息
  109. /// </summary>
  110. protected Thread m_thread_decode_receive;
  111. /// <summary>
  112. /// nanomsg 通信句柄
  113. /// </summary>
  114. protected BusSocket m_socket;
  115. /// <summary>
  116. /// nnxx生成id队列
  117. /// </summary>
  118. protected Queue<NanomsgEndpoint> nanomsgEndpoints_queue;
  119. /// <summary>
  120. /// 构造函数
  121. /// </summary>
  122. /// <param name="server_ip"></param>
  123. /// <param name="server_port"></param>
  124. protected Communicator()
  125. {
  126. mb_exit = false;
  127. mb_initialized = false;
  128. m_receive_lock = new object();
  129. m_send_lock = new object();
  130. m_receive_queue = new Queue<MsgStamped>();
  131. m_send_queue = new Queue<MsgStamped>();
  132. nanomsgEndpoints_queue = new Queue<NanomsgEndpoint>();
  133. m_status = CommunicatorStatus.COMMUNICATION_UNKNOW;
  134. }
  135. /// <summary>
  136. /// 析构函数
  137. /// </summary>
  138. ~Communicator()
  139. {
  140. mb_exit = true;
  141. if (m_thread_send != null)
  142. {
  143. m_thread_send.Join();
  144. }
  145. if (m_thread_receive != null)
  146. {
  147. m_thread_receive.Join();
  148. }
  149. if (m_thread_decode_receive != null)
  150. {
  151. m_thread_decode_receive.Join();
  152. }
  153. }
  154. /// <summary>
  155. /// 单例访问
  156. /// </summary>
  157. public static Communicator GetInstance()
  158. {
  159. if (instance == null)
  160. {
  161. lock (lockObj)
  162. {
  163. if (instance == null)
  164. {
  165. instance = new Communicator();
  166. }
  167. }
  168. }
  169. return instance;
  170. }
  171. /// <summary>
  172. /// 初始化
  173. /// </summary>
  174. /// <returns></returns>
  175. public bool Init()
  176. {
  177. try
  178. {
  179. if (!mb_initialized)
  180. {
  181. mb_exit = false;
  182. mb_initialized = true;
  183. m_socket = new BusSocket();
  184. m_thread_receive = new Thread(new ParameterizedThreadStart(Receive_thread_function));
  185. m_thread_send = new Thread(new ParameterizedThreadStart(Send_thread_function));
  186. m_thread_decode_receive = new Thread(new ParameterizedThreadStart(Decode_thread_function));
  187. m_thread_receive.Start(this);
  188. m_thread_send.Start(this);
  189. m_thread_decode_receive.Start(this);
  190. m_status = CommunicatorStatus.COMMUNICATION_READY;
  191. return true;
  192. }
  193. else
  194. {
  195. return false;
  196. }
  197. }
  198. catch (Exception ex) { Console.WriteLine(ex.StackTrace); return false; }
  199. }
  200. /// <summary>
  201. /// 反初始化
  202. /// </summary>
  203. /// <returns></returns>
  204. public bool Uninit()
  205. {
  206. m_status = CommunicatorStatus.COMMUNICATION_UNKNOW;
  207. mb_exit = true;
  208. if (m_thread_receive != null)
  209. m_thread_receive.Join();
  210. if (m_thread_send != null)
  211. m_thread_send.Join();
  212. if (m_thread_decode_receive != null)
  213. m_thread_decode_receive.Join();
  214. if(m_socket!=null)
  215. {
  216. while(nanomsgEndpoints_queue.Count>0)
  217. {
  218. NanomsgEndpoint nnep = nanomsgEndpoints_queue.Dequeue();
  219. m_socket.Shutdown(nnep);
  220. }
  221. m_socket.Dispose();
  222. }
  223. mb_initialized = false;
  224. return true;
  225. }
  226. /// <summary>
  227. /// 连接
  228. /// </summary>
  229. /// <param name="server_address"></param>
  230. /// <returns></returns>
  231. public bool Connect(string server_address)
  232. {
  233. if (m_socket == null)
  234. return false;
  235. NanomsgEndpoint end_point = m_socket.Connect(server_address);
  236. return true;
  237. }
  238. /// <summary>
  239. /// 绑定本地端口监听
  240. /// </summary>
  241. /// <param name="self_address"></param>
  242. /// <returns></returns>
  243. public bool Bind(string self_address)
  244. {
  245. if (m_socket == null)
  246. return false;
  247. NanomsgEndpoint end_point = m_socket.Bind(self_address);
  248. return true;
  249. }
  250. /// <summary>
  251. /// 发送消息
  252. /// </summary>
  253. /// <returns></returns>
  254. public bool Send_msg(ByteString bs)
  255. {
  256. lock(m_send_lock)
  257. {
  258. m_send_queue.Enqueue(new MsgStamped(bs));
  259. }
  260. return true;
  261. }
  262. /// <summary>
  263. /// 检查消息
  264. /// </summary>
  265. /// <param name="header"></param>
  266. /// <returns></returns>
  267. public virtual bool CheckMsg(Base_info header)
  268. {
  269. if (header.HasMsgType && header.HasSender && header.HasTimeoutMs
  270. && header.MsgType == Message_type.EBaseMsg && header.Sender == Message.Communicator.EMain)
  271. return true;
  272. else
  273. return false;
  274. }
  275. /// <summary>
  276. /// 检查执行器状态
  277. /// </summary>
  278. /// <param name="header"></param>
  279. /// <param name="receiveTime"></param>
  280. /// <returns></returns>
  281. public virtual CheckExecuterReturn CheckExecuter(Base_info header, DateTime receiveTime)
  282. {
  283. if ((DateTime.Now - receiveTime).Milliseconds > header.TimeoutMs)
  284. return CheckExecuterReturn.MSG_TIMEOUT;
  285. else
  286. return CheckExecuterReturn.EXECUTER_READY;
  287. }
  288. /// <summary>
  289. /// 执行消息
  290. /// </summary>
  291. /// <param name="msgStamped"></param>
  292. /// <returns></returns>
  293. public virtual bool ExecuteMsg(MsgStamped msgStamped)
  294. {
  295. return true;
  296. }
  297. /// <summary>
  298. /// 接收线程函数
  299. /// </summary>
  300. /// <param name="handle"></param>
  301. private static void Receive_thread_function(object handle)
  302. {
  303. if (handle == null)
  304. return;
  305. Communicator comm = (Communicator)handle;
  306. while (!comm.mb_exit)
  307. {
  308. try
  309. {
  310. if (!comm.mb_initialized || comm.m_socket == null)
  311. continue;
  312. byte[] data = comm.m_socket.ReceiveImmediate();
  313. if (data != null && data.Length > 0 && comm.m_receive_queue != null)
  314. {
  315. // 解析头
  316. Base_msg base_msg = Base_msg.Parser.ParseFrom(data);
  317. //Console.WriteLine(base_msg.ToString());
  318. if (!comm.CheckMsg(base_msg.BaseInfo))
  319. continue;
  320. lock (comm.m_receive_lock)
  321. {
  322. comm.m_receive_queue.Enqueue(new MsgStamped(ByteString.CopyFrom(data), base_msg.BaseInfo));
  323. }
  324. }
  325. }
  326. catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
  327. Thread.Sleep(1);
  328. }
  329. Console.WriteLine("receive thread exit");
  330. }
  331. /// <summary>
  332. /// 发送线程函数
  333. /// </summary>
  334. /// <param name="handle"></param>
  335. private static void Send_thread_function(object handle)
  336. {
  337. if (handle == null)
  338. return;
  339. Communicator comm = (Communicator)handle;
  340. while (!comm.mb_exit)
  341. {
  342. try
  343. {
  344. if (!comm.mb_initialized || comm.m_socket == null)
  345. continue;
  346. lock (comm.m_send_lock)
  347. {
  348. if(comm.m_send_queue.Count > 0)
  349. {
  350. MsgStamped msg_stamped = comm.m_send_queue.Dequeue();
  351. comm.m_socket.Send(msg_stamped.msg.ToByteArray());
  352. }
  353. }
  354. //Console.WriteLine("msg sent");
  355. }
  356. catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
  357. Thread.Sleep(1);
  358. }
  359. Console.WriteLine("send thread exit");
  360. }
  361. /// <summary>
  362. /// 解析线程函数
  363. /// </summary>
  364. /// <param name="handle"></param>
  365. private static void Decode_thread_function(object handle)
  366. {
  367. if (handle == null)
  368. return;
  369. Communicator comm = (Communicator)handle;
  370. while (!comm.mb_exit)
  371. {
  372. try
  373. {
  374. DateTime current_time = DateTime.Now;
  375. if (!comm.mb_initialized)
  376. continue;
  377. MsgStamped msg_stamped = null;
  378. lock (comm.m_receive_lock)
  379. {
  380. if (comm.m_receive_queue.Count > 0)
  381. {
  382. msg_stamped = comm.m_receive_queue.Dequeue();
  383. }
  384. else
  385. {
  386. continue;
  387. }
  388. CheckExecuterReturn ret = comm.CheckExecuter(msg_stamped.header, msg_stamped.receive_time);
  389. if (ret == CheckExecuterReturn.MSG_TIMEOUT)
  390. {
  391. continue;
  392. }else if(ret == CheckExecuterReturn.EXECUTER_BUSY || ret == CheckExecuterReturn.EXECUTER_FAULT)
  393. {
  394. comm.m_receive_queue.Enqueue(msg_stamped);
  395. }else if(ret == CheckExecuterReturn.EXECUTER_READY)
  396. {
  397. comm.ExecuteMsg(msg_stamped);
  398. }
  399. }
  400. // 解析
  401. //comm.Decode_msg(msg_stamped.msg);
  402. //Console.WriteLine("msg parsed");
  403. }
  404. catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
  405. Thread.Sleep(1);
  406. }
  407. Console.WriteLine("decode thread exit");
  408. }
        ///// <summary>
        ///// Parse a string into a protobuf message
        ///// </summary>
  412. //private void Decode_msg(ByteString msg)
  413. //{
  414. // if (msg == null)
  415. // return;
  416. // Base_msg base_msg = Base_msg.Parser.ParseFrom(msg);
  417. // switch(base_msg.BaseInfo.MsgType)
  418. // {
  419. // case Message_type.EParkspaceAllocationStatusMsg:
  420. // Parkspace_allocation_status_msg parkspace_status_msg = Parkspace_allocation_status_msg.Parser.ParseFrom(msg);
  421. // break;
  422. // case Message_type.EParkspaceForceUpdateResponseMsg:
  423. // Console.WriteLine("update response");
  424. // Parkspace_force_update_response_msg parkspace_force_update_msg = Parkspace_force_update_response_msg.Parser.ParseFrom(msg);
  425. // Console.WriteLine(parkspace_force_update_msg.ToString());
  426. // break;
  427. // case Message_type.EGroundStatusMsg:
  428. // Console.WriteLine("get ground status.....");
  429. // Ground_status_msg ground_status_msg = Ground_status_msg.Parser.ParseFrom(msg);
  430. // Console.WriteLine(ground_status_msg.ToString());
  431. // break;
  432. // default:
  433. // Console.WriteLine("unrecognized message received");
  434. // break;
  435. // }
  436. //}
  437. }
  438. }