import copy import enum import time from numpy import fabs import message_pb2 as message import grpc import message_pb2_grpc as msg_rpc from ManualOperationWidget import ManualOperationType from custom_define import RobotName, ActType from mqtt_async import MqttAsync import google.protobuf.json_format as jtf import json from dijkstra.Map import SpaceNode, StreetNode, MapManager, DijikstraMap import uuid import random import math import Count from sigmoid import sigmoid class TimeStatu: def __init__(self, statu=None, timeout=3): self.statu = statu self.time = time.time() self.timeout_s = timeout def timeout(self): tm = time.time() return tm - self.time > self.timeout_s or self.statu == None class Robot(MqttAsync): def __init__(self, rpc_ipport, name=""): MqttAsync.__init__(self) self.test_speed = 0 self.timedRobotStatu_ = TimeStatu(message.RobotStatu, 0) self.timedNavStatu_ = TimeStatu(message.NavStatu, 0) self.dataTopic_ = {} self.rpc_ipport_ = rpc_ipport self.messageArrivedSignal = None self.currentNavData_ = None self.navCmdTopic_ = None if (name == RobotName.strAGVMain): self.speedCmdTopic = "monitor_child/speedcmd" else: self.speedCmdTopic = "monitor_child/speedcmd" self.currentNavPathNodes_ = None self.currentNavPath_ = None # 当前正在执行的流程默认0,存车1, 取车2 self.current_sequence_: int = 0 self.pathColor_ = [1, 1, 0] self.robotColor_ = [0.7, 0.2, 0.3] # agv当前颜色 self.name_ = name self.begId_ = "------" self.targetId_ = "------" self.autoTest_ = False self.l_ = 0.8 # 轮长 self.L_ = 2.9 # 轴距 self.W_ = 2.5 # 宽 self.heat_ = 0 # 心跳 self.manual_status = ManualOperationType.eReady # 正在进行的手动操作 self.manual_speed = [0.0, 0] def Color(self): if self.IsMainModeStatu(): return [0, 0, 0] return self.robotColor_ def IsMainAgv(self): if self.timedNavStatu_.timeout() == False: if self.timedNavStatu_.statu.main_agv == True: return True return False def IsMainModeStatu(self): if self.IsMainAgv(): if self.timedNavStatu_.timeout() == False: if self.timedNavStatu_.statu.move_mode == 1: return True return False def SetSubDataTopic(self, match: dict, messageSignal=None): self.dataTopic_ = match self.messageArrivedSignal = messageSignal for item in match.items(): [topic, _] = item self.subscribe(topic, self.on_message) def SetCmdTopic(self, topic): self.navCmdTopic_ = topic def SetColor(self, pathColor, robotColor): self.pathColor_ = pathColor self.robotColor_ = robotColor def ResetPose(self, agv: message.RobotStatu): self.timedRobotStatu_ = TimeStatu(agv, 2) def ResetNavStatu(self, statu: message.NavStatu): self.timedNavStatu_ = TimeStatu(statu, 2) def on_message(self, client, userdata, msg): topic = msg.topic if self.dataTopic_.get(topic) is not None: dtype = self.dataTopic_[topic] if dtype == message.RobotStatu.__name__: proto = message.RobotStatu() jtf.Parse(msg.payload.decode(), proto) self.ResetPose(proto) if dtype == message.NavStatu.__name__: self.last_running = self.IsNavigating() proto = message.NavStatu() jtf.Parse(msg.payload.decode(), proto) self.ResetNavStatu(proto) if self.last_running == True and self.IsNavigating() == False: self.currentNavPathNodes_ = None self.currentNavPath_ = None if self.messageArrivedSignal is not None: self.messageArrivedSignal() def MpcSelectTraj(self): traj = [] if self.timedNavStatu_.timeout() == False: nav = self.timedNavStatu_.statu for pose2d in nav.selected_traj.poses: traj.append([pose2d.x, pose2d.y, pose2d.theta]) return traj def MpcPredictTraj(self): traj = [] if self.timedNavStatu_.timeout() == False: nav = self.timedNavStatu_.statu for pose2d in nav.predict_traj.poses: traj.append([pose2d.x, pose2d.y, pose2d.theta]) return traj def Connected(self): return self.timedNavStatu_.timeout() == False and self.timedRobotStatu_.timeout() == False def IsNavigating(self): return not self.ActionType() == ActType.eReady '''if self.timedNavStatu_.timeout() == False: key = self.timedNavStatu_.statu.key if key == "" or key == None: return False return True''' def GeneratePath(self, begID, endID, mapName): self.currentNavPathNodes_ = MapManager().GetShortestPath(mapName, begID, endID) # if task_type == "DoubleAGV": # print("AdjustPath------------------") # if self.IsMainModeStatu() is False: # print("原始路径:") # for node in self.currentNavPathNodes_: # print(node.id_, node.y_) # self.currentNavPathNodes_ = self.AdjustPath(self.currentNavPathNodes_) # print("修改路径:") # for node in self.currentNavPathNodes_: # print(node.id_, node.y_) self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5) if self.currentNavPathNodes_ is not None and self.currentNavPath_ is not None: self.begId_ = begID self.targetId_ = endID return True return False def PositionId(self): if self.timedRobotStatu_.timeout(): return None x = self.timedRobotStatu_.statu.x y = self.timedRobotStatu_.statu.y djks_map = MapManager() [label, pt] = djks_map.findNeastNode([x, y]) return [label, pt] ''' 阻塞直到导航完成 ''' def Navigatting(self, begId, targetId, autoDirect, wheelbase=0, task_type: int = 0): if wheelbase < 2.4 or wheelbase > 4: print("wheelbase is out of range!\n") return False print("nav") self.SetSequence(task_type) # self.ExecPathNodes(autoDirect) if self.ExecNavtask(autoDirect, wheelbase) == False: self.ResetSequence() return False # self.ResetSequence() if self.IsMainModeStatu(): Count.TestCount().loadCountAdd() else: Count.TestCount().singleCountAdd() return True def AutoTestNavClamp(self, begId, targetId): beg = begId target = targetId while self.autoTest_: if self.TestNavClampOnce(beg, target) == False: print(" quit auto Test") break print("1111") posInfo = self.PositionId() if posInfo is not None: [label, pt] = posInfo beg = label print("beg", beg) node = MapManager().GetVertex(beg) if node is not None: if isinstance(node, (SpaceNode)): target = "O" if isinstance(node, (StreetNode)): end_nodes = ["E", "B", "C", "D"] id = random.randint(0, 1000) % 4 target = end_nodes[id] print("3333", target) print(" Next nav clamp cmd,%s to %s" % (beg, target)) def ActionType(self): if self.timedNavStatu_.timeout() == False: runningStatu = self.timedNavStatu_.statu if runningStatu.statu == 0: return ActType.eReady if runningStatu.statu == 1: return ActType.eRotation if runningStatu.statu == 2: return ActType.eHorizon if runningStatu.statu == 3: return ActType.eVertical if runningStatu.statu == 4: return ActType.eMPC if runningStatu.statu == 5: return ActType.eClampClose if runningStatu.statu == 6: return ActType.eClampOpen if runningStatu.statu == 7: return ActType.eLifterRise if runningStatu.statu == 8: return ActType.eLifterDown else: return ActType.eReady @staticmethod def generateProtoNode(node): protoNode = message.PathNode() protoNode.x = node.x_ protoNode.y = node.y_ protoNode.id = node.id_ return protoNode def ExecNavtask(self, autoDirect, wheelbase=0): cmd = message.NavCmd() key = str(uuid.uuid4()) cmd.key = key cmd.action = 0 cmd.sequence = self.current_sequence_ if len(self.currentNavPathNodes_)==1: self.currentNavPathNodes_.append(self.currentNavPathNodes_[0]) actions = self.SplitPath(self.currentNavPathNodes_) for action in actions: [type, nodes] = action if type == "input": [street, space] = nodes protoSpace = self.generateProtoNode(space) protoStreet = self.generateProtoNode(street) act = message.NewAction() act.type = 1 act.wheelbase = wheelbase act.spaceNode.CopyFrom(protoSpace) act.streetNode.CopyFrom(protoStreet) cmd.newActions.add().CopyFrom(act) if type == "output": [space, street] = nodes protoSpace = self.generateProtoNode(space) protoStreet = self.generateProtoNode(street) act = message.NewAction() act.type = 2 act.wheelbase = wheelbase act.spaceNode.CopyFrom(protoSpace) act.streetNode.CopyFrom(protoStreet) cmd.newActions.add().CopyFrom(act) if type == "nav": action = self.CreateNavPathNodesAction(nodes, autoDirect) if action is None: print("Nav cmd is None") return False else: cmd.newActions.add().CopyFrom(action) print(cmd) if cmd is None: print("Nav cmd is None") return False channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Start(cmd) print("client received: ", response) if not response.ret == 0: print("nav failed :%s" % (response.info)) return False else: print("nav completed !!!") return True def CreateNavPathNodesAction(self, path, autoDirect): # if path is not None and self.timedRobotStatu_.timeout() == False: if 1: for node in path: if isinstance(node, (SpaceNode)): print(" nodes must all street node") return None statu = self.timedRobotStatu_.statu # if statu is not None: if 1: newAction = message.NewAction() size = len(path) for i in range(size): node = path[i] pathNode = message.PathNode() pathNode.x = node.x_ pathNode.y = node.y_ pathNode.id = node.id_ newAction.type = 4 if autoDirect: newAction.type = 3 newAction.pathNodes.add().CopyFrom(pathNode) return newAction else: print("statu is None") else: print("current path is none") return None def ExecPathNodes(self, autoDirect): if self.navCmdTopic_ == None: print("Robot has not set nav cmd topic") return False if not self.ActionType() == ActType.eReady: print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key)) return False cmd = message.NavCmd() key = str(uuid.uuid4()) cmd.key = key cmd.action = 0 action = self.CreateNavPathNodesAction(self.currentNavPathNodes_, autoDirect) if action is None: print("Nav cmd is None") return False cmd.newActions.add().CopyFrom(action) print(cmd) channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Start(cmd) return True ''' 根据AGV单车的起始位置调整路径 ''' def AdjustPath(self, path): adjusted_path = copy.deepcopy(path) y_offset = 1.2 dy = 0 # if self.IsMainAgv() or self.name_ == "AgvMain": if self.name_ == RobotName.strAGVMain: dy = 1 # if not self.IsMainAgv() or self == "AGV2": if self.name_ == RobotName.strAGV2: dy = -1 print("dy:", dy) for node in adjusted_path: if isinstance(node, StreetNode): if dy > 0: node.y_ += y_offset elif dy < 0: node.y_ -= y_offset else: pass return adjusted_path ''' 拆分路径,生成动作集,input:入库动作,nav:导航动作,output:出库动作 ''' def SplitPath(self, path): space_count = 0 for i in range(len(path)): if isinstance(path[i], (SpaceNode)): space_count += 1 if i == 0 or i == len(path) - 1: pass else: print("Error: space node is not first/last of path") return None actions = [] lastNode = None navIndex = -1 for node in path: if lastNode == None: lastNode = node continue if isinstance(node, (SpaceNode)): # 当前点是车位点 if isinstance(lastNode, (SpaceNode)): # 上一点为车位点 print("Error: last node and current node is space node ") lastNode = None continue elif isinstance(lastNode, (StreetNode)): # 上一点为马路点,则动作为入库 actions.append(["input", [lastNode, node]]) lastNode = None continue if isinstance(node, (StreetNode)): if isinstance(lastNode, (SpaceNode)): # 上一点为车位点, 出库 actions.append(["output", [lastNode, node]]) elif isinstance(lastNode, (StreetNode)): if navIndex < 0: actions.append(["nav", [lastNode]]) navIndex = len(actions) - 1 actions[navIndex][1].append(node) lastNode = node return actions def SwitchMoveMode(self, mode, wheelbase): if self.IsMainAgv() == False: print(" this agv is single can not switch mode") return False cmd = message.NavCmd() key = str(uuid.uuid4()) cmd.key = key cmd.action = 0 action = message.NewAction() action.type = 8 action.changedMode = mode if mode == 1: action.wheelbase = wheelbase cmd.newActions.add().CopyFrom(action) print(cmd) channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Start(cmd) return True def IsClampClosed(self): if self.timedRobotStatu_.timeout() == False: if self.IsMainModeStatu(): clamp = self.timedRobotStatu_.statu.agvStatu.clamp clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other return (clamp == 1 and clamp_other == 1) else: return self.timedRobotStatu_.statu.agvStatu.clamp == 1 return False def IsClampRunning(self): if self.timedRobotStatu_.timeout() == False: if self.IsMainModeStatu(): clamp = self.timedRobotStatu_.statu.agvStatu.clamp clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other return (clamp == 0 or clamp_other == 0) else: return self.timedRobotStatu_.statu.agvStatu.clamp == 0 return False def IsClampOpened(self): if self.timedRobotStatu_.timeout() == False: if self.IsMainModeStatu(): clamp = self.timedRobotStatu_.statu.agvStatu.clamp clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other return (clamp == 2 and clamp_other == 2) else: return self.timedRobotStatu_.statu.agvStatu.clamp == 2 return False def ClampClose(self): if self.IsClampClosed() == True: print("clamp closed") return True cmd = message.NavCmd() cmd.action = 0 key = str(uuid.uuid4()) cmd.key = (key) act = message.NewAction() act.type = 6 cmd.newActions.add().CopyFrom(act) channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Start(cmd) return True def ClampOpen(self): if self.IsClampOpened() == True: print("clamp opended") return True cmd = message.NavCmd() cmd.action = 0 key = str(uuid.uuid4()) cmd.key = (key) act = message.NewAction() act.type = 7 cmd.newActions.add().CopyFrom(act) channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Start(cmd) return True def IsLiferDowned(self): if self.timedRobotStatu_.timeout() == False: if self.IsMainModeStatu(): lifter = self.timedRobotStatu_.statu.agvStatu.lifter lifter_other = self.timedRobotStatu_.statu.agvStatu.lifter_other return (lifter == 2 and lifter_other == 2) else: return self.timedRobotStatu_.statu.agvStatu.lifter == 2 return False def IsLiferRunning(self): if self.timedRobotStatu_.timeout() == False: if self.IsMainModeStatu(): lifter = self.timedRobotStatu_.statu.agvStatu.lifter lifter_other = self.timedRobotStatu_.statu.agvStatu.lifter_other return (lifter == 0 and lifter_other == 0) else: return self.timedRobotStatu_.statu.agvStatu.lifter == 0 return False def IsLiferRose(self): if self.timedRobotStatu_.timeout() == False: if self.IsMainModeStatu(): lifter = self.timedRobotStatu_.statu.agvStatu.lifter lifter_other = self.timedRobotStatu_.statu.agvStatu.lifter_other return (lifter == 1 and lifter_other == 1) else: return self.timedRobotStatu_.statu.agvStatu.lifter == 1 return False def LiferRise(self): if self.IsLiferRose() == True: print("lifter has rose") return True cmd = message.NavCmd() cmd.action = 0 key = str(uuid.uuid4()) cmd.key = (key) act = message.NewAction() act.type = 9 cmd.newActions.add().CopyFrom(act) channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Start(cmd) return True def LiferDown(self): if self.IsLiferDowned() == True: print("lifter has downed") return True cmd = message.NavCmd() cmd.action = 0 key = str(uuid.uuid4()) cmd.key = (key) act = message.NewAction() act.type = 10 cmd.newActions.add().CopyFrom(act) channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Start(cmd) return True def CancelNavTask(self): cmd = message.NavCmd() cmd.action = 3 channel = grpc.insecure_channel(self.rpc_ipport_) stub = msg_rpc.NavExcutorStub(channel) response = stub.Cancel(cmd) print("client received: ", response) print(" Cancel task completed!!!") return True def IsManualTaskRunning(self): return self.manual_status != ManualOperationType.eReady def ManualTask(self, current_operation, step_acc=0, speed=0): if self.IsNavigating(): print("AGV is navigating!!!") # return False # slowly stop if current_operation == ManualOperationType.eReady and \ ManualOperationType.eReady < self.manual_status <= ManualOperationType.eRightMove: print("Manual_Unit_SlowlyStop") self.Manual_Unit_SlowlyStop(current_operation, speed) return True # if self.IsManualTaskRunning(): # return print("ManualTask | ManualTask is running...") self.manual_status = current_operation if ManualOperationType.eReady < current_operation <= ManualOperationType.eRightMove: cmd = message.ToAgvCmd() self.heat_ += 1 self.heat_ %= 255 cmd.H1 = self.heat_ if self.IsMainModeStatu(): cmd.M1 = 1 else: cmd.M1 = 0 if current_operation == ManualOperationType.eCounterclockwiseRotate: cmd.T1 = 1 cmd.V1 = 0 cmd.W1 = speed / 180 * math.pi * sigmoid(step_acc, 0, 1) if current_operation == ManualOperationType.eClockwiseRotate: cmd.T1 = 1 cmd.V1 = 0 cmd.W1 = -speed / 180 * math.pi * sigmoid(step_acc, 0, 1) if current_operation == ManualOperationType.eForwardMove: cmd.T1 = 3 cmd.V1 = speed * sigmoid(step_acc, 0, 1) cmd.W1 = 0 if current_operation == ManualOperationType.eBackwardMove: cmd.T1 = 3 cmd.V1 = -speed * sigmoid(step_acc, 0, 1) cmd.W1 = 0 if current_operation == ManualOperationType.eLeftMove: cmd.T1 = 2 cmd.V1 = speed * sigmoid(step_acc, 0, 1) cmd.W1 = 0 if current_operation == ManualOperationType.eRightMove: cmd.T1 = 2 cmd.V1 = -speed * sigmoid(step_acc, 0, 1) cmd.W1 = 0 cmd.V2 = cmd.V1 cmd.V3 = cmd.V1 cmd.W2 = cmd.W1 cmd.W3 = cmd.W1 cmd.L1 = self.L_ cmd.P1 = 0 cmd.D1 = 0.0 cmd.end = 1 self.manual_speed = [cmd.V1, cmd.W1] # 处理为json格式 cmd_dict = {} for field in cmd.DESCRIPTOR.fields: cmd_dict[field.name] = field.default_value for field, value in cmd.ListFields(): cmd_dict[field.name] = value cmd_json = json.dumps(cmd_dict, indent=None, separators=(',', ':')) self.publish(self.speedCmdTopic, cmd_json) print(cmd_json) # channel = grpc.insecure_channel(self.rpc_ipport_) # stub = msg_rpc.NavExcutorStub(channel) # response = stub.ManualOperation(cmd_0) # print("client received: ", response) # self.manual_status = ManualOperationType.eReady # print("ManualOperation {op_type} task completed!!!".format(op_type=hex(current_operation))) time.sleep(0.05) return True elif ManualOperationType.eClampClose <= current_operation <= ManualOperationType.eClampOpen: self.Manual_Unit_Clamp(current_operation) return True elif ManualOperationType.eLifterRise <= current_operation <= ManualOperationType.eLifterDown: self.Manual_Unit_Lifter(current_operation) return True return False def Manual_Unit_Lifter(self, current_operation: ManualOperationType): print(current_operation) cmd = message.ToAgvCmd() self.heat_ += 1 self.heat_ %= 255 cmd.H1 = self.heat_ if self.IsMainModeStatu(): cmd.M1 = 1 else: cmd.M1 = 0 cmd.T1 = self.ManualOperationType2AgvCmdType(current_operation) # 处理为json格式 # jtf.MessageToJson(cmd) cmd_dict = {} for field in cmd.DESCRIPTOR.fields: cmd_dict[field.name] = field.default_value for field, value in cmd.ListFields(): cmd_dict[field.name] = value cmd_json = json.dumps(cmd_dict, indent=None, separators=(',', ':')) self.publish(self.speedCmdTopic, cmd_json) print(cmd_json) # self.manual_status = ManualOperationType.eReady return True def Manual_Unit_Clamp(self, current_operation: ManualOperationType): print(current_operation) cmd = message.ToAgvCmd() self.heat_ += 1 self.heat_ %= 255 cmd.H1 = self.heat_ if self.IsMainModeStatu(): cmd.M1 = 1 else: cmd.M1 = 0 cmd.T1 = self.ManualOperationType2AgvCmdType(current_operation) # 处理为json格式 # jtf.MessageToJson(cmd) cmd_dict = {} for field in cmd.DESCRIPTOR.fields: cmd_dict[field.name] = field.default_value for field, value in cmd.ListFields(): cmd_dict[field.name] = value cmd_json = json.dumps(cmd_dict, indent=None, separators=(',', ':')) self.publish(self.speedCmdTopic, cmd_json) print(cmd_json) # self.manual_status = ManualOperationType.eReady return True def ManualOperationType2AgvCmdType(self, mo_type: ManualOperationType): if mo_type == ManualOperationType.eLifterRise: return ActType.eLifterRise if mo_type == ManualOperationType.eLifterDown: return ActType.eLifterDown if mo_type == ManualOperationType.eClampClose: return ActType.eClampClose if mo_type == ManualOperationType.eClampOpen: return ActType.eClampOpen def Manual_Unit_SlowlyStop(self, current_operation: ManualOperationType, speed): step_sum = 10 step_acc = 1 speed_v = max(fabs(self.manual_speed[0]), 0.01) speed_w = max(fabs(self.manual_speed[1]), 1) while step_acc <= step_sum: if ManualOperationType.eReady < self.manual_status <= ManualOperationType.eRightMove: cmd = message.ToAgvCmd() self.heat_ += 1 self.heat_ %= 255 cmd.H1 = self.heat_ if self.IsMainModeStatu(): cmd.M1 = 1 else: cmd.M1 = 0 if self.manual_status == ManualOperationType.eCounterclockwiseRotate: cmd.T1 = 1 cmd.V1 = 0 cmd.W1 = speed_w / 180 * math.pi * (step_sum - step_acc) / step_sum if self.manual_status == ManualOperationType.eClockwiseRotate: cmd.T1 = 1 cmd.V1 = 0 cmd.W1 = -speed_w / 180 * math.pi * (step_sum - step_acc) / step_sum if self.manual_status == ManualOperationType.eForwardMove: cmd.T1 = 3 cmd.V1 = speed_v * (step_sum - step_acc) / step_sum cmd.W1 = 0 if self.manual_status == ManualOperationType.eBackwardMove: cmd.T1 = 3 cmd.V1 = -speed_v * (step_sum - step_acc) / step_sum cmd.W1 = 0 if self.manual_status == ManualOperationType.eLeftMove: cmd.T1 = 2 cmd.V1 = speed_v * (step_sum - step_acc) / step_sum cmd.W1 = 0 if self.manual_status == ManualOperationType.eRightMove: cmd.T1 = 2 cmd.V1 = -speed_v * (step_sum - step_acc) / step_sum cmd.W1 = 0 cmd.V2 = cmd.V1 cmd.V3 = cmd.V1 cmd.W2 = cmd.W1 cmd.W3 = cmd.W1 cmd.L1 = self.L_ cmd.end = 1 # 处理为json格式 cmd_dict = {} for field in cmd.DESCRIPTOR.fields: cmd_dict[field.name] = field.default_value for field, value in cmd.ListFields(): cmd_dict[field.name] = value cmd_json = json.dumps(cmd_dict, indent=None, separators=(',', ':')) self.publish(self.speedCmdTopic, cmd_json) print(cmd_json) step_acc += 1 time.sleep(0.1) self.manual_status = ManualOperationType.eReady self.manual_speed = [0, 0] print("Manual_Unit_SlowlyStop completed.") return True # def GenerateActionMPC(self, beg_name, end_name, autoDirect, wheelbase=0, task_type="None"): # self.GeneratePath(beg_name, end_name) # actions = list() # # split_path = self.SplitPath(self.currentNavPathNodes_) # for action in split_path: # [type, nodes] = action # if type == "input": # [street, space] = nodes # protoSpace = self.generateProtoNode(space) # protoStreet = self.generateProtoNode(street) # act = message.NewAction() # act.type = 1 # act.wheelbase = wheelbase # act.spaceNode.CopyFrom(protoSpace) # act.streetNode.CopyFrom(protoStreet) # # actions.append(act) # if type == "output": # [space, street] = nodes # protoSpace = self.generateProtoNode(space) # protoStreet = self.generateProtoNode(street) # act = message.NewAction() # act.type = 2 # act.wheelbase = wheelbase # act.spaceNode.CopyFrom(protoSpace) # act.streetNode.CopyFrom(protoStreet) # # actions.append(act) # if type == "nav": # act = self.CreateNavPathNodesAction(nodes, autoDirect) # if act is None: # print("Nav cmd is None") # return False # else: # actions.append(act) # # return actions def SetSequence(self, s_type: int): self.current_sequence_ = s_type def ResetSequence(self): self.current_sequence_ = 0