123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- import time
- import grpc
- import def_pb2 as pb
- import def_pb2_grpc as mrpc
- import threading
- from concurrent.futures import ThreadPoolExecutor
- def singleton(cls):
- _instance = {}
- def inner():
- if cls not in _instance:
- _instance[cls] = cls()
- return _instance[cls]
- return inner
- @singleton
- class GrpcStream(threading.Thread):
- def __init__(self):
- threading.Thread.__init__(self)
- self.threadpool=ThreadPoolExecutor(5)
- self.exit_=False
- self.features_=None
- self.clouds_=None
- self.cloudCallBack=None
- self.ErrorCallback=None
- def SetDataCallBack(self,CloudsCallBack,ErrorCallback):
- self.cloudCallBack=CloudsCallBack
- self.ErrorCallback=ErrorCallback
- def OpenDataStream(self,ip,port):
- connectstr='%s:%d'%(ip,port)
- channel=grpc.insecure_channel(connectstr)
- stub = mrpc.StreamServerStub(channel)
- cmd=pb.RequestCmd()
- self.clouds_ = stub.OpenStream(cmd)
- def CloseDataStream(self,ip,port):
- connectstr='%s:%d'%(ip,port)
- channel=grpc.insecure_channel(connectstr)
- stub = mrpc.StreamServerStub(channel)
- cmd=pb.RequestCmd()
- print(" close stream")
- stub.CloseStream(cmd)
- print(" close stream over")
- def loopDataStream(self):
- while self.exit_ == False:
- time.sleep(0.001)
- if self.clouds_ is not None:
- try:
- for clouds in self.clouds_:
- if self.cloudCallBack is not None:
- self.cloudCallBack(clouds)
- except Exception as e:
- if self.ErrorCallback is not None:
- self.ErrorCallback(str(e))
- self.clouds_=None
- def run(self):
- self.loopDataStream()
- print(" close ")
- def close(self):
- self.exit_=True
- self.threadpool.shutdown()
- self.join()
|