import threading
import time
from concurrent.futures import ThreadPoolExecutor

import grpc

import def_pb2 as pb
import def_pb2_grpc as mrpc


def singleton(cls):
    """Replace *cls* with a zero-argument factory that always returns one instance.

    The instance is created lazily on the first call and cached for the
    lifetime of the process. Note that the decorated name refers to the
    factory function afterwards, not to the class itself.
    """
    _instance = {}

    def inner():
        if cls not in _instance:
            _instance[cls] = cls()
        return _instance[cls]

    return inner


@singleton
class GrpcStream(threading.Thread):
    """Background thread that drains a gRPC response stream into callbacks.

    Typical use: ``SetDataCallBack`` -> ``start`` -> ``OpenDataStream`` ->
    ... -> ``CloseDataStream`` -> ``close``.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.threadpool = ThreadPoolExecutor(5)
        self.exit_ = False          # set by close() to stop the worker loop
        self.features_ = None
        self.clouds_ = None         # active response iterator, or None
        self.channel_ = None        # channel backing the most recent stream
        self.cloudCallBack = None
        self.ErrorCallback = None

    def SetDataCallBack(self, CloudsCallBack, ErrorCallback):
        """Register the per-message callback and the error callback.

        Args:
            CloudsCallBack: called with each message read from the stream.
            ErrorCallback: called with ``str(exception)`` when reading the
                stream (or the message callback itself) raises.
        """
        self.cloudCallBack = CloudsCallBack
        self.ErrorCallback = ErrorCallback

    def OpenDataStream(self, ip, port):
        """Open a stream on ``ip:port`` and hand it to the worker loop.

        Args:
            ip: host name or address of the server.
            port: integer TCP port.
        """
        channel = grpc.insecure_channel('%s:%d' % (ip, port))
        stub = mrpc.StreamServerStub(channel)
        # Keep a handle on the channel so close() can release it. A channel
        # from an earlier call is deliberately not closed here because the
        # worker may still be draining a stream that runs on it.
        self.channel_ = channel
        self.clouds_ = stub.OpenStream(pb.RequestCmd())

    def CloseDataStream(self, ip, port):
        """Send a ``CloseStream`` request to the server at ``ip:port``.

        A short-lived channel is used for the request and closed afterwards.
        """
        with grpc.insecure_channel('%s:%d' % (ip, port)) as channel:
            stub = mrpc.StreamServerStub(channel)
            cmd = pb.RequestCmd()
            print(" close stream")
            stub.CloseStream(cmd)
            print(" close stream over")

    def loopDataStream(self):
        """Worker loop: forward every stream message until ``exit_`` is set.

        Polls for an active stream, iterates it to exhaustion (or failure),
        then waits for the next one. Failures are reported through the error
        callback unless the object is shutting down.
        """
        while not self.exit_:
            time.sleep(0.001)
            stream = self.clouds_
            if stream is None:
                continue

            failure = None
            try:
                for clouds in stream:
                    if self.exit_:
                        break
                    if self.cloudCallBack is not None:
                        self.cloudCallBack(clouds)
            except Exception as e:
                failure = str(e)

            # Drop the finished stream BEFORE reporting the error, and only
            # if nobody replaced it meanwhile; otherwise a reconnect done
            # from inside the error callback would be thrown away.
            if self.clouds_ is stream:
                self.clouds_ = None

            # Errors caused by our own shutdown (e.g. the cancellation issued
            # by close()) are not reported.
            if failure is not None and not self.exit_:
                if self.ErrorCallback is not None:
                    self.ErrorCallback(failure)

    def run(self):
        """Thread entry point: run the worker loop until shutdown."""
        self.loopDataStream()
        print(" close ")

    def close(self):
        """Stop the worker thread and release local resources.

        Safe to call before ``start()`` and from within a callback running
        on the worker thread; in both cases the join is skipped because
        ``Thread.join`` would raise ``RuntimeError``.
        """
        self.exit_ = True

        # The worker may be blocked waiting for the next message; cancel the
        # call so the iteration wakes up. Guarded with getattr because the
        # stream object's type is defined by the gRPC stub, not by this file.
        cancel = getattr(self.clouds_, "cancel", None)
        if callable(cancel):
            cancel()

        self.threadpool.shutdown()

        if self.is_alive() and threading.current_thread() is not self:
            self.join()

        channel, self.channel_ = self.channel_, None
        if channel is not None:
            channel.close()