123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607 |
- import copy
- import enum
- import time
- import message_pb2 as message
- import grpc
- import message_pb2_grpc as msg_rpc
- from ManualOperationWidget import ManualOperationType
- from custom_define import RobotName, ActType
- from mqtt_async import MqttAsync
- import google.protobuf.json_format as jtf
- import json
- from dijkstra.Map import DijikstraMap, SpaceNode, StreetNode
- import uuid
- import random
- import math
- import Count
- from sigmoid import sigmoid
- class TimeStatu:
- def __init__(self, statu=None, timeout=3):
- self.statu = statu
- self.time = time.time()
- self.timeout_ms = timeout
- def timeout(self):
- tm = time.time()
- return tm - self.time > self.timeout_ms or self.statu == None
- class Robot(MqttAsync):
- def __init__(self, rpc_ipport, name=""):
- MqttAsync.__init__(self)
- self.test_speed = 0
- self.timedRobotStatu_ = TimeStatu(message.RobotStatu, 0)
- self.timedNavStatu_ = TimeStatu(message.NavStatu, 0)
- self.dataTopic_ = {}
- self.rpc_ipport_ = rpc_ipport
- self.messageArrivedSignal = None
- self.currentNavData_ = None
- self.navCmdTopic_ = None
- if(name == RobotName.strAGVMain):
- self.speedCmdTopic = "monitor_child/speedcmd"
- else:
- self.speedCmdTopic = "monitor_child/speedcmd"
- self.currentNavPathNodes_ = None
- self.currentNavPath_ = None
- self.pathColor_ = [1, 1, 0]
- self.robotColor_ = [0.7, 0.2, 0.3] # agv当前颜色
- self.name_ = name
- self.begId_ = "------"
- self.targetId_ = "------"
- self.autoTest_ = False
- self.l_ = 0.8 # 轮长
- self.L_ = 1.3 # 轴距
- self.W_ = 2.5 # 宽
- self.heat_ = 0 # 心跳
- def Color(self):
- if self.IsMainModeStatu():
- return [0, 0, 0]
- return self.robotColor_
- def IsMainAgv(self):
- if self.timedNavStatu_.timeout() == False:
- if self.timedNavStatu_.statu.main_agv == True:
- return True
- return False
- def IsMainModeStatu(self):
- if self.IsMainAgv():
- if self.timedNavStatu_.timeout() == False:
- if self.timedNavStatu_.statu.move_mode == 1:
- return True
- return False
- def SetSubDataTopic(self, match: dict, messageSignal=None):
- self.dataTopic_ = match
- self.messageArrivedSignal = messageSignal
- for item in match.items():
- [topic, _] = item
- self.subscribe(topic, self.on_message)
- def SetCmdTopic(self, topic):
- self.navCmdTopic_ = topic
- def SetColor(self, pathColor, robotColor):
- self.pathColor_ = pathColor
- self.robotColor_ = robotColor
- def ResetPose(self, agv: message.RobotStatu):
- self.timedRobotStatu_ = TimeStatu(agv, 2)
- def ResetNavStatu(self, statu: message.NavStatu):
- self.timedNavStatu_ = TimeStatu(statu, 2)
- def on_message(self, client, userdata, msg):
- topic = msg.topic
- if self.dataTopic_.get(topic) is not None:
- dtype = self.dataTopic_[topic]
- if dtype == message.RobotStatu.__name__:
- proto = message.RobotStatu()
- jtf.Parse(msg.payload.decode(), proto)
- self.ResetPose(proto)
- if dtype == message.NavStatu.__name__:
- self.last_running = self.IsNavigating()
- proto = message.NavStatu()
- jtf.Parse(msg.payload.decode(), proto)
- self.ResetNavStatu(proto)
- if self.last_running == True and self.IsNavigating() == False:
- self.currentNavPathNodes_ = None
- self.currentNavPath_ = None
- if self.messageArrivedSignal is not None:
- self.messageArrivedSignal()
- def MpcSelectTraj(self):
- traj = []
- if self.timedNavStatu_.timeout() == False:
- nav = self.timedNavStatu_.statu
- for pose2d in nav.selected_traj.poses:
- traj.append([pose2d.x, pose2d.y, pose2d.theta])
- return traj
- def MpcPredictTraj(self):
- traj = []
- if self.timedNavStatu_.timeout() == False:
- nav = self.timedNavStatu_.statu
- for pose2d in nav.predict_traj.poses:
- traj.append([pose2d.x, pose2d.y, pose2d.theta])
- return traj
- def Connected(self):
- return self.timedNavStatu_.timeout() == False and self.timedRobotStatu_.timeout() == False
- def IsNavigating(self):
- return not self.ActionType() == ActType.eReady
- '''if self.timedNavStatu_.timeout() == False:
- key = self.timedNavStatu_.statu.key
- if key == "" or key == None:
- return False
- return True'''
- def GeneratePath(self, begID, endID, task_type="None"):
- self.currentNavPathNodes_ = DijikstraMap().GetShortestPath(begID, endID)
- if task_type == "DoubleAGV":
- print("AdjustPath------------------")
- if self.IsMainModeStatu() is False:
- print("原始路径:")
- for node in self.currentNavPathNodes_:
- print(node.id_, node.y_)
- self.currentNavPathNodes_ = self.AdjustPath(self.currentNavPathNodes_)
- print("修改路径:")
- for node in self.currentNavPathNodes_:
- print(node.id_, node.y_)
- self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5)
- if self.currentNavPathNodes_ is not None and self.currentNavPath_ is not None:
- self.begId_ = begID
- self.targetId_ = endID
- return True
- return False
- def PositionId(self):
- if self.timedRobotStatu_.timeout():
- return None
- x = self.timedRobotStatu_.statu.x
- y = self.timedRobotStatu_.statu.y
- djks_map = DijikstraMap()
- [label, pt] = djks_map.findNeastNode([x, y])
- return [label, pt]
- '''
- 阻塞直到导航完成
- '''
- def Navigatting(self, begId, targetId, autoDirect, wheelbase=0, task_type="None"):
- print("nav")
- self.GeneratePath(begId, targetId, task_type=task_type)
- # self.ExecPathNodes(autoDirect)
- self.ExecNavtask(autoDirect, wheelbase)
- if self.IsMainModeStatu():
- Count.TestCount().loadCountAdd()
- else:
- Count.TestCount().singleCountAdd()
- return True
- def AutoTestNavClamp(self, begId, targetId):
- beg = begId
- target = targetId
- while self.autoTest_:
- if self.TestNavClampOnce(beg, target) == False:
- print(" quit auto Test")
- break
- print("1111")
- posInfo = self.PositionId()
- if posInfo is not None:
- [label, pt] = posInfo
- beg = label
- print("beg", beg)
- node = DijikstraMap().GetVertex(beg)
- if node is not None:
- if isinstance(node, (SpaceNode)):
- target = "O"
- if isinstance(node, (StreetNode)):
- end_nodes = ["E", "B", "C", "D"]
- id = random.randint(0, 1000) % 4
- target = end_nodes[id]
- print("3333", target)
- print(" Next nav clamp cmd,%s to %s" % (beg, target))
- def ActionType(self):
- if self.timedNavStatu_.timeout() == False:
- runningStatu = self.timedNavStatu_.statu
- if runningStatu.statu == 0:
- return ActType.eReady
- if runningStatu.statu == 1:
- return ActType.eRotation
- if runningStatu.statu == 2:
- return ActType.eHorizon
- if runningStatu.statu == 3:
- return ActType.eVertical
- if runningStatu.statu == 4:
- return ActType.eMPC
- if runningStatu.statu == 5:
- return ActType.eClampOpen
- if runningStatu.statu == 6:
- return ActType.eClampClose
- else:
- return ActType.eReady
- @staticmethod
- def generateProtoNode(node):
- protoNode = message.PathNode()
- protoNode.x = node.x_
- protoNode.y = node.y_
- protoNode.id = node.id_
- return protoNode
- def ExecNavtask(self, autoDirect, wheelbase=0):
- cmd = message.NavCmd()
- key = str(uuid.uuid4())
- cmd.key = key
- cmd.action = 0
- actions = self.SplitPath(self.currentNavPathNodes_)
- for action in actions:
- [type, nodes] = action
- if type == "input":
- [street, space] = nodes
- protoSpace = self.generateProtoNode(space)
- protoStreet = self.generateProtoNode(street)
- act = message.NewAction()
- act.type = 1
- act.wheelbase = wheelbase
- act.spaceNode.CopyFrom(protoSpace)
- act.streetNode.CopyFrom(protoStreet)
- cmd.newActions.add().CopyFrom(act)
- if type == "output":
- [space, street] = nodes
- protoSpace = self.generateProtoNode(space)
- protoStreet = self.generateProtoNode(street)
- act = message.NewAction()
- act.type = 2
- act.wheelbase = wheelbase
- act.spaceNode.CopyFrom(protoSpace)
- act.streetNode.CopyFrom(protoStreet)
- cmd.newActions.add().CopyFrom(act)
- if type == "nav":
- action = self.CreateNavPathNodesAction(nodes, autoDirect)
- if action is None:
- print("Nav cmd is None")
- return False
- else:
- cmd.newActions.add().CopyFrom(action)
- print(cmd)
- if cmd is None:
- print("Nav cmd is None")
- return False
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Start(cmd)
- print("client received: ", response)
- if not response.ret == 0:
- print("nav failed :%s" % (response.info))
- else:
- print("nav completed !!!")
- return True
- def CreateNavPathNodesAction(self, path, autoDirect):
- # if path is not None and self.timedRobotStatu_.timeout() == False:
- if 1:
- for node in path:
- if isinstance(node, (SpaceNode)):
- print(" nodes must all street node")
- return None
- statu = self.timedRobotStatu_.statu
- # if statu is not None:
- if 1:
- newAction = message.NewAction()
- size = len(path)
- for i in range(size):
- node = path[i]
- pathNode = message.PathNode()
- pathNode.x = node.x_
- pathNode.y = node.y_
- newAction.type = 4
- if autoDirect:
- newAction.type = 3
- newAction.pathNodes.add().CopyFrom(pathNode)
- return newAction
- else:
- print("statu is None")
- else:
- print("current path is none")
- return None
- def ExecPathNodes(self, autoDirect):
- if self.navCmdTopic_ == None:
- print("Robot has not set nav cmd topic")
- return False
- if not self.ActionType() == ActType.eReady:
- print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key))
- return False
- cmd = message.NavCmd()
- key = str(uuid.uuid4())
- cmd.key = key
- cmd.action = 0
- action = self.CreateNavPathNodesAction(self.currentNavPathNodes_, autoDirect)
- if action is None:
- print("Nav cmd is None")
- return False
- cmd.newActions.add().CopyFrom(action)
- print(cmd)
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Start(cmd)
- return True
- '''
- 根据AGV单车的起始位置调整路径
- '''
- def AdjustPath(self, path):
- adjusted_path = copy.deepcopy(path)
- y_offset = 1.2
- dy = 0
- # if self.IsMainAgv() or self.name_ == "AgvMain":
- if self.name_ == RobotName.strAGVMain:
- dy = 1
- # if not self.IsMainAgv() or self == "AGV2":
- if self.name_ == RobotName.strAGV2:
- dy = -1
- print("dy:", dy)
- for node in adjusted_path:
- if isinstance(node, StreetNode):
- if dy > 0:
- node.y_ += y_offset
- elif dy < 0:
- node.y_ -= y_offset
- else:
- pass
- return adjusted_path
- '''
- 拆分路径,生成动作集,input:入库动作,nav:导航动作,output:出库动作
- '''
- def SplitPath(self, path):
- space_count = 0
- for i in range(len(path)):
- if isinstance(path[i], (SpaceNode)):
- space_count += 1
- if i == 0 or i == len(path) - 1:
- pass
- else:
- print("Error: space node is not first/last of path")
- return None
- actions = []
- lastNode = None
- navIndex = -1
- for node in path:
- if lastNode == None:
- lastNode = node
- continue
- if isinstance(node, (SpaceNode)): # 当前点是车位点
- if isinstance(lastNode, (SpaceNode)): # 上一点为车位点
- print("Error: last node and current node is space node ")
- lastNode = None
- continue
- elif isinstance(lastNode, (StreetNode)): # 上一点为马路点,则动作为入库
- actions.append(["input", [lastNode, node]])
- lastNode = None
- continue
- if isinstance(node, (StreetNode)):
- if isinstance(lastNode, (SpaceNode)): # 上一点为车位点, 出库
- actions.append(["output", [lastNode, node]])
- elif isinstance(lastNode, (StreetNode)):
- if navIndex < 0:
- actions.append(["nav", [lastNode]])
- navIndex = len(actions) - 1
- actions[navIndex][1].append(node)
- lastNode = node
- return actions
- def SwitchMoveMode(self, mode, wheelbase):
- if self.IsMainAgv() == False:
- print(" this agv is single can not switch mode")
- return False
- cmd = message.NavCmd()
- key = str(uuid.uuid4())
- cmd.key = key
- cmd.action = 0
- action = message.NewAction()
- action.type = 8
- action.changedMode = mode
- if mode == 1:
- action.wheelbase = wheelbase
- cmd.newActions.add().CopyFrom(action)
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Start(cmd)
- return False
- def IsClampClosed(self):
- if self.timedRobotStatu_.timeout() == False:
- if self.IsMainModeStatu():
- clamp = self.timedRobotStatu_.statu.agvStatu.clamp
- clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other
- return (clamp == 1 and clamp_other == 1)
- else:
- return self.timedRobotStatu_.statu.agvStatu.clamp == 1
- return False
- def IsClampRunning(self):
- if self.timedRobotStatu_.timeout() == False:
- if self.IsMainModeStatu():
- clamp = self.timedRobotStatu_.statu.agvStatu.clamp
- clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other
- return (clamp == 0 or clamp_other == 0)
- else:
- return self.timedRobotStatu_.statu.agvStatu.clamp == 0
- return False
- def IsClampOpened(self):
- if self.timedRobotStatu_.timeout() == False:
- if self.IsMainModeStatu():
- clamp = self.timedRobotStatu_.statu.agvStatu.clamp
- clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other
- return (clamp == 2 and clamp_other == 2)
- else:
- return self.timedRobotStatu_.statu.agvStatu.clamp == 2
- return False
- def ClampClose(self):
- if self.IsClampClosed() == True:
- print("clamp closed")
- return True
- cmd = message.NavCmd()
- cmd.action = 0
- key = str(uuid.uuid4())
- cmd.key = (key)
- act = message.NewAction()
- act.type = 6
- cmd.newActions.add().CopyFrom(act)
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Start(cmd)
- return True
- def ClampOpen(self):
- if self.IsClampOpened() == True:
- print("clamp opended")
- return True
- cmd = message.NavCmd()
- cmd.action = 0
- key = str(uuid.uuid4())
- cmd.key = (key)
- act = message.NewAction()
- act.type = 7
- cmd.newActions.add().CopyFrom(act)
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Start(cmd)
- return True
- def CancelNavTask(self):
- cmd = message.NavCmd()
- cmd.action = 3
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Cancel(cmd)
- print("client received: ", response)
- print(" Cancel task completed!!!")
- return True
- def ManualTask(self, current_operation, step_acc):
- if self.IsNavigating():
- print("AGV is navigating!!!")
- return
- cmd = message.ToAgvCmd()
- self.heat_ += 1
- self.heat_ %= 255
- cmd.H = self.heat_
- if self.IsMainModeStatu():
- cmd.M = 1
- else:
- cmd.M = 0
- if current_operation == ManualOperationType.eCounterclockwiseRotate:
- cmd.T = 1
- cmd.V = 0
- cmd.W = 4.0 / 180 * math.pi * sigmoid(step_acc, 0, 1)
- if current_operation == ManualOperationType.eClockwiseRotate:
- cmd.T = 1
- cmd.V = 0
- cmd.W = -4.0 / 180 * math.pi * sigmoid(step_acc, 0, 1)
- if current_operation == ManualOperationType.eForwardMove:
- cmd.T = 3
- cmd.V = 0.6 * sigmoid(step_acc, 0, 1)
- cmd.W = 0
- if current_operation == ManualOperationType.eBackwardMove:
- cmd.T = 3
- cmd.V = -0.6 * sigmoid(step_acc, 0, 1)
- cmd.W = 0
- if current_operation == ManualOperationType.eLeftMove:
- cmd.T = 2
- cmd.V = 0.6 * sigmoid(step_acc, 0, 1)
- cmd.W = 0
- if current_operation == ManualOperationType.eRightMove:
- cmd.T = 2
- cmd.V = -0.6 * sigmoid(step_acc, 0, 1)
- cmd.W = 0
- cmd.L = self.L_
- cmd.end = 1
- # 处理为json格式
- cmd_dict = {}
- for field in cmd.DESCRIPTOR.fields:
- cmd_dict[field.name] = field.default_value
- for field, value in cmd.ListFields():
- cmd_dict[field.name] = value
- cmd_json = json.dumps(cmd_dict, indent=None, separators=(',', ':'))
- self.publish(self.speedCmdTopic, cmd_json)
- print(cmd_json)
- # cmd_0 = message.ManualCmd()
- # cmd_0.key = str(uuid.uuid4())
- # if current_operation == ManualOperationType.eCounterclockwiseRotate:
- # cmd_0.operation_type = 1
- # cmd_0.velocity = 1
- # if current_operation == ManualOperationType.eClockwiseRotate:
- # cmd_0.operation_type = 1
- # cmd_0.velocity = -1
- # if current_operation == ManualOperationType.eForwardMove:
- # cmd_0.operation_type = 2
- # cmd_0.velocity = 1
- # if current_operation == ManualOperationType.eBackwardMove:
- # cmd_0.operation_type = 2
- # cmd_0.velocity = -1
- # if current_operation == ManualOperationType.eLeftMove:
- # cmd_0.operation_type = 3
- # cmd_0.velocity = 1
- # if current_operation == ManualOperationType.eRightMove:
- # cmd_0.operation_type = 3
- # cmd_0.velocity = -1
- #
- # channel = grpc.insecure_channel(self.rpc_ipport_)
- # stub = msg_rpc.NavExcutorStub(channel)
- # response = stub.ManualOperation(cmd_0)
- # print("client received: ", response)
- print("ManualOperation {op_type} task completed!!!".format(op_type=current_operation))
- return True
|