123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- #!/usr/bin/env python
- """
- head_tracker.py - Version 1.1 2013-12-20
-
- Move the head to track a target published on the /roi topic.
-
- Created for the Pi Robot Project: http://www.pirobot.org
- Copyright (c) 2010 Patrick Goebel. All rights reserved.
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details at:
-
- http://www.gnu.org/licenses/gpl.html
- """
- import rospy
- from sensor_msgs.msg import JointState, RegionOfInterest, CameraInfo
- from dynamixel_controllers.srv import *
- from std_msgs.msg import Float64
- from math import radians
- import thread
- class HeadTracker():
- def __init__(self):
- rospy.init_node("head_tracker")
-
- rospy.on_shutdown(self.shutdown)
-
- rate = rospy.get_param("~rate", 20)
- r = rospy.Rate(rate)
- tick = 1.0 / rate
-
- # Keep the speed updates below about 5 Hz; otherwise the servos
- # can behave erratically.
- speed_update_rate = rospy.get_param("~speed_update_rate", 10)
- speed_update_interval = 1.0 / speed_update_rate
-
- # How big a change do we need in speed before we push an update
- # to the servos?
- self.speed_update_threshold = rospy.get_param("~speed_update_threshold", 0.01)
-
- # What are the names of the pan and tilt joints in the list of dynamixels?
- self.head_pan_joint = rospy.get_param('~head_pan_joint', 'head_pan_joint')
- self.head_tilt_joint = rospy.get_param('~head_tilt_joint', 'head_tilt_joint')
-
- self.joints = [self.head_pan_joint, self.head_tilt_joint]
-
- # Joint speeds are given in radians per second
- self.default_joint_speed = rospy.get_param('~default_joint_speed', 0.3)
- self.max_joint_speed = rospy.get_param('~max_joint_speed', 0.5)
- # How far ahead or behind the target (in radians) should we aim for?
- self.lead_target_angle = rospy.get_param('~lead_target_angle', 1.0)
-
- # The pan/tilt thresholds indicate what percentage of the image window
- # the ROI needs to be off-center before we make a movement
- self.pan_threshold = rospy.get_param("~pan_threshold", 0.05)
- self.tilt_threshold = rospy.get_param("~tilt_threshold", 0.05)
-
- # The gain_pan and gain_tilt parameter determine how responsive the
- # servo movements are. If these are set too high, oscillation can result.
- self.gain_pan = rospy.get_param("~gain_pan", 1.0)
- self.gain_tilt = rospy.get_param("~gain_tilt", 1.0)
-
- # Set limits on the pan and tilt angles
- self.max_pan = rospy.get_param("~max_pan", radians(145))
- self.min_pan = rospy.get_param("~min_pan", radians(-145))
- self.max_tilt = rospy.get_param("~max_tilt", radians(90))
- self.min_tilt = rospy.get_param("~min_tilt", radians(-90))
-
- # How long we are willing to wait (in seconds) for a target before re-centering the servos?
- self.recenter_timeout = rospy.get_param('~recenter_timeout', 5)
-
- # Monitor the joint states of the pan and tilt servos
- self.joint_state = JointState()
- rospy.Subscriber('joint_states', JointState, self.update_joint_state, queue_size=1)
-
- # Wait until we actually have joint state values
- while self.joint_state == JointState():
- rospy.sleep(1)
-
- # Initialize the servo services and publishers
- self.init_servos()
-
- # Center the pan and tilt servos at the start
- self.center_head_servos()
- # Set a flag to indicate when the target has been lost
- self.target_visible = False
-
- # Set a timer to determine how long a target is no longer visible
- target_lost_timer = 0.0
-
- # Set a timer to track when we do a speed update
- speed_update_timer = 0.0
-
- # Initialize the pan and tilt speeds to zero
- pan_speed = tilt_speed = 0.0
-
- # Get a lock for updating the self.move_cmd values
- self.lock = thread.allocate_lock()
-
- # Wait for messages on the three topics we need to monitor
- rospy.loginfo("Waiting for roi and camera_info topics.")
- rospy.wait_for_message('camera_info', CameraInfo)
- rospy.wait_for_message('joint_states', JointState)
- rospy.wait_for_message('roi', RegionOfInterest)
- # Subscribe to camera_info topics and set the callback
- self.image_width = self.image_height = 0
- rospy.Subscriber('camera_info', CameraInfo, self.get_camera_info, queue_size=1)
-
- # Wait until we actually have the camera data
- while self.image_width == 0 or self.image_height == 0:
- rospy.sleep(1)
-
- # Subscribe to roi topics and set the callback
- self.roi_subscriber = rospy.Subscriber('roi', RegionOfInterest, self.set_joint_cmd, queue_size=1)
-
- rospy.loginfo("Ready to track target.")
-
- while not rospy.is_shutdown():
- # Acquire the lock
- self.lock.acquire()
-
- try:
- # If we have lost the target, stop the servos
- if not self.target_visible:
- self.pan_speed = 0.0
- self.tilt_speed = 0.0
-
- # Keep track of how long the target is lost
- target_lost_timer += tick
- else:
- self.target_visible = False
- target_lost_timer = 0.0
-
- # If the target is lost long enough, re-center the servos
- if target_lost_timer > self.recenter_timeout:
- rospy.loginfo("Cannot find target.")
- self.center_head_servos()
- target_lost_timer = 0.0
- else:
- # Update the servo speeds at the appropriate interval
- if speed_update_timer > speed_update_interval:
- if abs(self.last_pan_speed - self.pan_speed) > self.speed_update_threshold:
- self.set_servo_speed(self.head_pan_joint, self.pan_speed)
- self.last_pan_speed = self.pan_speed
-
- if abs(self.last_tilt_speed - self.tilt_speed) > self.speed_update_threshold:
- self.set_servo_speed(self.head_tilt_joint, self.tilt_speed)
- self.last_tilt_speed = self.tilt_speed
-
- speed_update_timer = 0.0
-
- # Update the pan position
- if self.last_pan_position != self.pan_position:
- self.set_servo_position(self.head_pan_joint, self.pan_position)
- self.last_pan_position = self.pan_position
-
- # Update the tilt position
- if self.last_tilt_position != self.tilt_position:
- self.set_servo_position(self.head_tilt_joint, self.tilt_position)
- self.last_tilt_position = self.tilt_position
-
- speed_update_timer += tick
-
- finally:
- # Release the lock
- self.lock.release()
-
- r.sleep()
-
- def set_joint_cmd(self, msg):
- # Acquire the lock
- self.lock.acquire()
-
- try:
- # If we receive an ROI messages with 0 width or height, the target is not visible
- if msg.width == 0 or msg.height == 0:
- self.target_visible = False
- return
-
- # If the ROI stops updating this next statement will not happen
- self.target_visible = True
-
- # Compute the displacement of the ROI from the center of the image
- target_offset_x = msg.x_offset + msg.width / 2 - self.image_width / 2
- target_offset_y = msg.y_offset + msg.height / 2 - self.image_height / 2
-
- try:
- percent_offset_x = float(target_offset_x) / (float(self.image_width) / 2.0)
- percent_offset_y = float(target_offset_y) / (float(self.image_height) / 2.0)
- except:
- percent_offset_x = 0
- percent_offset_y = 0
-
- # Set the target position ahead or behind the current position
- current_pan = self.joint_state.position[self.joint_state.name.index(self.head_pan_joint)]
-
- # Pan the camera only if the x target offset exceeds the threshold
- if abs(percent_offset_x) > self.pan_threshold:
- # Set the pan speed proportional to the target offset
- self.pan_speed = min(self.max_joint_speed, max(0, self.gain_pan * abs(percent_offset_x)))
-
- if target_offset_x > 0:
- self.pan_position = max(self.min_pan, current_pan - self.lead_target_angle)
- else:
- self.pan_position = min(self.max_pan, current_pan + self.lead_target_angle)
- else:
- self.pan_speed = 0
- self.pan_position = current_pan
-
- # Set the target position ahead or behind the current position
- current_tilt = self.joint_state.position[self.joint_state.name.index(self.head_tilt_joint)]
-
- # Tilt the camera only if the y target offset exceeds the threshold
- if abs(percent_offset_y) > self.tilt_threshold:
- # Set the tilt speed proportional to the target offset
- self.tilt_speed = min(self.max_joint_speed, max(0, self.gain_tilt * abs(percent_offset_y)))
-
- if target_offset_y < 0:
- self.tilt_position = max(self.min_tilt, current_tilt - self.lead_target_angle)
- else:
- self.tilt_position = min(self.max_tilt, current_tilt + self.lead_target_angle)
- else:
- self.tilt_speed = 0
- self.tilt_position = current_tilt
- finally:
- # Release the lock
- self.lock.release()
-
-
- def center_head_servos(self):
- rospy.loginfo("Centering servos.")
- self.servo_speed[self.head_pan_joint](self.default_joint_speed)
- self.servo_speed[self.head_tilt_joint](self.default_joint_speed)
-
- current_tilt = self.joint_state.position[self.joint_state.name.index(self.head_tilt_joint)]
- current_pan = self.joint_state.position[self.joint_state.name.index(self.head_pan_joint)]
- while abs(current_tilt) > 0.05 or abs(current_pan) > 0.05:
- self.servo_position[self.head_pan_joint].publish(0)
- self.servo_position[self.head_tilt_joint].publish(0)
-
- rospy.sleep(0.1)
-
- current_tilt = self.joint_state.position[self.joint_state.name.index(self.head_tilt_joint)]
- current_pan = self.joint_state.position[self.joint_state.name.index(self.head_pan_joint)]
- self.servo_speed[self.head_pan_joint](0.0)
- self.servo_speed[self.head_tilt_joint](0.0)
-
- def init_servos(self):
- # Create dictionaries to hold the speed, position and torque controllers
- self.servo_speed = dict()
- self.servo_position = dict()
- self.torque_enable = dict()
- # Connect to the set_speed services and define a position publisher for each servo
- rospy.loginfo("Waiting for joint controllers services...")
-
- for joint in sorted(self.joints):
- # The set_speed services
- set_speed_service = '/' + joint + '/set_speed'
- rospy.wait_for_service(set_speed_service)
- self.servo_speed[joint] = rospy.ServiceProxy(set_speed_service, SetSpeed, persistent=True)
- # Initialize the servo speed to the default_joint_speed
- self.servo_speed[joint](self.default_joint_speed)
- # The position controllers
- self.servo_position[joint] = rospy.Publisher('/' + joint + '/command', Float64, queue_size=5)
-
- # A service to enable/disable servo torque
- torque_enable = '/' + joint + '/torque_enable'
- rospy.wait_for_service(torque_enable)
- self.torque_enable[joint] = rospy.ServiceProxy(torque_enable, TorqueEnable)
- self.torque_enable[joint](False)
-
- self.pan_position = 0
- self.tilt_position = 0
- self.pan_speed = 0
- self.tilt_speed = 0
-
- self.last_pan_position = 0
- self.last_tilt_position = 0
- self.last_tilt_speed = 0
- self.last_pan_speed = 0
-
- def set_servo_speed(self, servo, speed):
- # Guard against a speed of exactly zero which means
- # "move as fast as you can" to a Dynamixel servo.
- if speed == 0:
- speed = 0.01
- self.servo_speed[servo](speed)
-
- def set_servo_position(self, servo, position):
- self.servo_position[servo].publish(position)
-
- def update_joint_state(self, msg):
- self.joint_state = msg
-
- def get_camera_info(self, msg):
- self.image_width = msg.width
- self.image_height = msg.height
-
- def shutdown(self):
- rospy.loginfo("Shutting down head tracking node.")
-
- # Turn off updates from the /roi subscriber
- try:
- self.roi_subscriber.unregister()
- except:
- pass
-
- # Center the servos
- self.center_head_servos()
-
- rospy.sleep(2)
-
- # Relax all servos to give them a rest.
- rospy.loginfo("Relaxing pan and tilt servos.")
-
- for servo in self.joints:
- self.torque_enable[servo](False)
-
-
- if __name__ == '__main__':
- try:
- HeadTracker()
- rospy.spin()
- except rospy.ROSInterruptException:
- rospy.loginfo("Head tracking node terminated.")
|