|
@@ -0,0 +1,468 @@
|
|
|
+using NNanomsg;
|
|
|
+using NNanomsg.Protocols;
|
|
|
+using System;
|
|
|
+using System.Collections.Generic;
|
|
|
+using System.Linq;
|
|
|
+using System.Text;
|
|
|
+using System.Threading;
|
|
|
+using System.Threading.Tasks;
|
|
|
+using Google.Protobuf;
|
|
|
+using Message;
|
|
|
+
|
|
|
+namespace Communication
|
|
|
+{
|
|
|
+ class MsgStamped
|
|
|
+ {
|
|
|
+ public DateTime receive_time;
|
|
|
+ public ByteString msg;
|
|
|
+ public Base_info header;
|
|
|
+
|
|
|
+ public MsgStamped()
|
|
|
+ {
|
|
|
+ receive_time = DateTime.Now;
|
|
|
+ msg = ByteString.Empty;
|
|
|
+ }
|
|
|
+
|
|
|
+ public MsgStamped(byte[] bytes)
|
|
|
+ {
|
|
|
+ receive_time = DateTime.Now;
|
|
|
+ msg = ByteString.CopyFrom(bytes);
|
|
|
+ }
|
|
|
+
|
|
|
+ public MsgStamped(ByteString msg)
|
|
|
+ {
|
|
|
+ receive_time = DateTime.Now;
|
|
|
+ this.msg = msg;
|
|
|
+ }
|
|
|
+
|
|
|
+ public MsgStamped(ByteString msg, Base_info header)
|
|
|
+ {
|
|
|
+ receive_time = DateTime.Now;
|
|
|
+ this.msg = msg;
|
|
|
+ this.header = header;
|
|
|
+ if(this.header.TimeoutMs <= 0)
|
|
|
+ {
|
|
|
+ this.header.TimeoutMs = 5000;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ enum CommunicatorStatus
|
|
|
+ {
|
|
|
+ COMMUNICATION_UNKNOW = 0, //通信状态 未知
|
|
|
+ COMMUNICATION_READY = 1, //通信状态 正常
|
|
|
+
|
|
|
+ COMMUNICATION_FAULT = 3, //通信状态 错误
|
|
|
+ }
|
|
|
+
|
|
|
+ enum CheckExecuterReturn
|
|
|
+ {
|
|
|
+ MSG_TIMEOUT = 0, // 消息超时
|
|
|
+ EXECUTER_READY = 1, // 执行器就绪
|
|
|
+ EXECUTER_BUSY = 2, // 执行器正忙
|
|
|
+ EXECUTER_FAULT = 3, // 执行器错误
|
|
|
+ }
|
|
|
+
|
|
|
+ class Communicator
|
|
|
+ {
|
|
|
+ /// <summary>
|
|
|
+ /// 实例退出标记
|
|
|
+ /// </summary>
|
|
|
+ public bool mb_exit;
|
|
|
+ /// <summary>
|
|
|
+ /// 初始化标记
|
|
|
+ /// </summary>
|
|
|
+ public bool mb_initialized;
|
|
|
+ /// <summary>
|
|
|
+ /// 消息超时时间
|
|
|
+ /// </summary>
|
|
|
+ public int m_timeout_milli = 2000;
|
|
|
+ /// <summary>
|
|
|
+ /// 通信类当前状态
|
|
|
+ /// </summary>
|
|
|
+ public CommunicatorStatus m_status;
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 单例锁对象
|
|
|
+ /// </summary>
|
|
|
+ protected readonly static object lockObj = new object();
|
|
|
+ /// <summary>
|
|
|
+ /// 单例
|
|
|
+ /// </summary>
|
|
|
+ protected static Communicator instance = null;
|
|
|
+ /// <summary>
|
|
|
+ /// 接收解析锁
|
|
|
+ /// </summary>
|
|
|
+ protected object m_receive_lock;
|
|
|
+ /// <summary>
|
|
|
+ /// 发送锁
|
|
|
+ /// </summary>
|
|
|
+ protected object m_send_lock;
|
|
|
+ /// <summary>
|
|
|
+ /// 发送队列
|
|
|
+ /// </summary>
|
|
|
+ protected Queue<MsgStamped> m_send_queue;
|
|
|
+ /// <summary>
|
|
|
+ /// 接收队列
|
|
|
+ /// </summary>
|
|
|
+ protected Queue<MsgStamped> m_receive_queue;
|
|
|
+ /// <summary>
|
|
|
+ /// 发送线程
|
|
|
+ /// </summary>
|
|
|
+ protected Thread m_thread_send;
|
|
|
+ /// <summary>
|
|
|
+ /// 接收线程
|
|
|
+ /// </summary>
|
|
|
+ protected Thread m_thread_receive;
|
|
|
+ /// <summary>
|
|
|
+ /// 解析接收string到protobuf消息
|
|
|
+ /// </summary>
|
|
|
+ protected Thread m_thread_decode_receive;
|
|
|
+ /// <summary>
|
|
|
+ /// nanomsg 通信句柄
|
|
|
+ /// </summary>
|
|
|
+ protected BusSocket m_socket;
|
|
|
+ /// <summary>
|
|
|
+ /// nnxx生成id队列
|
|
|
+ /// </summary>
|
|
|
+ protected Queue<NanomsgEndpoint> nanomsgEndpoints_queue;
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 构造函数
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="server_ip"></param>
|
|
|
+ /// <param name="server_port"></param>
|
|
|
+ protected Communicator()
|
|
|
+ {
|
|
|
+ mb_exit = false;
|
|
|
+ mb_initialized = false;
|
|
|
+ m_receive_lock = new object();
|
|
|
+ m_send_lock = new object();
|
|
|
+ m_receive_queue = new Queue<MsgStamped>();
|
|
|
+ m_send_queue = new Queue<MsgStamped>();
|
|
|
+ nanomsgEndpoints_queue = new Queue<NanomsgEndpoint>();
|
|
|
+ m_status = CommunicatorStatus.COMMUNICATION_UNKNOW;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 析构函数
|
|
|
+ /// </summary>
|
|
|
+ ~Communicator()
|
|
|
+ {
|
|
|
+ mb_exit = true;
|
|
|
+ if (m_thread_send != null)
|
|
|
+ {
|
|
|
+ m_thread_send.Join();
|
|
|
+ }
|
|
|
+ if (m_thread_receive != null)
|
|
|
+ {
|
|
|
+ m_thread_receive.Join();
|
|
|
+ }
|
|
|
+ if (m_thread_decode_receive != null)
|
|
|
+ {
|
|
|
+ m_thread_decode_receive.Join();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 单例访问
|
|
|
+ /// </summary>
|
|
|
+ public static Communicator GetInstance()
|
|
|
+ {
|
|
|
+ if (instance == null)
|
|
|
+ {
|
|
|
+ lock (lockObj)
|
|
|
+ {
|
|
|
+ if (instance == null)
|
|
|
+ {
|
|
|
+ instance = new Communicator();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return instance;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 初始化
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ public bool Init()
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (!mb_initialized)
|
|
|
+ {
|
|
|
+ mb_exit = false;
|
|
|
+ mb_initialized = true;
|
|
|
+ m_socket = new BusSocket();
|
|
|
+ m_thread_receive = new Thread(new ParameterizedThreadStart(Receive_thread_function));
|
|
|
+ m_thread_send = new Thread(new ParameterizedThreadStart(Send_thread_function));
|
|
|
+ m_thread_decode_receive = new Thread(new ParameterizedThreadStart(Decode_thread_function));
|
|
|
+ m_thread_receive.Start(this);
|
|
|
+ m_thread_send.Start(this);
|
|
|
+ m_thread_decode_receive.Start(this);
|
|
|
+ m_status = CommunicatorStatus.COMMUNICATION_READY;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex) { Console.WriteLine(ex.StackTrace); return false; }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 反初始化
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ public bool Uninit()
|
|
|
+ {
|
|
|
+ m_status = CommunicatorStatus.COMMUNICATION_UNKNOW;
|
|
|
+ mb_exit = true;
|
|
|
+ if (m_thread_receive != null)
|
|
|
+ m_thread_receive.Join();
|
|
|
+ if (m_thread_send != null)
|
|
|
+ m_thread_send.Join();
|
|
|
+ if (m_thread_decode_receive != null)
|
|
|
+ m_thread_decode_receive.Join();
|
|
|
+ if(m_socket!=null)
|
|
|
+ {
|
|
|
+ while(nanomsgEndpoints_queue.Count>0)
|
|
|
+ {
|
|
|
+ NanomsgEndpoint nnep = nanomsgEndpoints_queue.Dequeue();
|
|
|
+ m_socket.Shutdown(nnep);
|
|
|
+ }
|
|
|
+ m_socket.Dispose();
|
|
|
+ }
|
|
|
+ mb_initialized = false;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 连接
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="server_address"></param>
|
|
|
+ /// <returns></returns>
|
|
|
+ public bool Connect(string server_address)
|
|
|
+ {
|
|
|
+ if (m_socket == null)
|
|
|
+ return false;
|
|
|
+ NanomsgEndpoint end_point = m_socket.Connect(server_address);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 绑定本地端口监听
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="self_address"></param>
|
|
|
+ /// <returns></returns>
|
|
|
+ public bool Bind(string self_address)
|
|
|
+ {
|
|
|
+ if (m_socket == null)
|
|
|
+ return false;
|
|
|
+ NanomsgEndpoint end_point = m_socket.Bind(self_address);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 发送消息
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ public bool Send_msg(ByteString bs)
|
|
|
+ {
|
|
|
+ lock(m_send_lock)
|
|
|
+ {
|
|
|
+ m_send_queue.Enqueue(new MsgStamped(bs));
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 检查消息
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="header"></param>
|
|
|
+ /// <returns></returns>
|
|
|
+ public virtual bool CheckMsg(Base_info header)
|
|
|
+ {
|
|
|
+ return true;
|
|
|
+ /*if (header.HasMsgType && header.HasSender && header.HasTimeoutMs
|
|
|
+ && header.MsgType == Message_type.EBaseMsg && header.Sender == Message.Communicator.EMain)
|
|
|
+ return true;
|
|
|
+ else
|
|
|
+ return false;*/
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 检查执行器状态
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="header"></param>
|
|
|
+ /// <param name="receiveTime"></param>
|
|
|
+ /// <returns></returns>
|
|
|
+ public virtual CheckExecuterReturn CheckExecuter(Base_info header, DateTime receiveTime)
|
|
|
+ {
|
|
|
+ if ((DateTime.Now - receiveTime).Milliseconds > header.TimeoutMs)
|
|
|
+ return CheckExecuterReturn.MSG_TIMEOUT;
|
|
|
+ else
|
|
|
+ return CheckExecuterReturn.EXECUTER_READY;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 执行消息
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="msgStamped"></param>
|
|
|
+ /// <returns></returns>
|
|
|
+ public virtual bool ExecuteMsg(MsgStamped msgStamped)
|
|
|
+ {
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 接收线程函数
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="handle"></param>
|
|
|
+ private static void Receive_thread_function(object handle)
|
|
|
+ {
|
|
|
+ if (handle == null)
|
|
|
+ return;
|
|
|
+ Communicator comm = (Communicator)handle;
|
|
|
+ while (!comm.mb_exit)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (!comm.mb_initialized || comm.m_socket == null)
|
|
|
+ continue;
|
|
|
+ byte[] data = comm.m_socket.ReceiveImmediate();
|
|
|
+ if (data != null && data.Length > 0 && comm.m_receive_queue != null)
|
|
|
+ {
|
|
|
+ // 解析头
|
|
|
+ Base_msg base_msg = Base_msg.Parser.ParseFrom(data);
|
|
|
+ //Console.WriteLine(base_msg.ToString());
|
|
|
+ if (!comm.CheckMsg(base_msg.BaseInfo))
|
|
|
+ continue;
|
|
|
+ lock (comm.m_receive_lock)
|
|
|
+ {
|
|
|
+ comm.m_receive_queue.Enqueue(new MsgStamped(ByteString.CopyFrom(data), base_msg.BaseInfo));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
|
|
|
+ Thread.Sleep(1);
|
|
|
+ }
|
|
|
+ Console.WriteLine("receive thread exit");
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 发送线程函数
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="handle"></param>
|
|
|
+ private static void Send_thread_function(object handle)
|
|
|
+ {
|
|
|
+ if (handle == null)
|
|
|
+ return;
|
|
|
+ Communicator comm = (Communicator)handle;
|
|
|
+ while (!comm.mb_exit)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (!comm.mb_initialized || comm.m_socket == null)
|
|
|
+ continue;
|
|
|
+ lock (comm.m_send_lock)
|
|
|
+ {
|
|
|
+ if(comm.m_send_queue.Count > 0)
|
|
|
+ {
|
|
|
+ MsgStamped msg_stamped = comm.m_send_queue.Dequeue();
|
|
|
+ comm.m_socket.Send(msg_stamped.msg.ToByteArray());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //Console.WriteLine("msg sent");
|
|
|
+ }
|
|
|
+ catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
|
|
|
+ Thread.Sleep(1);
|
|
|
+ }
|
|
|
+ Console.WriteLine("send thread exit");
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 解析线程函数
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="handle"></param>
|
|
|
+ private static void Decode_thread_function(object handle)
|
|
|
+ {
|
|
|
+ if (handle == null)
|
|
|
+ return;
|
|
|
+ Communicator comm = (Communicator)handle;
|
|
|
+ while (!comm.mb_exit)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ DateTime current_time = DateTime.Now;
|
|
|
+
|
|
|
+ if (!comm.mb_initialized)
|
|
|
+ continue;
|
|
|
+ MsgStamped msg_stamped = null;
|
|
|
+ lock (comm.m_receive_lock)
|
|
|
+ {
|
|
|
+ if (comm.m_receive_queue.Count > 0)
|
|
|
+ {
|
|
|
+ msg_stamped = comm.m_receive_queue.Dequeue();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ CheckExecuterReturn ret = comm.CheckExecuter(msg_stamped.header, msg_stamped.receive_time);
|
|
|
+ if (ret == CheckExecuterReturn.MSG_TIMEOUT)
|
|
|
+ {
|
|
|
+ continue;
|
|
|
+ }else if(ret == CheckExecuterReturn.EXECUTER_BUSY || ret == CheckExecuterReturn.EXECUTER_FAULT)
|
|
|
+ {
|
|
|
+ comm.m_receive_queue.Enqueue(msg_stamped);
|
|
|
+ }else if(ret == CheckExecuterReturn.EXECUTER_READY)
|
|
|
+ {
|
|
|
+ comm.ExecuteMsg(msg_stamped);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 解析
|
|
|
+ //comm.Decode_msg(msg_stamped.msg);
|
|
|
+ //Console.WriteLine("msg parsed");
|
|
|
+ }
|
|
|
+ catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
|
|
|
+ Thread.Sleep(1);
|
|
|
+ }
|
|
|
+ Console.WriteLine("decode thread exit");
|
|
|
+ }
|
|
|
+
|
|
|
+ ///// <summary>
|
|
|
+ ///// 解析string到protobuf消息
|
|
|
+ ///// </summary>
|
|
|
+ //private void Decode_msg(ByteString msg)
|
|
|
+ //{
|
|
|
+ // if (msg == null)
|
|
|
+ // return;
|
|
|
+ // Base_msg base_msg = Base_msg.Parser.ParseFrom(msg);
|
|
|
+ // switch(base_msg.BaseInfo.MsgType)
|
|
|
+ // {
|
|
|
+ // case Message_type.EParkspaceAllocationStatusMsg:
|
|
|
+ // Parkspace_allocation_status_msg parkspace_status_msg = Parkspace_allocation_status_msg.Parser.ParseFrom(msg);
|
|
|
+ // break;
|
|
|
+ // case Message_type.EParkspaceForceUpdateResponseMsg:
|
|
|
+ // Console.WriteLine("update response");
|
|
|
+ // Parkspace_force_update_response_msg parkspace_force_update_msg = Parkspace_force_update_response_msg.Parser.ParseFrom(msg);
|
|
|
+ // Console.WriteLine(parkspace_force_update_msg.ToString());
|
|
|
+ // break;
|
|
|
+ // case Message_type.EGroundStatusMsg:
|
|
|
+ // Console.WriteLine("get ground status.....");
|
|
|
+ // Ground_status_msg ground_status_msg = Ground_status_msg.Parser.ParseFrom(msg);
|
|
|
+ // Console.WriteLine(ground_status_msg.ToString());
|
|
|
+ // break;
|
|
|
+ // default:
|
|
|
+ // Console.WriteLine("unrecognized message received");
|
|
|
+ // break;
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+
|
|
|
+ }
|
|
|
+}
|