1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- import time
- import grpc
- import def_pb2 as pb
- import def_pb2_grpc as mrpc
- import threading
- from concurrent.futures import ThreadPoolExecutor
def singleton(cls):
    """Class decorator that caches a single shared instance of ``cls``.

    The decorated name is rebound to a factory function: the first call
    builds the instance, every later call returns that same object.

    Args:
        cls: The class to wrap.

    Returns:
        A callable returning the one cached instance of ``cls``.  Positional
        and keyword arguments are forwarded to ``cls`` on the first call
        only; arguments passed once the instance exists are ignored.
    """
    _instance = {}

    def inner(*args, **kwargs):
        # Build lazily so merely decorating a class has no side effects.
        if cls not in _instance:
            _instance[cls] = cls(*args, **kwargs)
        return _instance[cls]

    return inner
@singleton
class GrpcStream(threading.Thread):
    """Thread that forwards messages from a gRPC server stream to callbacks.

    ``OpenDataStream`` starts the streaming RPC, ``start()`` launches the
    dispatch loop on the internal worker pool, and ``close()`` tears
    everything down.  Because of ``@singleton`` the module-level name
    ``GrpcStream`` is a factory that always returns the same instance.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.threadpool = ThreadPoolExecutor(5)
        self.exit_ = False          # set by close() to stop the dispatch loop
        self.features_ = None       # not used by any method of this class
        self.clouds_ = None         # active response stream, or None
        self.channel_ = None        # channel backing ``clouds_``, or None
        self.cloudCallBack = None
        self.ErrorCallback = None

    def SetDataCallBack(self, CloudsCallBack, ErrorCallback):
        """Register the consumers used by the dispatch loop.

        Args:
            CloudsCallBack: Called with each message read from the stream.
            ErrorCallback: Called with ``str(exception)`` when reading the
                stream or running ``CloudsCallBack`` raises.
        """
        self.cloudCallBack = CloudsCallBack
        self.ErrorCallback = ErrorCallback

    def OpenDataStream(self, ip, port):
        """Open the server stream at ``ip:port`` and make it the active one.

        Any stream opened earlier is cancelled and its channel closed, so
        repeated calls do not accumulate open channels.

        Args:
            ip: Host name or address of the stream server.
            port: Integer port of the stream server.
        """
        connectstr = '%s:%d' % (ip, port)
        channel = grpc.insecure_channel(connectstr)
        try:
            stub = mrpc.StreamServerStub(channel)
            stream = stub.OpenStream(pb.RequestCmd())
        except Exception:
            # Do not leak the channel when the call cannot be set up.
            channel.close()
            raise
        old_stream, old_channel = self.clouds_, self.channel_
        # Publish the new stream before disposing of the old one so the
        # dispatch loop can tell a replaced stream from a failed one.
        self.channel_ = channel
        self.clouds_ = stream
        self._dispose(old_stream, old_channel)

    def CloseDataStream(self, ip, port):
        """Send the ``CloseStream`` command to the server at ``ip:port``.

        A short-lived channel is used for the request and closed once the
        call returns or raises.

        Args:
            ip: Host name or address of the stream server.
            port: Integer port of the stream server.
        """
        connectstr = '%s:%d' % (ip, port)
        with grpc.insecure_channel(connectstr) as channel:
            stub = mrpc.StreamServerStub(channel)
            cmd = pb.RequestCmd()
            print(" close stream")
            stub.CloseStream(cmd)

    def loopDataStream(self):
        """Dispatch stream messages to the callbacks until ``close()``.

        Polls for an active stream, then hands every message to
        ``cloudCallBack``.  Failures are reported through ``ErrorCallback``
        unless they were provoked by ``close()`` or by the stream being
        replaced.  A finished stream is dropped so it is not iterated again.
        """
        while not self.exit_:
            time.sleep(0.001)
            # Work on a local reference: ``clouds_`` may be swapped by
            # OpenDataStream()/close() from another thread at any time.
            stream = self.clouds_
            if stream is None:
                continue
            try:
                for clouds in stream:
                    if self.exit_:
                        break
                    if self.cloudCallBack is not None:
                        self.cloudCallBack(clouds)
            except Exception as e:
                # A stream we cancelled ourselves (shutdown or replacement)
                # fails by design; only report genuine errors.
                still_current = self.clouds_ is stream
                if (still_current and not self.exit_
                        and self.ErrorCallback is not None):
                    self.ErrorCallback(str(e))
            finally:
                # Forget the stream once it has ended, but never clobber a
                # newer one installed in the meantime.
                if self.clouds_ is stream:
                    self.clouds_ = None

    def run(self):
        """Thread entry point: submit the dispatch loop to the worker pool.

        The loop runs on a pool thread; this thread returns right after
        submitting it.
        """
        self.threadpool.submit(self.loopDataStream)
        print(" close ")

    def close(self):
        """Stop dispatching and release the stream, channel and workers.

        The active RPC is cancelled first so the dispatch loop cannot stay
        blocked waiting for the next message while the pool shuts down.
        Safe to call when the thread was never started.  The worker pool is
        shut down, so the instance cannot dispatch again afterwards.
        """
        self.exit_ = True
        stream, channel = self.clouds_, self.channel_
        self.clouds_ = None
        self.channel_ = None
        self._dispose(stream, channel)
        self.threadpool.shutdown()
        # join() raises RuntimeError on a thread that was never started.
        if self.is_alive():
            self.join()

    @staticmethod
    def _dispose(stream, channel):
        """Cancel ``stream`` and close ``channel``; either may be ``None``."""
        # gRPC response iterators expose cancel(); tolerate other iterables
        # that a caller may have assigned to ``clouds_`` directly.
        cancel = getattr(stream, "cancel", None)
        if cancel is not None:
            cancel()
        if channel is not None:
            channel.close()
|