Communicator.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. using NNanomsg;
  2. using NNanomsg.Protocols;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using Google.Protobuf;
  10. using Message;
  11. namespace parkspace_manager
  12. {
  13. class Communicator
  14. {
  15. // 各类消息委托,收到对应消息即调用对应委托/回调
  16. public delegate void parkspaceStatusDelegate(Parkspace_allocation_data_msg parkspace_status);
  17. public delegate void parkspaceForceUpdateResponseDelegate(Parkspace_force_update_response_msg response);
  18. // 对应委托/回调句柄
  19. private parkspaceStatusDelegate m_parkspaceStatusCallback;
  20. private parkspaceForceUpdateResponseDelegate m_parkspaceForceUpdateResponseCallback;
  21. // 对应委托/回调句柄设置函数
  22. public void SetParkspaceStatusDelegate(parkspaceStatusDelegate callback)
  23. {
  24. m_parkspaceStatusCallback = callback;
  25. }
  26. public void SetParkspaceForceUpdateResponseDelegate(parkspaceForceUpdateResponseDelegate callback)
  27. {
  28. m_parkspaceForceUpdateResponseCallback = callback;
  29. }
  30. /// <summary>
  31. /// 消息超时时间
  32. /// </summary>
  33. public int m_timeout_milli = 2000;
  34. /// <summary>
  35. /// 单例锁对象
  36. /// </summary>
  37. private readonly static object lockObj = new object();
  38. /// <summary>
  39. /// 单例
  40. /// </summary>
  41. private static Communicator instance = null;
  42. /// <summary>
  43. /// 接收解析锁
  44. /// </summary>
  45. private object m_receive_lock;
  46. /// <summary>
  47. /// 发送锁
  48. /// </summary>
  49. private object m_send_lock;
  50. /// <summary>
  51. /// 车位状态访问锁
  52. /// </summary>
  53. private object m_parkspace_status_access_lock;
  54. /// <summary>
  55. /// 车位消息队列
  56. /// </summary>
  57. private Parkspace_allocation_data_msg m_parkspace_current_status;
  58. /// <summary>
  59. /// 当前时间
  60. /// </summary>
  61. private DateTime m_last_parkspace_status_time;
  62. /// <summary>
  63. /// 发送队列
  64. /// </summary>
  65. private Queue<ByteString> m_send_queue;
  66. /// <summary>
  67. /// 接收队列
  68. /// </summary>
  69. private Queue<ByteString> m_receive_queue;
  70. /// <summary>
  71. /// 发送线程
  72. /// </summary>
  73. private Thread m_thread_send;
  74. /// <summary>
  75. /// 接收线程
  76. /// </summary>
  77. private Thread m_thread_receive;
  78. /// <summary>
  79. /// 解析接收string到protobuf消息
  80. /// </summary>
  81. private Thread m_thread_decode_receive;
  82. /// <summary>
  83. /// nanomsg 通信句柄
  84. /// </summary>
  85. private BusSocket m_socket;
  86. /// <summary>
  87. /// nnxx生成id队列
  88. /// </summary>
  89. private Queue<NanomsgEndpoint> nanomsgEndpoints_queue;
  90. /// <summary>
  91. /// 实例退出标记
  92. /// </summary>
  93. public bool mb_exit;
  94. /// <summary>
  95. /// 初始化标记
  96. /// </summary>
  97. public bool mb_initialized;
  98. /// <summary>
  99. /// 构造函数
  100. /// </summary>
  101. /// <param name="server_ip"></param>
  102. /// <param name="server_port"></param>
  103. private Communicator()
  104. {
  105. mb_exit = false;
  106. mb_initialized = false;
  107. m_receive_lock = new object();
  108. m_send_lock = new object();
  109. m_parkspace_status_access_lock = new object();
  110. m_receive_queue = new Queue<ByteString>();
  111. m_send_queue = new Queue<ByteString>();
  112. m_parkspace_current_status = new Parkspace_allocation_data_msg();
  113. nanomsgEndpoints_queue = new Queue<NanomsgEndpoint>();
  114. }
  115. /// <summary>
  116. /// 析构函数
  117. /// </summary>
  118. ~Communicator()
  119. {
  120. mb_exit = true;
  121. if (m_thread_send != null)
  122. {
  123. m_thread_send.Abort();
  124. }
  125. if (m_thread_receive != null)
  126. {
  127. m_thread_receive.Abort();
  128. }
  129. if (m_thread_decode_receive != null)
  130. m_thread_decode_receive.Abort();
  131. }
  132. /// <summary>
  133. /// 单例访问
  134. /// </summary>
  135. public static Communicator Instance
  136. {
  137. get
  138. {
  139. if (instance == null)
  140. {
  141. lock (lockObj)
  142. {
  143. if (instance == null)
  144. {
  145. instance = new Communicator();
  146. }
  147. }
  148. }
  149. return instance;
  150. }
  151. }
  152. /// <summary>
  153. /// 初始化
  154. /// </summary>
  155. /// <returns></returns>
  156. public bool Init()
  157. {
  158. try
  159. {
  160. if (!mb_initialized)
  161. {
  162. mb_exit = false;
  163. mb_initialized = true;
  164. m_socket = new BusSocket();
  165. m_thread_receive = new Thread(new ParameterizedThreadStart(Receive_thread_function));
  166. m_thread_send = new Thread(new ParameterizedThreadStart(Send_thread_function));
  167. m_thread_decode_receive = new Thread(new ParameterizedThreadStart(Decode_thread_function));
  168. m_thread_receive.Start(this);
  169. m_thread_send.Start(this);
  170. m_thread_decode_receive.Start(this);
  171. // 设置车位状态回调
  172. SetParkspaceStatusDelegate(ParkspaceStatusUpdate);
  173. // 设置车位刷新线程
  174. Task.Factory.StartNew(()=> {
  175. while (!mb_exit)
  176. {
  177. Parkspace_refresh_request_msg t_refresh_req = new Parkspace_refresh_request_msg();
  178. t_refresh_req.BaseInfo = new Base_info();
  179. t_refresh_req.BaseInfo.MsgType = Message_type.EParkspaceAllocationDataMsg;
  180. t_refresh_req.BaseInfo.TimeoutMs = 5000;
  181. t_refresh_req.BaseInfo.Sender = Message.Communicator.EEmpty;
  182. t_refresh_req.BaseInfo.Receiver = Message.Communicator.EParkspace;
  183. Send_msg(t_refresh_req.ToByteString());
  184. Thread.Sleep(1000 * 5);
  185. }
  186. });
  187. return true;
  188. }
  189. else
  190. {
  191. return false;
  192. }
  193. }
  194. catch (Exception ex) { Console.WriteLine(ex.StackTrace); return false; }
  195. }
  196. /// <summary>
  197. /// 反初始化
  198. /// </summary>
  199. /// <returns></returns>
  200. public bool Uninit()
  201. {
  202. mb_exit = true;
  203. if (m_thread_receive != null)
  204. m_thread_receive.Join();
  205. if (m_thread_send != null)
  206. m_thread_send.Join();
  207. if (m_thread_decode_receive != null)
  208. m_thread_decode_receive.Join();
  209. if(m_socket!=null)
  210. {
  211. while(nanomsgEndpoints_queue.Count>0)
  212. {
  213. NanomsgEndpoint nnep = nanomsgEndpoints_queue.Dequeue();
  214. m_socket.Shutdown(nnep);
  215. }
  216. m_socket.Dispose();
  217. }
  218. mb_initialized = false;
  219. return true;
  220. }
  221. /// <summary>
  222. /// 连接
  223. /// </summary>
  224. /// <param name="server_address"></param>
  225. /// <returns></returns>
  226. public bool Connect(string server_address)
  227. {
  228. if (m_socket == null)
  229. return false;
  230. NanomsgEndpoint end_point = m_socket.Connect(server_address);
  231. nanomsgEndpoints_queue.Enqueue(end_point);
  232. return true;
  233. }
  234. /// <summary>
  235. /// 绑定本地端口监听
  236. /// </summary>
  237. /// <param name="self_address"></param>
  238. /// <returns></returns>
  239. public bool Bind(string self_address)
  240. {
  241. if (m_socket == null)
  242. return false;
  243. NanomsgEndpoint end_point = m_socket.Bind(self_address);
  244. nanomsgEndpoints_queue.Enqueue(end_point);
  245. return true;
  246. }
  247. /// <summary>
  248. /// 车位更新回调内部定义
  249. /// </summary>
  250. /// <param name="parkspace_status"></param>
  251. private void ParkspaceStatusUpdate(Parkspace_allocation_data_msg parkspace_status)
  252. {
  253. lock (m_parkspace_status_access_lock)
  254. {
  255. m_parkspace_current_status = parkspace_status;
  256. //comm.m_parkspace_current_status.MergeFrom(parkspace_status);
  257. m_last_parkspace_status_time = DateTime.Now;
  258. }
  259. }
  260. /// <summary>
  261. /// 获取当前车位状态
  262. /// </summary>
  263. public bool Get_parkspace_status(ref Parkspace_allocation_data_msg msg)
  264. {
  265. lock (m_parkspace_status_access_lock)
  266. {
  267. msg = new Parkspace_allocation_data_msg();
  268. msg.MergeFrom(m_parkspace_current_status);
  269. }
  270. if ((DateTime.Now - m_last_parkspace_status_time).Milliseconds > m_timeout_milli)
  271. {
  272. return false;
  273. }
  274. else
  275. {
  276. return true;
  277. }
  278. }
  279. /// <summary>
  280. /// 发送手动更新车位消息
  281. /// </summary>
  282. /// <returns></returns>
  283. public bool Send_msg(ByteString bs)
  284. {
  285. lock(m_send_lock)
  286. {
  287. m_send_queue.Enqueue(bs);
  288. }
  289. return true;
  290. }
  291. /// <summary>
  292. /// 接收线程函数
  293. /// </summary>
  294. /// <param name="handle"></param>
  295. private static void Receive_thread_function(object handle)
  296. {
  297. if (handle == null)
  298. return;
  299. Communicator comm = (Communicator)handle;
  300. while (!comm.mb_exit)
  301. {
  302. try
  303. {
  304. if (!comm.mb_initialized || comm.m_socket == null)
  305. continue;
  306. byte[] data = comm.m_socket.ReceiveImmediate();
  307. if (data != null && data.Length > 0 && comm.m_receive_queue != null)
  308. {
  309. lock (comm.m_receive_lock)
  310. {
  311. //ByteString tmp = ByteString.CopyFrom(data);
  312. //Base_msg base_msg = Base_msg.Parser.ParseFrom(tmp);
  313. //Console.WriteLine(base_msg.ToString());
  314. //Console.WriteLine("msg received");
  315. comm.m_receive_queue.Enqueue(ByteString.CopyFrom(data));
  316. }
  317. }
  318. }
  319. catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
  320. Thread.Sleep(50);
  321. }
  322. Console.WriteLine("receive thread exit");
  323. }
  324. /// <summary>
  325. /// 发送线程函数
  326. /// </summary>
  327. /// <param name="handle"></param>
  328. private static void Send_thread_function(object handle)
  329. {
  330. if (handle == null)
  331. return;
  332. Communicator comm = (Communicator)handle;
  333. while (!comm.mb_exit)
  334. {
  335. try
  336. {
  337. if (!comm.mb_initialized || comm.m_socket == null)
  338. continue;
  339. lock (comm.m_send_lock)
  340. {
  341. while (comm.m_send_queue.Count > 0)
  342. {
  343. ByteString bs = comm.m_send_queue.Dequeue();
  344. comm.m_socket.Send(bs.ToByteArray());
  345. }
  346. }
  347. //Console.WriteLine("msg sent");
  348. }
  349. catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
  350. Thread.Sleep(50);
  351. }
  352. Console.WriteLine("send thread exit");
  353. }
  354. /// <summary>
  355. /// 解析线程函数
  356. /// </summary>
  357. /// <param name="handle"></param>
  358. private static void Decode_thread_function(object handle)
  359. {
  360. if (handle == null)
  361. return;
  362. Communicator comm = (Communicator)handle;
  363. while (!comm.mb_exit)
  364. {
  365. try
  366. {
  367. DateTime current_time = DateTime.Now;
  368. if (!comm.mb_initialized)
  369. continue;
  370. ByteString msg_str = null;
  371. lock (comm.m_receive_lock)
  372. {
  373. if (comm.m_receive_queue.Count > 0)
  374. {
  375. msg_str = comm.m_receive_queue.Dequeue();
  376. }
  377. }
  378. // 解析
  379. comm.Decode_msg(msg_str);
  380. //Console.WriteLine("msg parsed");
  381. }
  382. catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
  383. Thread.Sleep(50);
  384. }
  385. Console.WriteLine("decode thread exit");
  386. }
  387. /// <summary>
  388. /// 解析string到protobuf消息
  389. /// </summary>
  390. private void Decode_msg(ByteString msg)
  391. {
  392. if (msg == null)
  393. return;
  394. Base_msg base_msg = Base_msg.Parser.ParseFrom(msg);
  395. //Console.WriteLine(base_msg.ToString());
  396. switch(base_msg.BaseInfo.MsgType)
  397. {
  398. case Message_type.EParkspaceAllocationDataMsg:
  399. Parkspace_allocation_data_msg parkspace_status_msg = Parkspace_allocation_data_msg.Parser.ParseFrom(msg);
  400. m_parkspaceStatusCallback?.Invoke(parkspace_status_msg);
  401. break;
  402. case Message_type.EParkspaceForceUpdateResponseMsg:
  403. Console.WriteLine("update response");
  404. Parkspace_force_update_response_msg parkspace_force_update_msg = Parkspace_force_update_response_msg.Parser.ParseFrom(msg);
  405. Console.WriteLine(parkspace_force_update_msg.ToString());
  406. m_parkspaceForceUpdateResponseCallback?.Invoke(parkspace_force_update_msg);
  407. break;
  408. default:
  409. //Console.WriteLine("unrecognized message received");
  410. break;
  411. }
  412. }
  413. }
  414. }