123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467 |
- using NNanomsg;
- using NNanomsg.Protocols;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using Google.Protobuf;
- using Message;
- namespace Communication
- {
- class MsgStamped
- {
- public DateTime receive_time;
- public ByteString msg;
- public Base_info header;
- public MsgStamped()
- {
- receive_time = DateTime.Now;
- msg = ByteString.Empty;
- }
- public MsgStamped(byte[] bytes)
- {
- receive_time = DateTime.Now;
- msg = ByteString.CopyFrom(bytes);
- }
- public MsgStamped(ByteString msg)
- {
- receive_time = DateTime.Now;
- this.msg = msg;
- }
- public MsgStamped(ByteString msg, Base_info header)
- {
- receive_time = DateTime.Now;
- this.msg = msg;
- this.header = header;
- if(this.header.TimeoutMs <= 0)
- {
- this.header.TimeoutMs = 5000;
- }
- }
- }
- enum CommunicatorStatus
- {
- COMMUNICATION_UNKNOW = 0, //通信状态 未知
- COMMUNICATION_READY = 1, //通信状态 正常
- COMMUNICATION_FAULT = 3, //通信状态 错误
- }
- enum CheckExecuterReturn
- {
- MSG_TIMEOUT = 0, // 消息超时
- EXECUTER_READY = 1, // 执行器就绪
- EXECUTER_BUSY = 2, // 执行器正忙
- EXECUTER_FAULT = 3, // 执行器错误
- }
- class Communicator
- {
- /// <summary>
- /// 实例退出标记
- /// </summary>
- public bool mb_exit;
- /// <summary>
- /// 初始化标记
- /// </summary>
- public bool mb_initialized;
- /// <summary>
- /// 消息超时时间
- /// </summary>
- public int m_timeout_milli = 2000;
- /// <summary>
- /// 通信类当前状态
- /// </summary>
- public CommunicatorStatus m_status;
- /// <summary>
- /// 单例锁对象
- /// </summary>
- protected readonly static object lockObj = new object();
- /// <summary>
- /// 单例
- /// </summary>
- protected static Communicator instance = null;
- /// <summary>
- /// 接收解析锁
- /// </summary>
- protected object m_receive_lock;
- /// <summary>
- /// 发送锁
- /// </summary>
- protected object m_send_lock;
- /// <summary>
- /// 发送队列
- /// </summary>
- protected Queue<MsgStamped> m_send_queue;
- /// <summary>
- /// 接收队列
- /// </summary>
- protected Queue<MsgStamped> m_receive_queue;
- /// <summary>
- /// 发送线程
- /// </summary>
- protected Thread m_thread_send;
- /// <summary>
- /// 接收线程
- /// </summary>
- protected Thread m_thread_receive;
- /// <summary>
- /// 解析接收string到protobuf消息
- /// </summary>
- protected Thread m_thread_decode_receive;
- /// <summary>
- /// nanomsg 通信句柄
- /// </summary>
- protected BusSocket m_socket;
- /// <summary>
- /// nnxx生成id队列
- /// </summary>
- protected Queue<NanomsgEndpoint> nanomsgEndpoints_queue;
- /// <summary>
- /// 构造函数
- /// </summary>
- /// <param name="server_ip"></param>
- /// <param name="server_port"></param>
- protected Communicator()
- {
- mb_exit = false;
- mb_initialized = false;
- m_receive_lock = new object();
- m_send_lock = new object();
- m_receive_queue = new Queue<MsgStamped>();
- m_send_queue = new Queue<MsgStamped>();
- nanomsgEndpoints_queue = new Queue<NanomsgEndpoint>();
- m_status = CommunicatorStatus.COMMUNICATION_UNKNOW;
- }
- /// <summary>
- /// 析构函数
- /// </summary>
- ~Communicator()
- {
- mb_exit = true;
- if (m_thread_send != null)
- {
- m_thread_send.Join();
- }
- if (m_thread_receive != null)
- {
- m_thread_receive.Join();
- }
- if (m_thread_decode_receive != null)
- {
- m_thread_decode_receive.Join();
- }
- }
- /// <summary>
- /// 单例访问
- /// </summary>
- public static Communicator GetInstance()
- {
- if (instance == null)
- {
- lock (lockObj)
- {
- if (instance == null)
- {
- instance = new Communicator();
- }
- }
- }
- return instance;
- }
- /// <summary>
- /// 初始化
- /// </summary>
- /// <returns></returns>
- public bool Init()
- {
- try
- {
- if (!mb_initialized)
- {
- mb_exit = false;
- mb_initialized = true;
- m_socket = new BusSocket();
- m_thread_receive = new Thread(new ParameterizedThreadStart(Receive_thread_function));
- m_thread_send = new Thread(new ParameterizedThreadStart(Send_thread_function));
- m_thread_decode_receive = new Thread(new ParameterizedThreadStart(Decode_thread_function));
- m_thread_receive.Start(this);
- m_thread_send.Start(this);
- m_thread_decode_receive.Start(this);
- m_status = CommunicatorStatus.COMMUNICATION_READY;
- return true;
- }
- else
- {
- return false;
- }
- }
- catch (Exception ex) { Console.WriteLine(ex.StackTrace); return false; }
- }
- /// <summary>
- /// 反初始化
- /// </summary>
- /// <returns></returns>
- public bool Uninit()
- {
- m_status = CommunicatorStatus.COMMUNICATION_UNKNOW;
- mb_exit = true;
- if (m_thread_receive != null)
- m_thread_receive.Join();
- if (m_thread_send != null)
- m_thread_send.Join();
- if (m_thread_decode_receive != null)
- m_thread_decode_receive.Join();
- if(m_socket!=null)
- {
- while(nanomsgEndpoints_queue.Count>0)
- {
- NanomsgEndpoint nnep = nanomsgEndpoints_queue.Dequeue();
- m_socket.Shutdown(nnep);
- }
- m_socket.Dispose();
- }
- mb_initialized = false;
- return true;
- }
- /// <summary>
- /// 连接
- /// </summary>
- /// <param name="server_address"></param>
- /// <returns></returns>
- public bool Connect(string server_address)
- {
- if (m_socket == null)
- return false;
- NanomsgEndpoint end_point = m_socket.Connect(server_address);
- return true;
- }
- /// <summary>
- /// 绑定本地端口监听
- /// </summary>
- /// <param name="self_address"></param>
- /// <returns></returns>
- public bool Bind(string self_address)
- {
- if (m_socket == null)
- return false;
- NanomsgEndpoint end_point = m_socket.Bind(self_address);
- return true;
- }
- /// <summary>
- /// 发送消息
- /// </summary>
- /// <returns></returns>
- public bool Send_msg(ByteString bs)
- {
- lock(m_send_lock)
- {
- m_send_queue.Enqueue(new MsgStamped(bs));
- }
- return true;
- }
- /// <summary>
- /// 检查消息
- /// </summary>
- /// <param name="header"></param>
- /// <returns></returns>
- public virtual bool CheckMsg(Base_info header)
- {
- if (header.HasMsgType && header.HasSender && header.HasTimeoutMs
- && header.MsgType == Message_type.EBaseMsg && header.Sender == Message.Communicator.EMain)
- return true;
- else
- return false;
- }
- /// <summary>
- /// 检查执行器状态
- /// </summary>
- /// <param name="header"></param>
- /// <param name="receiveTime"></param>
- /// <returns></returns>
- public virtual CheckExecuterReturn CheckExecuter(Base_info header, DateTime receiveTime)
- {
- if ((DateTime.Now - receiveTime).Milliseconds > header.TimeoutMs)
- return CheckExecuterReturn.MSG_TIMEOUT;
- else
- return CheckExecuterReturn.EXECUTER_READY;
- }
- /// <summary>
- /// 执行消息
- /// </summary>
- /// <param name="msgStamped"></param>
- /// <returns></returns>
- public virtual bool ExecuteMsg(MsgStamped msgStamped)
- {
- return true;
- }
-
- /// <summary>
- /// 接收线程函数
- /// </summary>
- /// <param name="handle"></param>
- private static void Receive_thread_function(object handle)
- {
- if (handle == null)
- return;
- Communicator comm = (Communicator)handle;
- while (!comm.mb_exit)
- {
- try
- {
- if (!comm.mb_initialized || comm.m_socket == null)
- continue;
- byte[] data = comm.m_socket.ReceiveImmediate();
- if (data != null && data.Length > 0 && comm.m_receive_queue != null)
- {
- // 解析头
- Base_msg base_msg = Base_msg.Parser.ParseFrom(data);
- //Console.WriteLine(base_msg.ToString());
- if (!comm.CheckMsg(base_msg.BaseInfo))
- continue;
- lock (comm.m_receive_lock)
- {
- comm.m_receive_queue.Enqueue(new MsgStamped(ByteString.CopyFrom(data), base_msg.BaseInfo));
- }
- }
- }
- catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
- Thread.Sleep(1);
- }
- Console.WriteLine("receive thread exit");
- }
- /// <summary>
- /// 发送线程函数
- /// </summary>
- /// <param name="handle"></param>
- private static void Send_thread_function(object handle)
- {
- if (handle == null)
- return;
- Communicator comm = (Communicator)handle;
- while (!comm.mb_exit)
- {
- try
- {
- if (!comm.mb_initialized || comm.m_socket == null)
- continue;
- lock (comm.m_send_lock)
- {
- if(comm.m_send_queue.Count > 0)
- {
- MsgStamped msg_stamped = comm.m_send_queue.Dequeue();
- comm.m_socket.Send(msg_stamped.msg.ToByteArray());
- }
- }
- //Console.WriteLine("msg sent");
- }
- catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
- Thread.Sleep(1);
- }
- Console.WriteLine("send thread exit");
- }
- /// <summary>
- /// 解析线程函数
- /// </summary>
- /// <param name="handle"></param>
- private static void Decode_thread_function(object handle)
- {
- if (handle == null)
- return;
- Communicator comm = (Communicator)handle;
- while (!comm.mb_exit)
- {
- try
- {
- DateTime current_time = DateTime.Now;
- if (!comm.mb_initialized)
- continue;
- MsgStamped msg_stamped = null;
- lock (comm.m_receive_lock)
- {
- if (comm.m_receive_queue.Count > 0)
- {
- msg_stamped = comm.m_receive_queue.Dequeue();
- }
- else
- {
- continue;
- }
- CheckExecuterReturn ret = comm.CheckExecuter(msg_stamped.header, msg_stamped.receive_time);
- if (ret == CheckExecuterReturn.MSG_TIMEOUT)
- {
- continue;
- }else if(ret == CheckExecuterReturn.EXECUTER_BUSY || ret == CheckExecuterReturn.EXECUTER_FAULT)
- {
- comm.m_receive_queue.Enqueue(msg_stamped);
- }else if(ret == CheckExecuterReturn.EXECUTER_READY)
- {
- comm.ExecuteMsg(msg_stamped);
- }
- }
- // 解析
- //comm.Decode_msg(msg_stamped.msg);
- //Console.WriteLine("msg parsed");
- }
- catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
- Thread.Sleep(1);
- }
- Console.WriteLine("decode thread exit");
- }
- ///// <summary>
- ///// 解析string到protobuf消息
- ///// </summary>
- //private void Decode_msg(ByteString msg)
- //{
- // if (msg == null)
- // return;
- // Base_msg base_msg = Base_msg.Parser.ParseFrom(msg);
- // switch(base_msg.BaseInfo.MsgType)
- // {
- // case Message_type.EParkspaceAllocationStatusMsg:
- // Parkspace_allocation_status_msg parkspace_status_msg = Parkspace_allocation_status_msg.Parser.ParseFrom(msg);
- // break;
- // case Message_type.EParkspaceForceUpdateResponseMsg:
- // Console.WriteLine("update response");
- // Parkspace_force_update_response_msg parkspace_force_update_msg = Parkspace_force_update_response_msg.Parser.ParseFrom(msg);
- // Console.WriteLine(parkspace_force_update_msg.ToString());
- // break;
- // case Message_type.EGroundStatusMsg:
- // Console.WriteLine("get ground status.....");
- // Ground_status_msg ground_status_msg = Ground_status_msg.Parser.ParseFrom(msg);
- // Console.WriteLine(ground_status_msg.ToString());
- // break;
- // default:
- // Console.WriteLine("unrecognized message received");
- // break;
- // }
- //}
- }
- }
|