using NNanomsg; using NNanomsg.Protocols; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using Google.Protobuf; using Message; namespace Communication { class MsgStamped { public DateTime receive_time; public ByteString msg; public Base_info header; public MsgStamped() { receive_time = DateTime.Now; msg = ByteString.Empty; } public MsgStamped(byte[] bytes) { receive_time = DateTime.Now; msg = ByteString.CopyFrom(bytes); } public MsgStamped(ByteString msg) { receive_time = DateTime.Now; this.msg = msg; } public MsgStamped(ByteString msg, Base_info header) { receive_time = DateTime.Now; this.msg = msg; this.header = header; if(this.header.TimeoutMs <= 0) { this.header.TimeoutMs = 5000; } } } enum CommunicatorStatus { COMMUNICATION_UNKNOW = 0, //通信状态 未知 COMMUNICATION_READY = 1, //通信状态 正常 COMMUNICATION_FAULT = 3, //通信状态 错误 } enum CheckExecuterReturn { MSG_TIMEOUT = 0, // 消息超时 EXECUTER_READY = 1, // 执行器就绪 EXECUTER_BUSY = 2, // 执行器正忙 EXECUTER_FAULT = 3, // 执行器错误 } class Communicator { /// /// 实例退出标记 /// public bool mb_exit; /// /// 初始化标记 /// public bool mb_initialized; /// /// 消息超时时间 /// public int m_timeout_milli = 2000; /// /// 通信类当前状态 /// public CommunicatorStatus m_status; /// /// 单例锁对象 /// protected readonly static object lockObj = new object(); /// /// 单例 /// protected static Communicator instance = null; /// /// 接收解析锁 /// protected object m_receive_lock; /// /// 发送锁 /// protected object m_send_lock; /// /// 发送队列 /// protected Queue m_send_queue; /// /// 接收队列 /// protected Queue m_receive_queue; /// /// 发送线程 /// protected Thread m_thread_send; /// /// 接收线程 /// protected Thread m_thread_receive; /// /// 解析接收string到protobuf消息 /// protected Thread m_thread_decode_receive; /// /// nanomsg 通信句柄 /// protected BusSocket m_socket; /// /// nnxx生成id队列 /// protected Queue nanomsgEndpoints_queue; /// /// 构造函数 /// /// /// protected Communicator() { mb_exit = false; mb_initialized = false; m_receive_lock = new object(); m_send_lock = new object(); m_receive_queue = new Queue(); m_send_queue = new Queue(); nanomsgEndpoints_queue = new Queue(); m_status = CommunicatorStatus.COMMUNICATION_UNKNOW; } /// /// 析构函数 /// ~Communicator() { mb_exit = true; if (m_thread_send != null) { m_thread_send.Join(); } if (m_thread_receive != null) { m_thread_receive.Join(); } if (m_thread_decode_receive != null) { m_thread_decode_receive.Join(); } } /// /// 单例访问 /// public static Communicator GetInstance() { if (instance == null) { lock (lockObj) { if (instance == null) { instance = new Communicator(); } } } return instance; } /// /// 初始化 /// /// public bool Init() { try { if (!mb_initialized) { mb_exit = false; mb_initialized = true; m_socket = new BusSocket(); m_thread_receive = new Thread(new ParameterizedThreadStart(Receive_thread_function)); m_thread_send = new Thread(new ParameterizedThreadStart(Send_thread_function)); m_thread_decode_receive = new Thread(new ParameterizedThreadStart(Decode_thread_function)); m_thread_receive.Start(this); m_thread_send.Start(this); m_thread_decode_receive.Start(this); m_status = CommunicatorStatus.COMMUNICATION_READY; return true; } else { return false; } } catch (Exception ex) { Console.WriteLine(ex.StackTrace); return false; } } /// /// 反初始化 /// /// public bool Uninit() { m_status = CommunicatorStatus.COMMUNICATION_UNKNOW; mb_exit = true; if (m_thread_receive != null) m_thread_receive.Join(); if (m_thread_send != null) m_thread_send.Join(); if (m_thread_decode_receive != null) m_thread_decode_receive.Join(); if(m_socket!=null) { while(nanomsgEndpoints_queue.Count>0) { NanomsgEndpoint nnep = nanomsgEndpoints_queue.Dequeue(); m_socket.Shutdown(nnep); } m_socket.Dispose(); } mb_initialized = false; return true; } /// /// 连接 /// /// /// public bool Connect(string server_address) { if (m_socket == null) return false; NanomsgEndpoint end_point = m_socket.Connect(server_address); return true; } /// /// 绑定本地端口监听 /// /// /// public bool Bind(string self_address) { if (m_socket == null) return false; NanomsgEndpoint end_point = m_socket.Bind(self_address); return true; } /// /// 发送消息 /// /// public bool Send_msg(ByteString bs) { lock(m_send_lock) { m_send_queue.Enqueue(new MsgStamped(bs)); } return true; } /// /// 检查消息 /// /// /// public virtual bool CheckMsg(Base_info header) { if (header.HasMsgType && header.HasSender && header.HasTimeoutMs && header.MsgType == Message_type.EBaseMsg && header.Sender == Message.Communicator.EMain) return true; else return false; } /// /// 检查执行器状态 /// /// /// /// public virtual CheckExecuterReturn CheckExecuter(Base_info header, DateTime receiveTime) { if ((DateTime.Now - receiveTime).Milliseconds > header.TimeoutMs) return CheckExecuterReturn.MSG_TIMEOUT; else return CheckExecuterReturn.EXECUTER_READY; } /// /// 执行消息 /// /// /// public virtual bool ExecuteMsg(MsgStamped msgStamped) { return true; } /// /// 接收线程函数 /// /// private static void Receive_thread_function(object handle) { if (handle == null) return; Communicator comm = (Communicator)handle; while (!comm.mb_exit) { try { if (!comm.mb_initialized || comm.m_socket == null) continue; byte[] data = comm.m_socket.ReceiveImmediate(); if (data != null && data.Length > 0 && comm.m_receive_queue != null) { // 解析头 Base_msg base_msg = Base_msg.Parser.ParseFrom(data); //Console.WriteLine(base_msg.ToString()); if (!comm.CheckMsg(base_msg.BaseInfo)) continue; lock (comm.m_receive_lock) { comm.m_receive_queue.Enqueue(new MsgStamped(ByteString.CopyFrom(data), base_msg.BaseInfo)); } } } catch (Exception ex) { Console.WriteLine(ex.StackTrace); } Thread.Sleep(1); } Console.WriteLine("receive thread exit"); } /// /// 发送线程函数 /// /// private static void Send_thread_function(object handle) { if (handle == null) return; Communicator comm = (Communicator)handle; while (!comm.mb_exit) { try { if (!comm.mb_initialized || comm.m_socket == null) continue; lock (comm.m_send_lock) { if(comm.m_send_queue.Count > 0) { MsgStamped msg_stamped = comm.m_send_queue.Dequeue(); comm.m_socket.Send(msg_stamped.msg.ToByteArray()); } } //Console.WriteLine("msg sent"); } catch (Exception ex) { Console.WriteLine(ex.StackTrace); } Thread.Sleep(1); } Console.WriteLine("send thread exit"); } /// /// 解析线程函数 /// /// private static void Decode_thread_function(object handle) { if (handle == null) return; Communicator comm = (Communicator)handle; while (!comm.mb_exit) { try { DateTime current_time = DateTime.Now; if (!comm.mb_initialized) continue; MsgStamped msg_stamped = null; lock (comm.m_receive_lock) { if (comm.m_receive_queue.Count > 0) { msg_stamped = comm.m_receive_queue.Dequeue(); } else { continue; } CheckExecuterReturn ret = comm.CheckExecuter(msg_stamped.header, msg_stamped.receive_time); if (ret == CheckExecuterReturn.MSG_TIMEOUT) { continue; }else if(ret == CheckExecuterReturn.EXECUTER_BUSY || ret == CheckExecuterReturn.EXECUTER_FAULT) { comm.m_receive_queue.Enqueue(msg_stamped); }else if(ret == CheckExecuterReturn.EXECUTER_READY) { comm.ExecuteMsg(msg_stamped); } } // 解析 //comm.Decode_msg(msg_stamped.msg); //Console.WriteLine("msg parsed"); } catch (Exception ex) { Console.WriteLine(ex.StackTrace); } Thread.Sleep(1); } Console.WriteLine("decode thread exit"); } ///// ///// 解析string到protobuf消息 ///// //private void Decode_msg(ByteString msg) //{ // if (msg == null) // return; // Base_msg base_msg = Base_msg.Parser.ParseFrom(msg); // switch(base_msg.BaseInfo.MsgType) // { // case Message_type.EParkspaceAllocationStatusMsg: // Parkspace_allocation_status_msg parkspace_status_msg = Parkspace_allocation_status_msg.Parser.ParseFrom(msg); // break; // case Message_type.EParkspaceForceUpdateResponseMsg: // Console.WriteLine("update response"); // Parkspace_force_update_response_msg parkspace_force_update_msg = Parkspace_force_update_response_msg.Parser.ParseFrom(msg); // Console.WriteLine(parkspace_force_update_msg.ToString()); // break; // case Message_type.EGroundStatusMsg: // Console.WriteLine("get ground status....."); // Ground_status_msg ground_status_msg = Ground_status_msg.Parser.ParseFrom(msg); // Console.WriteLine(ground_status_msg.ToString()); // break; // default: // Console.WriteLine("unrecognized message received"); // break; // } //} } }