GrpcClient.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import time
  2. import grpc
  3. import def_pb2 as pb
  4. import def_pb2_grpc as mrpc
  5. import threading
  6. from concurrent.futures import ThreadPoolExecutor
  7. def singleton(cls):
  8. _instance = {}
  9. def inner():
  10. if cls not in _instance:
  11. _instance[cls] = cls()
  12. return _instance[cls]
  13. return inner
  14. @singleton
  15. class GrpcStream(threading.Thread):
  16. def __init__(self):
  17. threading.Thread.__init__(self)
  18. self.threadpool=ThreadPoolExecutor(5)
  19. self.exit_=False
  20. self.features_=None
  21. self.clouds_=None
  22. self.cloudCallBack=None
  23. self.ErrorCallback=None
  24. def SetDataCallBack(self,CloudsCallBack,ErrorCallback):
  25. self.cloudCallBack=CloudsCallBack
  26. self.ErrorCallback=ErrorCallback
  27. def OpenDataStream(self,ip,port):
  28. connectstr='%s:%d'%(ip,port)
  29. channel=grpc.insecure_channel(connectstr)
  30. stub = mrpc.StreamServerStub(channel)
  31. cmd=pb.RequestCmd()
  32. self.clouds_ = stub.OpenStream(cmd)
  33. def CloseDataStream(self,ip,port):
  34. connectstr='%s:%d'%(ip,port)
  35. channel=grpc.insecure_channel(connectstr)
  36. stub = mrpc.StreamServerStub(channel)
  37. cmd=pb.RequestCmd()
  38. print(" close stream")
  39. stub.CloseStream(cmd)
  40. print(" close stream over")
  41. def loopDataStream(self):
  42. while self.exit_ == False:
  43. time.sleep(0.001)
  44. if self.clouds_ is not None:
  45. try:
  46. for clouds in self.clouds_:
  47. if self.cloudCallBack is not None:
  48. self.cloudCallBack(clouds)
  49. except Exception as e:
  50. if self.ErrorCallback is not None:
  51. self.ErrorCallback(str(e))
  52. self.clouds_=None
  53. def run(self):
  54. self.loopDataStream()
  55. print(" close ")
  56. def close(self):
  57. self.exit_=True
  58. self.threadpool.shutdown()
  59. self.join()