using NNanomsg; using NNanomsg.Protocols; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using Google.Protobuf; using Message; namespace parkspace_manager { class Communicator { private readonly static object lockObj = new object(); /// /// 单例 /// private static Communicator instance = null; /// /// 消息超时时间 /// private static int timeout_milli = 2000; /// /// 接收解析锁 /// private object m_receive_lock; /// /// 发送锁 /// private object m_send_lock; /// /// 车位状态访问锁 /// private object m_parkspace_status_access_lock; /// /// 车位消息队列 /// private Parkspace_allocation_status_msg m_parkspace_current_status; /// /// 当前时间 /// private DateTime m_last_parkspace_status_time; /// /// 车位状态超时 /// private bool mb_parkspace_status_timeout; /// /// 发送队列 /// private Queue m_send_queue; /// /// 接收队列 /// private Queue m_receive_queue; /// /// 发送线程 /// private Thread m_thread_send; /// /// 接收线程 /// private Thread m_thread_receive; /// /// 解析接收string到protobuf消息 /// private Thread m_thread_decode_receive; /// /// nanomsg 通信句柄 /// private BusSocket m_socket; /// /// 实例退出标记 /// private bool mb_exit; /// /// 初始化标记 /// private bool mb_initialized; /// /// 构造函数 /// /// /// private Communicator() { mb_exit = false; mb_initialized = false; mb_parkspace_status_timeout = false; m_receive_lock = new object(); m_send_lock = new object(); m_parkspace_status_access_lock = new object(); m_receive_queue = new Queue(); m_send_queue = new Queue(); m_parkspace_current_status = new Parkspace_allocation_status_msg(); } /// /// 析构函数 /// ~Communicator() { mb_exit = true; if (m_thread_send != null) { m_thread_send.Abort(); } if (m_thread_receive != null) { m_thread_receive.Abort(); } if (m_thread_decode_receive != null) m_thread_decode_receive.Abort(); } /// /// 单例访问 /// public static Communicator Instance { get { if (instance == null) { lock (lockObj) { if (instance == null) { instance = new Communicator(); } } } return instance; } } /// /// 初始化 /// /// public bool Init() { mb_initialized = true; m_socket = new BusSocket(); m_thread_receive = new Thread(new ParameterizedThreadStart(Receive_thread_function)); m_thread_send = new Thread(new ParameterizedThreadStart(Send_thread_function)); m_thread_decode_receive = new Thread(new ParameterizedThreadStart(Decode_thread_function)); m_thread_receive.Start(this); m_thread_send.Start(this); m_thread_decode_receive.Start(this); return true; } /// /// 反初始化 /// /// public bool Uninit() { mb_exit = true; if(m_thread_receive!=null) m_thread_receive.Join(); if(m_thread_send!=null) m_thread_send.Join(); if (m_thread_decode_receive != null) m_thread_decode_receive.Join(); return true; } /// /// 连接 /// /// /// public bool Connect(string server_address) { if (m_socket == null) return false; NanomsgEndpoint end_point = m_socket.Connect(server_address); return true; } /// /// 绑定本地端口监听 /// /// /// public bool Bind(string self_address) { if (m_socket == null) return false; NanomsgEndpoint end_point = m_socket.Bind(self_address); return true; } /// /// 获取当前车位状态 /// public bool Get_parkspace_status(ref Parkspace_allocation_status_msg msg) { lock (m_parkspace_status_access_lock) { msg.MergeFrom(m_parkspace_current_status); } if((DateTime.Now-m_last_parkspace_status_time).Milliseconds > timeout_milli) { return false; } else { return true; } } /// /// 发送手动更新车位消息 /// /// public bool Send_msg() { } /// /// 接收线程函数 /// /// private static void Receive_thread_function(object handle) { if (handle == null) return; Communicator comm = (Communicator)handle; while(!comm.mb_exit) { if (!comm.mb_initialized || comm.m_socket==null) continue; byte[] data = comm.m_socket.ReceiveImmediate(); if(data!=null && data.Length>0) { lock (comm.m_receive_lock) { //ByteString tmp = ByteString.CopyFrom(data); //Base_msg base_msg = Base_msg.Parser.ParseFrom(tmp); //Console.WriteLine(base_msg.ToString()); comm.m_receive_queue.Enqueue(ByteString.CopyFrom(data)); } } Thread.Sleep(50); } Console.WriteLine("receive thread exit"); } /// /// 发送线程函数 /// /// private static void Send_thread_function(object handle) { if (handle == null) return; Communicator comm = (Communicator)handle; while (!comm.mb_exit) { if (!comm.mb_initialized || comm.m_socket == null) continue; lock(comm.m_send_lock) { } Thread.Sleep(500); } Console.WriteLine("send thread exit"); } /// /// 解析线程函数 /// /// private static void Decode_thread_function(object handle) { if (handle == null) return; Communicator comm = (Communicator)handle; while (!comm.mb_exit) { DateTime current_time = DateTime.Now; if (!comm.mb_initialized) continue; Parkspace_allocation_status_msg parkspace_status = null; lock (comm.m_receive_lock) { if (comm.m_receive_queue.Count > 0) { ByteString msg_str = comm.m_receive_queue.Dequeue(); parkspace_status = comm.Decode_msg(msg_str); if (parkspace_status != null) { comm.m_last_parkspace_status_time = DateTime.Now; Console.WriteLine(parkspace_status.ToString()); } } } if(parkspace_status != null) { lock (comm.m_parkspace_status_access_lock) { comm.m_parkspace_current_status = parkspace_status; //comm.m_parkspace_current_status.MergeFrom(parkspace_status); } } Thread.Sleep(50); } Console.WriteLine("decode thread exit"); } /// /// 解析string到protobuf消息 /// private Parkspace_allocation_status_msg Decode_msg(ByteString msg) { Base_msg base_msg = Base_msg.Parser.ParseFrom(msg); if(base_msg.BaseInfo.MsgType == Message_type.EParkspaceAllocationStatusMsg) { Parkspace_allocation_status_msg parsed_msg = Parkspace_allocation_status_msg.Parser.ParseFrom(msg); return parsed_msg; } else { return null; } } } }