using NNanomsg;
using NNanomsg.Protocols;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Message;
namespace Communication
{
class MsgStamped
{
public DateTime receive_time;
public ByteString msg;
public Base_info header;
public MsgStamped()
{
receive_time = DateTime.Now;
msg = ByteString.Empty;
}
public MsgStamped(byte[] bytes)
{
receive_time = DateTime.Now;
msg = ByteString.CopyFrom(bytes);
}
public MsgStamped(ByteString msg)
{
receive_time = DateTime.Now;
this.msg = msg;
}
public MsgStamped(ByteString msg, Base_info header)
{
receive_time = DateTime.Now;
this.msg = msg;
this.header = header;
if(this.header.TimeoutMs <= 0)
{
this.header.TimeoutMs = 5000;
}
}
}
enum CommunicatorStatus
{
COMMUNICATION_UNKNOW = 0, //通信状态 未知
COMMUNICATION_READY = 1, //通信状态 正常
COMMUNICATION_FAULT = 3, //通信状态 错误
}
enum CheckExecuterReturn
{
MSG_TIMEOUT = 0, // 消息超时
EXECUTER_READY = 1, // 执行器就绪
EXECUTER_BUSY = 2, // 执行器正忙
EXECUTER_FAULT = 3, // 执行器错误
}
class Communicator
{
///
/// 实例退出标记
///
public bool mb_exit;
///
/// 初始化标记
///
public bool mb_initialized;
///
/// 消息超时时间
///
public int m_timeout_milli = 2000;
///
/// 通信类当前状态
///
public CommunicatorStatus m_status;
///
/// 单例锁对象
///
protected readonly static object lockObj = new object();
///
/// 单例
///
protected static Communicator instance = null;
///
/// 接收解析锁
///
protected object m_receive_lock;
///
/// 发送锁
///
protected object m_send_lock;
///
/// 发送队列
///
protected Queue m_send_queue;
///
/// 接收队列
///
protected Queue m_receive_queue;
///
/// 发送线程
///
protected Thread m_thread_send;
///
/// 接收线程
///
protected Thread m_thread_receive;
///
/// 解析接收string到protobuf消息
///
protected Thread m_thread_decode_receive;
///
/// nanomsg 通信句柄
///
protected BusSocket m_socket;
///
/// nnxx生成id队列
///
protected Queue nanomsgEndpoints_queue;
///
/// 构造函数
///
///
///
protected Communicator()
{
mb_exit = false;
mb_initialized = false;
m_receive_lock = new object();
m_send_lock = new object();
m_receive_queue = new Queue();
m_send_queue = new Queue();
nanomsgEndpoints_queue = new Queue();
m_status = CommunicatorStatus.COMMUNICATION_UNKNOW;
}
///
/// 析构函数
///
~Communicator()
{
mb_exit = true;
if (m_thread_send != null)
{
m_thread_send.Join();
}
if (m_thread_receive != null)
{
m_thread_receive.Join();
}
if (m_thread_decode_receive != null)
{
m_thread_decode_receive.Join();
}
}
///
/// 单例访问
///
public static Communicator GetInstance()
{
if (instance == null)
{
lock (lockObj)
{
if (instance == null)
{
instance = new Communicator();
}
}
}
return instance;
}
///
/// 初始化
///
///
public bool Init()
{
try
{
if (!mb_initialized)
{
mb_exit = false;
mb_initialized = true;
m_socket = new BusSocket();
m_thread_receive = new Thread(new ParameterizedThreadStart(Receive_thread_function));
m_thread_send = new Thread(new ParameterizedThreadStart(Send_thread_function));
m_thread_decode_receive = new Thread(new ParameterizedThreadStart(Decode_thread_function));
m_thread_receive.Start(this);
m_thread_send.Start(this);
m_thread_decode_receive.Start(this);
m_status = CommunicatorStatus.COMMUNICATION_READY;
return true;
}
else
{
return false;
}
}
catch (Exception ex) { Console.WriteLine(ex.StackTrace); return false; }
}
///
/// 反初始化
///
///
public bool Uninit()
{
m_status = CommunicatorStatus.COMMUNICATION_UNKNOW;
mb_exit = true;
if (m_thread_receive != null)
m_thread_receive.Join();
if (m_thread_send != null)
m_thread_send.Join();
if (m_thread_decode_receive != null)
m_thread_decode_receive.Join();
if(m_socket!=null)
{
while(nanomsgEndpoints_queue.Count>0)
{
NanomsgEndpoint nnep = nanomsgEndpoints_queue.Dequeue();
m_socket.Shutdown(nnep);
}
m_socket.Dispose();
}
mb_initialized = false;
return true;
}
///
/// 连接
///
///
///
public bool Connect(string server_address)
{
if (m_socket == null)
return false;
NanomsgEndpoint end_point = m_socket.Connect(server_address);
return true;
}
///
/// 绑定本地端口监听
///
///
///
public bool Bind(string self_address)
{
if (m_socket == null)
return false;
NanomsgEndpoint end_point = m_socket.Bind(self_address);
return true;
}
///
/// 发送消息
///
///
public bool Send_msg(ByteString bs)
{
lock(m_send_lock)
{
m_send_queue.Enqueue(new MsgStamped(bs));
}
return true;
}
///
/// 检查消息
///
///
///
public virtual bool CheckMsg(Base_info header)
{
if (header.HasMsgType && header.HasSender && header.HasTimeoutMs
&& header.MsgType == Message_type.EBaseMsg && header.Sender == Message.Communicator.EMain)
return true;
else
return false;
}
///
/// 检查执行器状态
///
///
///
///
public virtual CheckExecuterReturn CheckExecuter(Base_info header, DateTime receiveTime)
{
if ((DateTime.Now - receiveTime).Milliseconds > header.TimeoutMs)
return CheckExecuterReturn.MSG_TIMEOUT;
else
return CheckExecuterReturn.EXECUTER_READY;
}
///
/// 执行消息
///
///
///
public virtual bool ExecuteMsg(MsgStamped msgStamped)
{
return true;
}
///
/// 接收线程函数
///
///
private static void Receive_thread_function(object handle)
{
if (handle == null)
return;
Communicator comm = (Communicator)handle;
while (!comm.mb_exit)
{
try
{
if (!comm.mb_initialized || comm.m_socket == null)
continue;
byte[] data = comm.m_socket.ReceiveImmediate();
if (data != null && data.Length > 0 && comm.m_receive_queue != null)
{
// 解析头
Base_msg base_msg = Base_msg.Parser.ParseFrom(data);
//Console.WriteLine(base_msg.ToString());
if (!comm.CheckMsg(base_msg.BaseInfo))
continue;
lock (comm.m_receive_lock)
{
comm.m_receive_queue.Enqueue(new MsgStamped(ByteString.CopyFrom(data), base_msg.BaseInfo));
}
}
}
catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
Thread.Sleep(1);
}
Console.WriteLine("receive thread exit");
}
///
/// 发送线程函数
///
///
private static void Send_thread_function(object handle)
{
if (handle == null)
return;
Communicator comm = (Communicator)handle;
while (!comm.mb_exit)
{
try
{
if (!comm.mb_initialized || comm.m_socket == null)
continue;
lock (comm.m_send_lock)
{
if(comm.m_send_queue.Count > 0)
{
MsgStamped msg_stamped = comm.m_send_queue.Dequeue();
comm.m_socket.Send(msg_stamped.msg.ToByteArray());
}
}
//Console.WriteLine("msg sent");
}
catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
Thread.Sleep(1);
}
Console.WriteLine("send thread exit");
}
///
/// 解析线程函数
///
///
private static void Decode_thread_function(object handle)
{
if (handle == null)
return;
Communicator comm = (Communicator)handle;
while (!comm.mb_exit)
{
try
{
DateTime current_time = DateTime.Now;
if (!comm.mb_initialized)
continue;
MsgStamped msg_stamped = null;
lock (comm.m_receive_lock)
{
if (comm.m_receive_queue.Count > 0)
{
msg_stamped = comm.m_receive_queue.Dequeue();
}
else
{
continue;
}
CheckExecuterReturn ret = comm.CheckExecuter(msg_stamped.header, msg_stamped.receive_time);
if (ret == CheckExecuterReturn.MSG_TIMEOUT)
{
continue;
}else if(ret == CheckExecuterReturn.EXECUTER_BUSY || ret == CheckExecuterReturn.EXECUTER_FAULT)
{
comm.m_receive_queue.Enqueue(msg_stamped);
}else if(ret == CheckExecuterReturn.EXECUTER_READY)
{
comm.ExecuteMsg(msg_stamped);
}
}
// 解析
//comm.Decode_msg(msg_stamped.msg);
//Console.WriteLine("msg parsed");
}
catch (Exception ex) { Console.WriteLine(ex.StackTrace); }
Thread.Sleep(1);
}
Console.WriteLine("decode thread exit");
}
/////
///// 解析string到protobuf消息
/////
//private void Decode_msg(ByteString msg)
//{
// if (msg == null)
// return;
// Base_msg base_msg = Base_msg.Parser.ParseFrom(msg);
// switch(base_msg.BaseInfo.MsgType)
// {
// case Message_type.EParkspaceAllocationStatusMsg:
// Parkspace_allocation_status_msg parkspace_status_msg = Parkspace_allocation_status_msg.Parser.ParseFrom(msg);
// break;
// case Message_type.EParkspaceForceUpdateResponseMsg:
// Console.WriteLine("update response");
// Parkspace_force_update_response_msg parkspace_force_update_msg = Parkspace_force_update_response_msg.Parser.ParseFrom(msg);
// Console.WriteLine(parkspace_force_update_msg.ToString());
// break;
// case Message_type.EGroundStatusMsg:
// Console.WriteLine("get ground status.....");
// Ground_status_msg ground_status_msg = Ground_status_msg.Parser.ParseFrom(msg);
// Console.WriteLine(ground_status_msg.ToString());
// break;
// default:
// Console.WriteLine("unrecognized message received");
// break;
// }
//}
}
}