123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373 |
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using NNanomsg.Protocols;
- using System.Threading;
- using Newtonsoft.Json.Linq;
- using Google.Protobuf;
- using tool;
- namespace chutian_parking_terminal.communication_base
- {
- class Communication_socket_base
- {
- //通信状态
- public enum Communication_statu
- {
- COMMUNICATION_UNKNOW = 0, //通信状态 未知
- COMMUNICATION_READY = 1, //通信状态 正常
- COMMUNICATION_FAULT = 2, //通信状态 错误
- };
- public Communication_socket_base()
- {
- m_communication_statu = Communication_statu.COMMUNICATION_UNKNOW;
- mp_receive_data_thread = null;
- mp_analysis_data_thread = null;
- mp_send_data_thread = null;
- mp_encapsulate_data_thread = null;
- m_analysis_cycle_time = 1000;//默认1000ms,就自动解析(接受list)
- m_encapsulate_cycle_time = 1000;//默认1000ms,就自动发送一次状态信息
- m_socket = new BusSocket();
- m_receive_data_list = new List<byte[]>();
- m_send_data_list = new List<byte[]>();
- //接受线程默认循环
- m_receive_data_condition = true;
- mp_receive_data_thread = new Thread(receive_data_thread);
- //解析线程默认等待, 需要接受线程去唤醒, 超时1ms, 超时后主动遍历m_receive_data_list
- m_analysis_data_condition = true;
- mp_analysis_data_thread = new Thread(analysis_data_thread);
- //发送线程默认循环
- m_send_data_condition = true;
- mp_send_data_thread = new Thread(send_data_thread);
- //封装线程默认循环
- m_encapsulate_data_condition = true;
- mp_encapsulate_data_thread = new Thread(encapsulate_data_thread);
- m_event_exit = new EventWaitHandle(false, EventResetMode.ManualReset);
- json_file_operation.Instance.json_file_operation_init("../../settings.json");
- }
- //初始化通信模块--默认从json文件读取
- public virtual void communication_init()
- {
- //绑定
- string bind_string = json_file_operation.Instance.read_json_string("communication_bind");
- communication_bind(bind_string);
- //连接
- JArray connect_string = json_file_operation.Instance.read_json_jarray("communication_connect");
- foreach (var connect in connect_string)
- {
- communication_connect(connect.ToString());
- }
- //启动通信, run thread
- communication_run();
- }
- //绑定
- public virtual void communication_bind(string bind_string)
- {
- m_socket.Bind(bind_string);
- }
- //连接
- public virtual void communication_connect(string connect_string)
- {
- m_socket.Connect(connect_string);
- }
- //启动通信
- public virtual void communication_run()
- {
- m_communication_statu = Communication_statu.COMMUNICATION_READY;
- //启动4个线程。
- mp_receive_data_thread.Start();
- mp_analysis_data_thread.Start();
- mp_send_data_thread.Start();
- mp_encapsulate_data_thread.Start();
- }
- //反初始化
- public virtual void communication_uninit()
- {
- //关闭几个线程
- m_receive_data_condition = false;
- m_analysis_data_condition = false;
- m_send_data_condition = false;
- m_encapsulate_data_condition = false;
- //回收4个线程的资源
- if (mp_receive_data_thread != null )
- {
- mp_receive_data_thread.Abort();
- }
- if (mp_analysis_data_thread != null )
- {
- mp_analysis_data_thread.Abort();
- }
- if (mp_send_data_thread != null )
- {
- mp_send_data_thread.Abort();
- }
- if (mp_encapsulate_data_thread != null )
- {
- mp_encapsulate_data_thread.Abort();
- }
- //清空list
- lock (m_receive_list_lock)
- {
- m_receive_data_list.Clear();
- }
- lock (m_send_list_lock)
- {
- m_send_data_list.Clear();
- }
- m_communication_statu = Communication_statu.COMMUNICATION_UNKNOW;
- }
- //设置解析时间
- public void set_analysis_cycle_time(int analysis_cycle_time)
- {
- m_analysis_cycle_time = analysis_cycle_time;
- }
- //设置封装时间
- public void set_encapsulate_cycle_time(int encapsulate_cycle_time)
- {
- m_encapsulate_cycle_time = encapsulate_cycle_time;
- }
- //mp_receive_data_thread 接受线程执行函数,
- //receive_data_thread 内部线程负责接受消息
- protected void receive_data_thread()
- {
- Console.WriteLine(" Communication_socket_base::receive_data_thread start " );
- //通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list
- while (m_receive_data_condition && !m_event_exit.WaitOne(1) )
- {
- if (m_receive_data_condition)
- {
- //非阻塞接受消息, 如果接收到消息, data!=null
- byte[] data = m_socket.ReceiveImmediate();
- if (data != null)
- {
- //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
- if (check_msg(data))
- {
- lock (m_receive_list_lock)
- {
- m_receive_data_list.Add(data);
- }
- }
- else
- {
- continue;
- }
- }
- }
- }
- Console.WriteLine(" Communication_socket_base::receive_data_thread end " );
- }
- //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
- //子类必须重载, 增加自己模块的判断逻辑
- public virtual bool check_msg(byte[] message)
- {
- // Message.Base_info base_Info = Message.Base_info.Parser.ParseJson(message);
- Message.Base_info base_Info = Message.Base_msg.Parser.ParseFrom(message).BaseInfo;
- if (base_Info.Receiver == Message.Communicator.EMain)
- {
- return true;
- }
- else
- {
- //无效的消息
- return true;
- }
- }
- //mp_analysis_data_thread 解析线程执行函数,
- //analysis_data_thread 内部线程负责解析消息
- protected void analysis_data_thread()
- {
- Console.WriteLine(" Communication_socket_base::analysis_data_thread start " );
- //通信解析线程, 负责巡检m_receive_data_list, 并解析和处理消息
- while (m_analysis_data_condition && !m_event_exit.WaitOne(1) )
- {
- if (m_analysis_data_condition)
- {
- //处理链表信息
- lock (m_receive_list_lock)
- {
- analysis_receive_list();
- }
- }
- }
- Console.WriteLine(" Communication_socket_base::analysis_data_thread end " );
- }
- //遍历接受链表, 解析消息,
- protected void analysis_receive_list()
- {
- if (m_receive_data_list.Count == 0 )
- {
- return ;
- }
- else
- {
- for(int i=0;i< m_receive_data_list.Count;)
- {
- if (m_receive_data_list[i] == null )
- {
- //删除等于空字符串的节点
- m_receive_data_list.Remove(m_receive_data_list[i]);
- continue;
- }
- else
- {
- //检查消息是否可以被处理
- if (check_executer(m_receive_data_list[i]))
- {
- //处理消息
- execute_msg(m_receive_data_list[i]);
- m_receive_data_list.Remove(m_receive_data_list[i]);
- continue;
- }
- //若当前状态不能处理 则跳过该消息
- else
- {
- }
-
- }
- ++i;
- }
- }
- }
- //检查执行者的状态, 判断能否处理这条消息, 需要子类重载
- public virtual bool check_executer(byte[] message)
- {
- //检查对应模块的状态, 判断是否可以处理这条消息
- return true;
- }
- //处理消息, 需要子类重载
- public virtual void execute_msg(byte[] message)
- {
- //处理消息必须子类重载,本模块只负责通信
- }
- //mp_send_data_thread 发送线程执行函数,
- //send_data_thread 内部线程负责发送消息
- protected void send_data_thread()
- {
- Console.WriteLine(" Communication_socket_base::send_data_thread start " );
- //通信发送线程, 负责巡检m_send_data_list, 并发送消息
- while (m_send_data_condition && !m_event_exit.WaitOne(1))
- {
- if ( m_send_data_condition )
- {
- lock (m_send_list_lock)
- {
- if (m_send_data_list.Count != 0)
- {
- for (int i=0;i< m_send_data_list.Count;)
- {
- if (m_send_data_list[i] != null)
- {
- m_socket.Send(m_send_data_list[i]);
- m_send_data_list.Remove(m_send_data_list[i]);
- continue;
- }
- ++i;
- }
- }
- }
- }
- }
- Console.WriteLine(" Communication_socket_base::send_data_thread end " );
- return;
- }
-
- //mp_encapsulate_data_thread 封装线程执行函数,
- //encapsulate_data_thread 内部线程负责封装消息
- protected void encapsulate_data_thread()
- {
- Console.WriteLine(" Communication_socket_base::encapsulate_data_thread start " );
- //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list
- while (m_encapsulate_data_condition && !m_event_exit.WaitOne(m_encapsulate_cycle_time))
- {
- if (m_encapsulate_data_condition)
- {
- encapsulate_send_data();
- }
- }
- Console.WriteLine(" Communication_socket_base::encapsulate_data_thread end ");
- }
- //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
- public virtual void encapsulate_send_data()
- {
- return;
- Message.Base_msg t_base_msg;
- Message.Store_command_request_msg msg;
- t_base_msg.BaseInfo.MsgType = Message.Message_type.EBaseMsg;
- t_base_msg.BaseInfo.TimeoutMs = 5000;
- t_base_msg.BaseInfo.Sender = Message.Communicator.EMain;
- t_base_msg.BaseInfo.Receiver = Message.Communicator.EMain;
- m_send_data_list.Add(t_base_msg.ToByteArray());
- }
- //封装消息, 需要子类重载
- public virtual void encapsulate_msg(byte[] message)
- {
- lock (m_send_list_lock)
- {
- m_send_data_list.Add(message);
- }
- }
- //通用的网络编程接口, 默认使用总线模式, (网状结构)
- protected BusSocket m_socket;
- //数据锁
- //作用:将会锁住代码块的内容,并阻止其他线程进入该代码块,直到该代码块运行完成,释放该锁。
- //注意:定义的锁对象应该是 私有的,静态的,只读的,引用类型的对象,这样可以防止外部改变锁对象
- private static readonly object m_receive_list_lock = new object();
- private static readonly object m_send_list_lock = new object();
- //通信状态
- protected Communication_statu m_communication_statu; //通信状态
- //接受模块,
- private List<byte[]> m_receive_data_list; //接受的list容器
- protected Thread mp_receive_data_thread; //接受的线程
- protected bool m_receive_data_condition; //接受的条件变量
- protected Thread mp_analysis_data_thread; //解析的线程
- protected bool m_analysis_data_condition; //解析的条件变量
- protected int m_analysis_cycle_time; //自动解析的时间周期
-
- //发送模块,
- private List<byte[]> m_send_data_list; //发送的list容器
- protected Thread mp_send_data_thread; //发送的线程
- protected bool m_send_data_condition; //发送的条件变量
- protected Thread mp_encapsulate_data_thread; //封装的线程
- protected bool m_encapsulate_data_condition; //封装的条件变量
- protected int m_encapsulate_cycle_time; //自动封装的时间周期
- protected EventWaitHandle m_event_exit; //防止CPU卡死
- }
- }
|