import copy import enum import time import message_pb2 as message import grpc import message_pb2_grpc as msg_rpc from ManualOperationWidget import ManualOperationType from custom_define import RobotName, ActType from mqtt_async import MqttAsync import google.protobuf.json_format as jtf import json from dijkstra.Map import DijikstraMap, SpaceNode, StreetNode import uuid import random import math import Count from sigmoid import sigmoid class TimeStatu: def __init__(self, statu=None, timeout=3): self.statu = statu self.time = time.time() self.timeout_ms = timeout def timeout(self): tm = time.time() return tm - self.time > self.timeout_ms or self.statu == None class Robot(MqttAsync): def __init__(self, rpc_ipport, name=""): MqttAsync.__init__(self) self.test_speed = 0 self.timedRobotStatu_ = TimeStatu(message.RobotStatu, 0) self.timedNavStatu_ = TimeStatu(message.NavStatu, 0) self.dataTopic_ = {} self.rpc_ipport_ = rpc_ipport self.messageArrivedSignal = None self.currentNavData_ = None self.navCmdTopic_ = None if(name == RobotName.strAGVMain): self.speedCmdTopic = "monitor_child/speedcmd" else: self.speedCmdTopic = "monitor_child/speedcmd" self.currentNavPathNodes_ = None self.currentNavPath_ = None self.pathColor_ = [1, 1, 0] self.robotColor_ = [0.7, 0.2, 0.3] # agv当前颜色 self.name_ = name self.begId_ = "------" self.targetId_ = "------" self.autoTest_ = False self.l_ = 0.8 # 轮长 self.L_ = 1.3 # 轴距 self.W_ = 2.5 # 宽 self.heat_ = 0 # 心跳 def Color(self): if self.IsMainModeStatu(): return [0, 0, 0] return self.robotColor_ def IsMainAgv(self): if self.timedNavStatu_.timeout() == False: if self.timedNavStatu_.statu.main_agv == True: return True return False def IsMainModeStatu(self): if self.IsMainAgv(): if self.timedNavStatu_.timeout() == False: if self.timedNavStatu_.statu.move_mode == 1: return True return False def SetSubDataTopic(self, match: dict, messageSignal=None): self.dataTopic_ = match self.messageArrivedSignal = messageSignal for item in match.items(): [topic, _] = item self.subscribe(topic, self.on_message) def SetCmdTopic(self, topic): self.navCmdTopic_ = topic def SetColor(self, pathColor, robotColor): self.pathColor_ = pathColor self.robotColor_ = robotColor def ResetPose(self, agv: message.RobotStatu): self.timedRobotStatu_ = TimeStatu(agv, 2) def ResetNavStatu(self, statu: message.NavStatu): self.timedNavStatu_ = TimeStatu(statu, 2) def on_message(self, client, userdata, msg): topic = msg.topic if self.dataTopic_.get(topic) is not None: dtype = self.dataTopic_[topic] if dtype == message.RobotStatu.__name__: proto = message.RobotStatu() jtf.Parse(msg.payload.decode(), proto) self.ResetPose(proto) if dtype == message.NavStatu.__name__: self.last_running = self.IsNavigating() proto = message.NavStatu() jtf.Parse(msg.payload.decode(), proto) self.ResetNavStatu(proto) if self.last_running == True and self.IsNavigating() == False: self.currentNavPathNodes_ = None self.currentNavPath_ = None if self.messageArrivedSignal is not None: self.messageArrivedSignal() def MpcSelectTraj(self): traj = [] if self.timedNavStatu_.timeout() == False: nav = self.timedNavStatu_.statu for pose2d in nav.selected_traj.poses: traj.append([pose2d.x, pose2d.y, pose2d.theta]) return traj def MpcPredictTraj(self): traj = [] if self.timedNavStatu_.timeout() == False: nav = self.timedNavStatu_.statu for pose2d in nav.predict_traj.poses: traj.append([pose2d.x, pose2d.y, pose2d.theta]) return traj def Connected(self): return self.timedNavStatu_.timeout() == False and self.timedRobotStatu_.timeout() == False def IsNavigating(self): return not self.ActionType() == ActType.eReady '''if self.timedNavStatu_.timeout() == False: key = self.timedNavStatu_.statu.key if key == "" or key == None: return False return True''' def GeneratePath(self, begID, endID, task_type="None"): self.currentNavPathNodes_ = DijikstraMap().GetShortestPath(begID, endID) if task_type == "DoubleAGV": print("AdjustPath------------------") if self.IsMainModeStatu() is False: print("原始路径:") for node in self.currentNavPathNodes_: print(node.id_, node.y_) self.currentNavPathNodes_ = self.AdjustPath(self.currentNavPathNodes_) print("修改路径:") for node in self.currentNavPathNodes_: print(node.id_, node.y_) self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5) if self.currentNavPathNodes_ is not None and self.currentNavPath_ is not None: self.begId_ = begID self.targetId_ = endID return True return False def PositionId(self): if self.timedRobotStatu_.timeout(): return None x = self.timedRobotStatu_.statu.x y = self.timedRobotStatu_.statu.y djks_map = DijikstraMap() [label, pt] = djks_map.findNeastNode([x, y]) return [label, pt] ''' 阻塞直到导航完成 ''' def Navigatting(self, begId, targetId, autoDirect, wheelbase=0, task_type="None"): print("nav") self.GeneratePath(begId, targetId, task_type=task_type) # self.ExecPathNodes(autoDirect) self.ExecNavtask(autoDirect, wheelbase) if self.IsMainModeStatu(): Count.TestCount().loadCountAdd() else: Count.TestCount().singleCountAdd() return True def AutoTestNavClamp(self, begId, targetId): beg = begId target = targetId while self.autoTest_: if self.TestNavClampOnce(beg, target) == False: print(" quit auto Test") break print("1111") posInfo = self.PositionId() if posInfo is not None: [label, pt] = posInfo beg = label print("beg", beg) node = DijikstraMap().GetVertex(beg) if node is not None: if isinstance(node, (SpaceNode)): target = "O" if isinstance(node, (StreetNode)): end_nodes = ["E", "B", "C", "D"] id = random.randint(0, 1000) % 4 target = end_nodes[id] print("3333", target) print(" Next nav clamp cmd,%s to %s" % (beg, target)) def ActionType(self): if self.timedNavStatu_.timeout() == False: runningStatu = self.timedNavStatu_.statu if runningStatu.statu == 0: return ActType.eReady if runningStatu.statu == 1: return ActType.eRotation if runningStatu.statu == 2: return ActType.eHorizon if runningStatu.statu == 3: return ActType.eVertical if runningStatu.statu == 4: return ActType.eMPC if runningStatu.statu == 5: return ActType.eClampOpen if runningStatu.statu == 6: return ActType.eClampClose else: return ActType.eReady @staticmethod def generateProtoNode(node): protoNode = message.PathNode() protoNode.x = node.x_ protoNode.y = node.y_ protoNode.id = node.id_ return protoNode def ExecNavtask(self, autoDirect, wheelbase=0): cmd = message.NavCmd() key = str(uuid.uuid4()) cmd.key = key cmd.action = 0 actions = self.SplitPath(self.currentNavPathNodes_) for action in actions: [type, nodes] = action if type == "input": [street, space] = nodes protoSpace = self.generateProtoNode(space) protoStreet = self.generateProtoNode(street) act = message.NewAction() act.type = 1 act.wheelbase = wheelbase act.spaceNode.CopyFrom(protoSpace) act.streetNode.CopyFrom(protoStreet) cmd.newActions.add().CopyFrom(act) if type == "output": [space, street] = nodes protoSpace = self.generateProtoNode(space) protoStreet = self.generateProtoNode(street) act = message.NewAction() act.type = 2 act.wheelbase = wheelbase act.spaceNode.CopyFrom(protoSpace) act.streetNode.CopyFrom(protoStreet) cmd.newActions.add().CopyFrom(act) if type == "nav": action = self.CreateNavPathNodesAction(nodes, autoDirect) if action is None: print("Nav cmd is None") return False else: cmd.newActions.add().CopyFrom(action) print(cmd) if cmd is None: print("Nav cmd is None") return False channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Start(cmd) print("client received: ", response) if not response.ret == 0: print("nav failed :%s" % (response.info)) else: print("nav completed !!!") return True def CreateNavPathNodesAction(self, path, autoDirect): # if path is not None and self.timedRobotStatu_.timeout() == False: if 1: for node in path: if isinstance(node, (SpaceNode)): print(" nodes must all street node") return None statu = self.timedRobotStatu_.statu # if statu is not None: if 1: newAction = message.NewAction() size = len(path) for i in range(size): node = path[i] pathNode = message.PathNode() pathNode.x = node.x_ pathNode.y = node.y_ newAction.type = 4 if autoDirect: newAction.type = 3 newAction.pathNodes.add().CopyFrom(pathNode) return newAction else: print("statu is None") else: print("current path is none") return None def ExecPathNodes(self, autoDirect): if self.navCmdTopic_ == None: print("Robot has not set nav cmd topic") return False if not self.ActionType() == ActType.eReady: print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key)) return False cmd = message.NavCmd() key = str(uuid.uuid4()) cmd.key = key cmd.action = 0 action = self.CreateNavPathNodesAction(self.currentNavPathNodes_, autoDirect) if action is None: print("Nav cmd is None") return False cmd.newActions.add().CopyFrom(action) print(cmd) channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Start(cmd) return True ''' 根据AGV单车的起始位置调整路径 ''' def AdjustPath(self, path): adjusted_path = copy.deepcopy(path) y_offset = 1.2 dy = 0 # if self.IsMainAgv() or self.name_ == "AgvMain": if self.name_ == RobotName.strAGVMain: dy = 1 # if not self.IsMainAgv() or self == "AGV2": if self.name_ == RobotName.strAGV2: dy = -1 print("dy:", dy) for node in adjusted_path: if isinstance(node, StreetNode): if dy > 0: node.y_ += y_offset elif dy < 0: node.y_ -= y_offset else: pass return adjusted_path ''' 拆分路径,生成动作集,input:入库动作,nav:导航动作,output:出库动作 ''' def SplitPath(self, path): space_count = 0 for i in range(len(path)): if isinstance(path[i], (SpaceNode)): space_count += 1 if i == 0 or i == len(path) - 1: pass else: print("Error: space node is not first/last of path") return None actions = [] lastNode = None navIndex = -1 for node in path: if lastNode == None: lastNode = node continue if isinstance(node, (SpaceNode)): # 当前点是车位点 if isinstance(lastNode, (SpaceNode)): # 上一点为车位点 print("Error: last node and current node is space node ") lastNode = None continue elif isinstance(lastNode, (StreetNode)): # 上一点为马路点,则动作为入库 actions.append(["input", [lastNode, node]]) lastNode = None continue if isinstance(node, (StreetNode)): if isinstance(lastNode, (SpaceNode)): # 上一点为车位点, 出库 actions.append(["output", [lastNode, node]]) elif isinstance(lastNode, (StreetNode)): if navIndex < 0: actions.append(["nav", [lastNode]]) navIndex = len(actions) - 1 actions[navIndex][1].append(node) lastNode = node return actions def SwitchMoveMode(self, mode, wheelbase): if self.IsMainAgv() == False: print(" this agv is single can not switch mode") return False cmd = message.NavCmd() key = str(uuid.uuid4()) cmd.key = key cmd.action = 0 action = message.NewAction() action.type = 8 action.changedMode = mode if mode == 1: action.wheelbase = wheelbase cmd.newActions.add().CopyFrom(action) channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Start(cmd) return False def IsClampClosed(self): if self.timedRobotStatu_.timeout() == False: if self.IsMainModeStatu(): clamp = self.timedRobotStatu_.statu.agvStatu.clamp clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other return (clamp == 1 and clamp_other == 1) else: return self.timedRobotStatu_.statu.agvStatu.clamp == 1 return False def IsClampRunning(self): if self.timedRobotStatu_.timeout() == False: if self.IsMainModeStatu(): clamp = self.timedRobotStatu_.statu.agvStatu.clamp clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other return (clamp == 0 or clamp_other == 0) else: return self.timedRobotStatu_.statu.agvStatu.clamp == 0 return False def IsClampOpened(self): if self.timedRobotStatu_.timeout() == False: if self.IsMainModeStatu(): clamp = self.timedRobotStatu_.statu.agvStatu.clamp clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other return (clamp == 2 and clamp_other == 2) else: return self.timedRobotStatu_.statu.agvStatu.clamp == 2 return False def ClampClose(self): if self.IsClampClosed() == True: print("clamp closed") return True cmd = message.NavCmd() cmd.action = 0 key = str(uuid.uuid4()) cmd.key = (key) act = message.NewAction() act.type = 6 cmd.newActions.add().CopyFrom(act) channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Start(cmd) return True def ClampOpen(self): if self.IsClampOpened() == True: print("clamp opended") return True cmd = message.NavCmd() cmd.action = 0 key = str(uuid.uuid4()) cmd.key = (key) act = message.NewAction() act.type = 7 cmd.newActions.add().CopyFrom(act) channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Start(cmd) return True def CancelNavTask(self): cmd = message.NavCmd() cmd.action = 3 channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Cancel(cmd) print("client received: ", response) print(" Cancel task completed!!!") return True def ManualTask(self, current_operation, step_acc): if self.IsNavigating(): print("AGV is navigating!!!") return cmd = message.ToAgvCmd() self.heat_ += 1 self.heat_ %= 255 cmd.H = self.heat_ if self.IsMainModeStatu(): cmd.M = 1 else: cmd.M = 0 if current_operation == ManualOperationType.eCounterclockwiseRotate: cmd.T = 1 cmd.V = 0 cmd.W = 4.0 / 180 * math.pi * sigmoid(step_acc, 0, 1) if current_operation == ManualOperationType.eClockwiseRotate: cmd.T = 1 cmd.V = 0 cmd.W = -4.0 / 180 * math.pi * sigmoid(step_acc, 0, 1) if current_operation == ManualOperationType.eForwardMove: cmd.T = 3 cmd.V = 0.6 * sigmoid(step_acc, 0, 1) cmd.W = 0 if current_operation == ManualOperationType.eBackwardMove: cmd.T = 3 cmd.V = -0.6 * sigmoid(step_acc, 0, 1) cmd.W = 0 if current_operation == ManualOperationType.eLeftMove: cmd.T = 2 cmd.V = 0.6 * sigmoid(step_acc, 0, 1) cmd.W = 0 if current_operation == ManualOperationType.eRightMove: cmd.T = 2 cmd.V = -0.6 * sigmoid(step_acc, 0, 1) cmd.W = 0 cmd.L = self.L_ cmd.end = 1 # 处理为json格式 cmd_dict = {} for field in cmd.DESCRIPTOR.fields: cmd_dict[field.name] = field.default_value for field, value in cmd.ListFields(): cmd_dict[field.name] = value cmd_json = json.dumps(cmd_dict, indent=None, separators=(',', ':')) self.publish(self.speedCmdTopic, cmd_json) print(cmd_json) # cmd_0 = message.ManualCmd() # cmd_0.key = str(uuid.uuid4()) # if current_operation == ManualOperationType.eCounterclockwiseRotate: # cmd_0.operation_type = 1 # cmd_0.velocity = 1 # if current_operation == ManualOperationType.eClockwiseRotate: # cmd_0.operation_type = 1 # cmd_0.velocity = -1 # if current_operation == ManualOperationType.eForwardMove: # cmd_0.operation_type = 2 # cmd_0.velocity = 1 # if current_operation == ManualOperationType.eBackwardMove: # cmd_0.operation_type = 2 # cmd_0.velocity = -1 # if current_operation == ManualOperationType.eLeftMove: # cmd_0.operation_type = 3 # cmd_0.velocity = 1 # if current_operation == ManualOperationType.eRightMove: # cmd_0.operation_type = 3 # cmd_0.velocity = -1 # # channel = grpc.insecure_channel(self.rpc_ipport_) # stub = msg_rpc.NavExcutorStub(channel) # response = stub.ManualOperation(cmd_0) # print("client received: ", response) print("ManualOperation {op_type} task completed!!!".format(op_type=current_operation)) return True