using NNanomsg;
using NNanomsg.Protocols;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Message;
namespace parkspace_manager
{
class Communicator
{
private readonly static object lockObj = new object();
///
/// 单例
///
private static Communicator instance = null;
///
/// 消息超时时间
///
private static int timeout_milli = 2000;
///
/// 接收解析锁
///
private object m_receive_lock;
///
/// 发送锁
///
private object m_send_lock;
///
/// 车位状态访问锁
///
private object m_parkspace_status_access_lock;
///
/// 车位消息队列
///
private Parkspace_allocation_status_msg m_parkspace_current_status;
///
/// 当前时间
///
private DateTime m_last_parkspace_status_time;
///
/// 车位状态超时
///
private bool mb_parkspace_status_timeout;
///
/// 发送队列
///
private Queue m_send_queue;
///
/// 接收队列
///
private Queue m_receive_queue;
///
/// 发送线程
///
private Thread m_thread_send;
///
/// 接收线程
///
private Thread m_thread_receive;
///
/// 解析接收string到protobuf消息
///
private Thread m_thread_decode_receive;
///
/// nanomsg 通信句柄
///
private BusSocket m_socket;
///
/// 实例退出标记
///
private bool mb_exit;
///
/// 初始化标记
///
private bool mb_initialized;
///
/// 构造函数
///
///
///
private Communicator()
{
mb_exit = false;
mb_initialized = false;
mb_parkspace_status_timeout = false;
m_receive_lock = new object();
m_send_lock = new object();
m_parkspace_status_access_lock = new object();
m_receive_queue = new Queue();
m_send_queue = new Queue();
m_parkspace_current_status = new Parkspace_allocation_status_msg();
}
///
/// 析构函数
///
~Communicator()
{
mb_exit = true;
if (m_thread_send != null)
{
m_thread_send.Abort();
}
if (m_thread_receive != null)
{
m_thread_receive.Abort();
}
if (m_thread_decode_receive != null)
m_thread_decode_receive.Abort();
}
///
/// 单例访问
///
public static Communicator Instance
{
get
{
if (instance == null)
{
lock (lockObj)
{
if (instance == null)
{
instance = new Communicator();
}
}
}
return instance;
}
}
///
/// 初始化
///
///
public bool Init()
{
mb_initialized = true;
m_socket = new BusSocket();
m_thread_receive = new Thread(new ParameterizedThreadStart(Receive_thread_function));
m_thread_send = new Thread(new ParameterizedThreadStart(Send_thread_function));
m_thread_decode_receive = new Thread(new ParameterizedThreadStart(Decode_thread_function));
m_thread_receive.Start(this);
m_thread_send.Start(this);
m_thread_decode_receive.Start(this);
return true;
}
///
/// 反初始化
///
///
public bool Uninit()
{
mb_exit = true;
if(m_thread_receive!=null)
m_thread_receive.Join();
if(m_thread_send!=null)
m_thread_send.Join();
if (m_thread_decode_receive != null)
m_thread_decode_receive.Join();
return true;
}
///
/// 连接
///
///
///
public bool Connect(string server_address)
{
if (m_socket == null)
return false;
NanomsgEndpoint end_point = m_socket.Connect(server_address);
return true;
}
///
/// 绑定本地端口监听
///
///
///
public bool Bind(string self_address)
{
if (m_socket == null)
return false;
NanomsgEndpoint end_point = m_socket.Bind(self_address);
return true;
}
///
/// 获取当前车位状态
///
public bool Get_parkspace_status(ref Parkspace_allocation_status_msg msg)
{
lock (m_parkspace_status_access_lock)
{
msg.MergeFrom(m_parkspace_current_status);
}
if((DateTime.Now-m_last_parkspace_status_time).Milliseconds > timeout_milli)
{
return false;
}
else
{
return true;
}
}
///
/// 发送手动更新车位消息
///
///
public bool Send_msg()
{
}
///
/// 接收线程函数
///
///
private static void Receive_thread_function(object handle)
{
if (handle == null)
return;
Communicator comm = (Communicator)handle;
while(!comm.mb_exit)
{
if (!comm.mb_initialized || comm.m_socket==null)
continue;
byte[] data = comm.m_socket.ReceiveImmediate();
if(data!=null && data.Length>0)
{
lock (comm.m_receive_lock)
{
//ByteString tmp = ByteString.CopyFrom(data);
//Base_msg base_msg = Base_msg.Parser.ParseFrom(tmp);
//Console.WriteLine(base_msg.ToString());
comm.m_receive_queue.Enqueue(ByteString.CopyFrom(data));
}
}
Thread.Sleep(50);
}
Console.WriteLine("receive thread exit");
}
///
/// 发送线程函数
///
///
private static void Send_thread_function(object handle)
{
if (handle == null)
return;
Communicator comm = (Communicator)handle;
while (!comm.mb_exit)
{
if (!comm.mb_initialized || comm.m_socket == null)
continue;
lock(comm.m_send_lock)
{
}
Thread.Sleep(500);
}
Console.WriteLine("send thread exit");
}
///
/// 解析线程函数
///
///
private static void Decode_thread_function(object handle)
{
if (handle == null)
return;
Communicator comm = (Communicator)handle;
while (!comm.mb_exit)
{
DateTime current_time = DateTime.Now;
if (!comm.mb_initialized)
continue;
Parkspace_allocation_status_msg parkspace_status = null;
lock (comm.m_receive_lock)
{
if (comm.m_receive_queue.Count > 0)
{
ByteString msg_str = comm.m_receive_queue.Dequeue();
parkspace_status = comm.Decode_msg(msg_str);
if (parkspace_status != null)
{
comm.m_last_parkspace_status_time = DateTime.Now;
Console.WriteLine(parkspace_status.ToString());
}
}
}
if(parkspace_status != null)
{
lock (comm.m_parkspace_status_access_lock)
{
comm.m_parkspace_current_status = parkspace_status;
//comm.m_parkspace_current_status.MergeFrom(parkspace_status);
}
}
Thread.Sleep(50);
}
Console.WriteLine("decode thread exit");
}
///
/// 解析string到protobuf消息
///
private Parkspace_allocation_status_msg Decode_msg(ByteString msg)
{
Base_msg base_msg = Base_msg.Parser.ParseFrom(msg);
if(base_msg.BaseInfo.MsgType == Message_type.EParkspaceAllocationStatusMsg)
{
Parkspace_allocation_status_msg parsed_msg = Parkspace_allocation_status_msg.Parser.ParseFrom(msg);
return parsed_msg;
}
else
{
return null;
}
}
}
}