123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875 |
- import copy
- import enum
- import time
- from numpy import fabs
- import message_pb2 as message
- import grpc
- import message_pb2_grpc as msg_rpc
- from ManualOperationWidget import ManualOperationType
- from custom_define import RobotName, ActType
- from mqtt_async import MqttAsync
- import google.protobuf.json_format as jtf
- import json
- from dijkstra.Map import SpaceNode, StreetNode, MapManager, DijikstraMap
- import uuid
- import random
- import math
- import Count
- from sigmoid import sigmoid
- class TimeStatu:
- def __init__(self, statu=None, timeout=3):
- self.statu = statu
- self.time = time.time()
- self.timeout_s = timeout
- def timeout(self):
- tm = time.time()
- return tm - self.time > self.timeout_s or self.statu == None
- class Robot(MqttAsync):
- def __init__(self, rpc_ipport, name=""):
- MqttAsync.__init__(self)
- self.test_speed = 0
- self.timedRobotStatu_ = TimeStatu(message.RobotStatu, 0)
- self.timedNavStatu_ = TimeStatu(message.NavStatu, 0)
- self.dataTopic_ = {}
- self.rpc_ipport_ = rpc_ipport
- self.messageArrivedSignal = None
- self.currentNavData_ = None
- self.navCmdTopic_ = None
- if (name == RobotName.strAGVMain):
- self.speedCmdTopic = "monitor_child/speedcmd"
- else:
- self.speedCmdTopic = "monitor_child/speedcmd"
- self.currentNavPathNodes_ = None
- self.currentNavPath_ = None
- # 当前正在执行的流程默认0,存车1, 取车2
- self.current_sequence_: int = 0
- self.pathColor_ = [1, 1, 0]
- self.robotColor_ = [0.7, 0.2, 0.3] # agv当前颜色
- self.name_ = name
- self.begId_ = "------"
- self.targetId_ = "------"
- self.autoTest_ = False
- self.l_ = 0.8 # 轮长
- self.L_ = 2.9 # 轴距
- self.W_ = 2.5 # 宽
- self.heat_ = 0 # 心跳
- self.manual_status = ManualOperationType.eReady # 正在进行的手动操作
- self.manual_speed = [0.0, 0]
- def Color(self):
- if self.IsMainModeStatu():
- return [0, 0, 0]
- return self.robotColor_
- def IsMainAgv(self):
- if self.timedNavStatu_.timeout() == False:
- if self.timedNavStatu_.statu.main_agv == True:
- return True
- return False
- def IsMainModeStatu(self):
- if self.IsMainAgv():
- if self.timedNavStatu_.timeout() == False:
- if self.timedNavStatu_.statu.move_mode == 1:
- return True
- return False
- def SetSubDataTopic(self, match: dict, messageSignal=None):
- self.dataTopic_ = match
- self.messageArrivedSignal = messageSignal
- for item in match.items():
- [topic, _] = item
- self.subscribe(topic, self.on_message)
- def SetCmdTopic(self, topic):
- self.navCmdTopic_ = topic
- def SetColor(self, pathColor, robotColor):
- self.pathColor_ = pathColor
- self.robotColor_ = robotColor
- def ResetPose(self, agv: message.RobotStatu):
- self.timedRobotStatu_ = TimeStatu(agv, 2)
- def ResetNavStatu(self, statu: message.NavStatu):
- self.timedNavStatu_ = TimeStatu(statu, 2)
- def on_message(self, client, userdata, msg):
- topic = msg.topic
- if self.dataTopic_.get(topic) is not None:
- dtype = self.dataTopic_[topic]
- if dtype == message.RobotStatu.__name__:
- proto = message.RobotStatu()
- jtf.Parse(msg.payload.decode(), proto)
- self.ResetPose(proto)
- if dtype == message.NavStatu.__name__:
- self.last_running = self.IsNavigating()
- proto = message.NavStatu()
- jtf.Parse(msg.payload.decode(), proto)
- self.ResetNavStatu(proto)
- if self.last_running == True and self.IsNavigating() == False:
- self.currentNavPathNodes_ = None
- self.currentNavPath_ = None
- if self.messageArrivedSignal is not None:
- self.messageArrivedSignal()
- def MpcSelectTraj(self):
- traj = []
- if self.timedNavStatu_.timeout() == False:
- nav = self.timedNavStatu_.statu
- for pose2d in nav.selected_traj.poses:
- traj.append([pose2d.x, pose2d.y, pose2d.theta])
- return traj
- def MpcPredictTraj(self):
- traj = []
- if self.timedNavStatu_.timeout() == False:
- nav = self.timedNavStatu_.statu
- for pose2d in nav.predict_traj.poses:
- traj.append([pose2d.x, pose2d.y, pose2d.theta])
- return traj
- def Connected(self):
- return self.timedNavStatu_.timeout() == False and self.timedRobotStatu_.timeout() == False
- def IsNavigating(self):
- return not self.ActionType() == ActType.eReady
- '''if self.timedNavStatu_.timeout() == False:
- key = self.timedNavStatu_.statu.key
- if key == "" or key == None:
- return False
- return True'''
- def GeneratePath(self, begID, endID, mapName):
- self.currentNavPathNodes_ = MapManager().GetShortestPath(mapName, begID, endID)
- # if task_type == "DoubleAGV":
- # print("AdjustPath------------------")
- # if self.IsMainModeStatu() is False:
- # print("原始路径:")
- # for node in self.currentNavPathNodes_:
- # print(node.id_, node.y_)
- # self.currentNavPathNodes_ = self.AdjustPath(self.currentNavPathNodes_)
- # print("修改路径:")
- # for node in self.currentNavPathNodes_:
- # print(node.id_, node.y_)
- self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5)
- if self.currentNavPathNodes_ is not None and self.currentNavPath_ is not None:
- self.begId_ = begID
- self.targetId_ = endID
- return True
- return False
- def PositionId(self):
- if self.timedRobotStatu_.timeout():
- return None
- x = self.timedRobotStatu_.statu.x
- y = self.timedRobotStatu_.statu.y
- djks_map = MapManager()
- [label, pt] = djks_map.findNeastNode([x, y])
- return [label, pt]
- '''
- 阻塞直到导航完成
- '''
- def Navigatting(self, begId, targetId, autoDirect, wheelbase=0, task_type: int = 0):
- if wheelbase < 2.4 or wheelbase > 4:
- print("wheelbase is out of range!\n")
- return False
- print("nav")
- self.SetSequence(task_type)
- # self.ExecPathNodes(autoDirect)
- if self.ExecNavtask(autoDirect, wheelbase) == False:
- self.ResetSequence()
- return False
- # self.ResetSequence()
- if self.IsMainModeStatu():
- Count.TestCount().loadCountAdd()
- else:
- Count.TestCount().singleCountAdd()
- return True
- def AutoTestNavClamp(self, begId, targetId):
- beg = begId
- target = targetId
- while self.autoTest_:
- if self.TestNavClampOnce(beg, target) == False:
- print(" quit auto Test")
- break
- print("1111")
- posInfo = self.PositionId()
- if posInfo is not None:
- [label, pt] = posInfo
- beg = label
- print("beg", beg)
- node = MapManager().GetVertex(beg)
- if node is not None:
- if isinstance(node, (SpaceNode)):
- target = "O"
- if isinstance(node, (StreetNode)):
- end_nodes = ["E", "B", "C", "D"]
- id = random.randint(0, 1000) % 4
- target = end_nodes[id]
- print("3333", target)
- print(" Next nav clamp cmd,%s to %s" % (beg, target))
- def ActionType(self):
- if self.timedNavStatu_.timeout() == False:
- runningStatu = self.timedNavStatu_.statu
- if runningStatu.statu == 0:
- return ActType.eReady
- if runningStatu.statu == 1:
- return ActType.eRotation
- if runningStatu.statu == 2:
- return ActType.eHorizon
- if runningStatu.statu == 3:
- return ActType.eVertical
- if runningStatu.statu == 4:
- return ActType.eMPC
- if runningStatu.statu == 5:
- return ActType.eClampClose
- if runningStatu.statu == 6:
- return ActType.eClampOpen
- if runningStatu.statu == 7:
- return ActType.eLifterRise
- if runningStatu.statu == 8:
- return ActType.eLifterDown
- else:
- return ActType.eReady
- @staticmethod
- def generateProtoNode(node):
- protoNode = message.PathNode()
- protoNode.x = node.x_
- protoNode.y = node.y_
- protoNode.id = node.id_
- return protoNode
- def ExecNavtask(self, autoDirect, wheelbase=0):
- cmd = message.NavCmd()
- key = str(uuid.uuid4())
- cmd.key = key
- cmd.action = 0
- cmd.sequence = self.current_sequence_
- if len(self.currentNavPathNodes_)==1:
- self.currentNavPathNodes_.append(self.currentNavPathNodes_[0])
- actions = self.SplitPath(self.currentNavPathNodes_)
- for action in actions:
- [type, nodes] = action
- if type == "input":
- [street, space] = nodes
- protoSpace = self.generateProtoNode(space)
- protoStreet = self.generateProtoNode(street)
- act = message.NewAction()
- act.type = 1
- act.wheelbase = wheelbase
- act.spaceNode.CopyFrom(protoSpace)
- act.streetNode.CopyFrom(protoStreet)
- cmd.newActions.add().CopyFrom(act)
- if type == "output":
- [space, street] = nodes
- protoSpace = self.generateProtoNode(space)
- protoStreet = self.generateProtoNode(street)
- act = message.NewAction()
- act.type = 2
- act.wheelbase = wheelbase
- act.spaceNode.CopyFrom(protoSpace)
- act.streetNode.CopyFrom(protoStreet)
- cmd.newActions.add().CopyFrom(act)
- if type == "nav":
- action = self.CreateNavPathNodesAction(nodes, autoDirect)
- if action is None:
- print("Nav cmd is None")
- return False
- else:
- cmd.newActions.add().CopyFrom(action)
- print(cmd)
- if cmd is None:
- print("Nav cmd is None")
- return False
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Start(cmd)
- print("client received: ", response)
- if not response.ret == 0:
- print("nav failed :%s" % (response.info))
- return False
- else:
- print("nav completed !!!")
- return True
- def CreateNavPathNodesAction(self, path, autoDirect):
- # if path is not None and self.timedRobotStatu_.timeout() == False:
- if 1:
- for node in path:
- if isinstance(node, (SpaceNode)):
- print(" nodes must all street node")
- return None
- statu = self.timedRobotStatu_.statu
- # if statu is not None:
- if 1:
- newAction = message.NewAction()
- size = len(path)
- for i in range(size):
- node = path[i]
- pathNode = message.PathNode()
- pathNode.x = node.x_
- pathNode.y = node.y_
- pathNode.id = node.id_
- newAction.type = 4
- if autoDirect:
- newAction.type = 3
- newAction.pathNodes.add().CopyFrom(pathNode)
- return newAction
- else:
- print("statu is None")
- else:
- print("current path is none")
- return None
- def ExecPathNodes(self, autoDirect):
- if self.navCmdTopic_ == None:
- print("Robot has not set nav cmd topic")
- return False
- if not self.ActionType() == ActType.eReady:
- print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key))
- return False
- cmd = message.NavCmd()
- key = str(uuid.uuid4())
- cmd.key = key
- cmd.action = 0
- action = self.CreateNavPathNodesAction(self.currentNavPathNodes_, autoDirect)
- if action is None:
- print("Nav cmd is None")
- return False
- cmd.newActions.add().CopyFrom(action)
- print(cmd)
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Start(cmd)
- return True
- '''
- 根据AGV单车的起始位置调整路径
- '''
- def AdjustPath(self, path):
- adjusted_path = copy.deepcopy(path)
- y_offset = 1.2
- dy = 0
- # if self.IsMainAgv() or self.name_ == "AgvMain":
- if self.name_ == RobotName.strAGVMain:
- dy = 1
- # if not self.IsMainAgv() or self == "AGV2":
- if self.name_ == RobotName.strAGV2:
- dy = -1
- print("dy:", dy)
- for node in adjusted_path:
- if isinstance(node, StreetNode):
- if dy > 0:
- node.y_ += y_offset
- elif dy < 0:
- node.y_ -= y_offset
- else:
- pass
- return adjusted_path
- '''
- 拆分路径,生成动作集,input:入库动作,nav:导航动作,output:出库动作
- '''
- def SplitPath(self, path):
- space_count = 0
- for i in range(len(path)):
- if isinstance(path[i], (SpaceNode)):
- space_count += 1
- if i == 0 or i == len(path) - 1:
- pass
- else:
- print("Error: space node is not first/last of path")
- return None
- actions = []
- lastNode = None
- navIndex = -1
- for node in path:
- if lastNode == None:
- lastNode = node
- continue
- if isinstance(node, (SpaceNode)): # 当前点是车位点
- if isinstance(lastNode, (SpaceNode)): # 上一点为车位点
- print("Error: last node and current node is space node ")
- lastNode = None
- continue
- elif isinstance(lastNode, (StreetNode)): # 上一点为马路点,则动作为入库
- actions.append(["input", [lastNode, node]])
- lastNode = None
- continue
- if isinstance(node, (StreetNode)):
- if isinstance(lastNode, (SpaceNode)): # 上一点为车位点, 出库
- actions.append(["output", [lastNode, node]])
- elif isinstance(lastNode, (StreetNode)):
- if navIndex < 0:
- actions.append(["nav", [lastNode]])
- navIndex = len(actions) - 1
- actions[navIndex][1].append(node)
- lastNode = node
- return actions
- def SwitchMoveMode(self, mode, wheelbase):
- if self.IsMainAgv() == False:
- print(" this agv is single can not switch mode")
- return False
- cmd = message.NavCmd()
- key = str(uuid.uuid4())
- cmd.key = key
- cmd.action = 0
- action = message.NewAction()
- action.type = 8
- action.changedMode = mode
- if mode == 1:
- action.wheelbase = wheelbase
- cmd.newActions.add().CopyFrom(action)
- print(cmd)
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Start(cmd)
- return True
- def IsClampClosed(self):
- if self.timedRobotStatu_.timeout() == False:
- if self.IsMainModeStatu():
- clamp = self.timedRobotStatu_.statu.agvStatu.clamp
- clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other
- return (clamp == 1 and clamp_other == 1)
- else:
- return self.timedRobotStatu_.statu.agvStatu.clamp == 1
- return False
- def IsClampRunning(self):
- if self.timedRobotStatu_.timeout() == False:
- if self.IsMainModeStatu():
- clamp = self.timedRobotStatu_.statu.agvStatu.clamp
- clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other
- return (clamp == 0 or clamp_other == 0)
- else:
- return self.timedRobotStatu_.statu.agvStatu.clamp == 0
- return False
- def IsClampOpened(self):
- if self.timedRobotStatu_.timeout() == False:
- if self.IsMainModeStatu():
- clamp = self.timedRobotStatu_.statu.agvStatu.clamp
- clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other
- return (clamp == 2 and clamp_other == 2)
- else:
- return self.timedRobotStatu_.statu.agvStatu.clamp == 2
- return False
- def ClampClose(self):
- if self.IsClampClosed() == True:
- print("clamp closed")
- return True
- cmd = message.NavCmd()
- cmd.action = 0
- key = str(uuid.uuid4())
- cmd.key = (key)
- act = message.NewAction()
- act.type = 6
- cmd.newActions.add().CopyFrom(act)
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Start(cmd)
- return True
- def ClampOpen(self):
- if self.IsClampOpened() == True:
- print("clamp opended")
- return True
- cmd = message.NavCmd()
- cmd.action = 0
- key = str(uuid.uuid4())
- cmd.key = (key)
- act = message.NewAction()
- act.type = 7
- cmd.newActions.add().CopyFrom(act)
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Start(cmd)
- return True
- def IsLiferDowned(self):
- if self.timedRobotStatu_.timeout() == False:
- if self.IsMainModeStatu():
- lifter = self.timedRobotStatu_.statu.agvStatu.lifter
- lifter_other = self.timedRobotStatu_.statu.agvStatu.lifter_other
- return (lifter == 2 and lifter_other == 2)
- else:
- return self.timedRobotStatu_.statu.agvStatu.lifter == 2
- return False
- def IsLiferRunning(self):
- if self.timedRobotStatu_.timeout() == False:
- if self.IsMainModeStatu():
- lifter = self.timedRobotStatu_.statu.agvStatu.lifter
- lifter_other = self.timedRobotStatu_.statu.agvStatu.lifter_other
- return (lifter == 0 and lifter_other == 0)
- else:
- return self.timedRobotStatu_.statu.agvStatu.lifter == 0
- return False
- def IsLiferRose(self):
- if self.timedRobotStatu_.timeout() == False:
- if self.IsMainModeStatu():
- lifter = self.timedRobotStatu_.statu.agvStatu.lifter
- lifter_other = self.timedRobotStatu_.statu.agvStatu.lifter_other
- return (lifter == 1 and lifter_other == 1)
- else:
- return self.timedRobotStatu_.statu.agvStatu.lifter == 1
- return False
- def LiferRise(self):
- if self.IsLiferRose() == True:
- print("lifter has rose")
- return True
- cmd = message.NavCmd()
- cmd.action = 0
- key = str(uuid.uuid4())
- cmd.key = (key)
- act = message.NewAction()
- act.type = 9
- cmd.newActions.add().CopyFrom(act)
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Start(cmd)
- return True
- def LiferDown(self):
- if self.IsLiferDowned() == True:
- print("lifter has downed")
- return True
- cmd = message.NavCmd()
- cmd.action = 0
- key = str(uuid.uuid4())
- cmd.key = (key)
- act = message.NewAction()
- act.type = 10
- cmd.newActions.add().CopyFrom(act)
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Start(cmd)
- return True
- def CancelNavTask(self):
- cmd = message.NavCmd()
- cmd.action = 3
- channel = grpc.insecure_channel(self.rpc_ipport_)
- stub = msg_rpc.NavExcutorStub(channel)
- response = stub.Cancel(cmd)
- print("client received: ", response)
- print(" Cancel task completed!!!")
- return True
- def IsManualTaskRunning(self):
- return self.manual_status != ManualOperationType.eReady
- def ManualTask(self, current_operation, step_acc=0, speed=0):
- if self.IsNavigating():
- print("AGV is navigating!!!")
- # return False
- # slowly stop
- if current_operation == ManualOperationType.eReady and \
- ManualOperationType.eReady < self.manual_status <= ManualOperationType.eRightMove:
- print("Manual_Unit_SlowlyStop")
- self.Manual_Unit_SlowlyStop(current_operation, speed)
- return True
- # if self.IsManualTaskRunning():
- # return print("ManualTask | ManualTask is running...")
- self.manual_status = current_operation
- if ManualOperationType.eReady < current_operation <= ManualOperationType.eRightMove:
- cmd = message.ToAgvCmd()
- self.heat_ += 1
- self.heat_ %= 255
- cmd.H1 = self.heat_
- if self.IsMainModeStatu():
- cmd.M1 = 1
- else:
- cmd.M1 = 0
- if current_operation == ManualOperationType.eCounterclockwiseRotate:
- cmd.T1 = 1
- cmd.V1 = 0
- cmd.W1 = speed / 180 * math.pi * sigmoid(step_acc, 0, 1)
- if current_operation == ManualOperationType.eClockwiseRotate:
- cmd.T1 = 1
- cmd.V1 = 0
- cmd.W1 = -speed / 180 * math.pi * sigmoid(step_acc, 0, 1)
- if current_operation == ManualOperationType.eForwardMove:
- cmd.T1 = 3
- cmd.V1 = speed * sigmoid(step_acc, 0, 1)
- cmd.W1 = 0
- if current_operation == ManualOperationType.eBackwardMove:
- cmd.T1 = 3
- cmd.V1 = -speed * sigmoid(step_acc, 0, 1)
- cmd.W1 = 0
- if current_operation == ManualOperationType.eLeftMove:
- cmd.T1 = 2
- cmd.V1 = speed * sigmoid(step_acc, 0, 1)
- cmd.W1 = 0
- if current_operation == ManualOperationType.eRightMove:
- cmd.T1 = 2
- cmd.V1 = -speed * sigmoid(step_acc, 0, 1)
- cmd.W1 = 0
- cmd.V2 = cmd.V1
- cmd.V3 = cmd.V1
- cmd.W2 = cmd.W1
- cmd.W3 = cmd.W1
- cmd.L1 = self.L_
- cmd.P1 = 0
- cmd.D1 = 0.0
- cmd.end = 1
- self.manual_speed = [cmd.V1, cmd.W1]
- # 处理为json格式
- cmd_dict = {}
- for field in cmd.DESCRIPTOR.fields:
- cmd_dict[field.name] = field.default_value
- for field, value in cmd.ListFields():
- cmd_dict[field.name] = value
- cmd_json = json.dumps(cmd_dict, indent=None, separators=(',', ':'))
- self.publish(self.speedCmdTopic, cmd_json)
- print(cmd_json)
- # channel = grpc.insecure_channel(self.rpc_ipport_)
- # stub = msg_rpc.NavExcutorStub(channel)
- # response = stub.ManualOperation(cmd_0)
- # print("client received: ", response)
- # self.manual_status = ManualOperationType.eReady
- # print("ManualOperation {op_type} task completed!!!".format(op_type=hex(current_operation)))
- time.sleep(0.05)
- return True
- elif ManualOperationType.eClampClose <= current_operation <= ManualOperationType.eClampOpen:
- self.Manual_Unit_Clamp(current_operation)
- return True
- elif ManualOperationType.eLifterRise <= current_operation <= ManualOperationType.eLifterDown:
- self.Manual_Unit_Lifter(current_operation)
- return True
- return False
- def Manual_Unit_Lifter(self, current_operation: ManualOperationType):
- print(current_operation)
- cmd = message.ToAgvCmd()
- self.heat_ += 1
- self.heat_ %= 255
- cmd.H1 = self.heat_
- if self.IsMainModeStatu():
- cmd.M1 = 1
- else:
- cmd.M1 = 0
- cmd.T1 = self.ManualOperationType2AgvCmdType(current_operation)
- # 处理为json格式 # jtf.MessageToJson(cmd)
- cmd_dict = {}
- for field in cmd.DESCRIPTOR.fields:
- cmd_dict[field.name] = field.default_value
- for field, value in cmd.ListFields():
- cmd_dict[field.name] = value
- cmd_json = json.dumps(cmd_dict, indent=None, separators=(',', ':'))
- self.publish(self.speedCmdTopic, cmd_json)
- print(cmd_json)
- # self.manual_status = ManualOperationType.eReady
- return True
- def Manual_Unit_Clamp(self, current_operation: ManualOperationType):
- print(current_operation)
- cmd = message.ToAgvCmd()
- self.heat_ += 1
- self.heat_ %= 255
- cmd.H1 = self.heat_
- if self.IsMainModeStatu():
- cmd.M1 = 1
- else:
- cmd.M1 = 0
- cmd.T1 = self.ManualOperationType2AgvCmdType(current_operation)
- # 处理为json格式 # jtf.MessageToJson(cmd)
- cmd_dict = {}
- for field in cmd.DESCRIPTOR.fields:
- cmd_dict[field.name] = field.default_value
- for field, value in cmd.ListFields():
- cmd_dict[field.name] = value
- cmd_json = json.dumps(cmd_dict, indent=None, separators=(',', ':'))
- self.publish(self.speedCmdTopic, cmd_json)
- print(cmd_json)
- # self.manual_status = ManualOperationType.eReady
- return True
- def ManualOperationType2AgvCmdType(self, mo_type: ManualOperationType):
- if mo_type == ManualOperationType.eLifterRise:
- return ActType.eLifterRise
- if mo_type == ManualOperationType.eLifterDown:
- return ActType.eLifterDown
- if mo_type == ManualOperationType.eClampClose:
- return ActType.eClampClose
- if mo_type == ManualOperationType.eClampOpen:
- return ActType.eClampOpen
- def Manual_Unit_SlowlyStop(self, current_operation: ManualOperationType, speed):
- step_sum = 10
- step_acc = 1
- speed_v = max(fabs(self.manual_speed[0]), 0.01)
- speed_w = max(fabs(self.manual_speed[1]), 1)
- while step_acc <= step_sum:
- if ManualOperationType.eReady < self.manual_status <= ManualOperationType.eRightMove:
- cmd = message.ToAgvCmd()
- self.heat_ += 1
- self.heat_ %= 255
- cmd.H1 = self.heat_
- if self.IsMainModeStatu():
- cmd.M1 = 1
- else:
- cmd.M1 = 0
- if self.manual_status == ManualOperationType.eCounterclockwiseRotate:
- cmd.T1 = 1
- cmd.V1 = 0
- cmd.W1 = speed_w / 180 * math.pi * (step_sum - step_acc) / step_sum
- if self.manual_status == ManualOperationType.eClockwiseRotate:
- cmd.T1 = 1
- cmd.V1 = 0
- cmd.W1 = -speed_w / 180 * math.pi * (step_sum - step_acc) / step_sum
- if self.manual_status == ManualOperationType.eForwardMove:
- cmd.T1 = 3
- cmd.V1 = speed_v * (step_sum - step_acc) / step_sum
- cmd.W1 = 0
- if self.manual_status == ManualOperationType.eBackwardMove:
- cmd.T1 = 3
- cmd.V1 = -speed_v * (step_sum - step_acc) / step_sum
- cmd.W1 = 0
- if self.manual_status == ManualOperationType.eLeftMove:
- cmd.T1 = 2
- cmd.V1 = speed_v * (step_sum - step_acc) / step_sum
- cmd.W1 = 0
- if self.manual_status == ManualOperationType.eRightMove:
- cmd.T1 = 2
- cmd.V1 = -speed_v * (step_sum - step_acc) / step_sum
- cmd.W1 = 0
- cmd.V2 = cmd.V1
- cmd.V3 = cmd.V1
- cmd.W2 = cmd.W1
- cmd.W3 = cmd.W1
- cmd.L1 = self.L_
- cmd.end = 1
- # 处理为json格式
- cmd_dict = {}
- for field in cmd.DESCRIPTOR.fields:
- cmd_dict[field.name] = field.default_value
- for field, value in cmd.ListFields():
- cmd_dict[field.name] = value
- cmd_json = json.dumps(cmd_dict, indent=None, separators=(',', ':'))
- self.publish(self.speedCmdTopic, cmd_json)
- print(cmd_json)
- step_acc += 1
- time.sleep(0.1)
- self.manual_status = ManualOperationType.eReady
- self.manual_speed = [0, 0]
- print("Manual_Unit_SlowlyStop completed.")
- return True
- # def GenerateActionMPC(self, beg_name, end_name, autoDirect, wheelbase=0, task_type="None"):
- # self.GeneratePath(beg_name, end_name)
- # actions = list()
- #
- # split_path = self.SplitPath(self.currentNavPathNodes_)
- # for action in split_path:
- # [type, nodes] = action
- # if type == "input":
- # [street, space] = nodes
- # protoSpace = self.generateProtoNode(space)
- # protoStreet = self.generateProtoNode(street)
- # act = message.NewAction()
- # act.type = 1
- # act.wheelbase = wheelbase
- # act.spaceNode.CopyFrom(protoSpace)
- # act.streetNode.CopyFrom(protoStreet)
- #
- # actions.append(act)
- # if type == "output":
- # [space, street] = nodes
- # protoSpace = self.generateProtoNode(space)
- # protoStreet = self.generateProtoNode(street)
- # act = message.NewAction()
- # act.type = 2
- # act.wheelbase = wheelbase
- # act.spaceNode.CopyFrom(protoSpace)
- # act.streetNode.CopyFrom(protoStreet)
- #
- # actions.append(act)
- # if type == "nav":
- # act = self.CreateNavPathNodesAction(nodes, autoDirect)
- # if act is None:
- # print("Nav cmd is None")
- # return False
- # else:
- # actions.append(act)
- #
- # return actions
- def SetSequence(self, s_type: int):
- self.current_sequence_ = s_type
- def ResetSequence(self):
- self.current_sequence_ = 0
|