  1. """This is the script executed by workers of the quality control pipline."""
  2. import argparse
  3. import datetime
  4. from os.path import basename
  5. from pprint import pprint
  6. import re
  7. import subprocess
  8. from google.cloud import bigquery
  9. from google.cloud import datastore
  10. class Pattern(object):
  11. """Defines a pattern for regular expression matching."""
  12. def __init__(self, pattern):
  13. self.regex = re.compile(pattern, re.MULTILINE)
  14. def extract(self, text):
  15. """Returns a dictionary of named capture groups to extracted output.
  16. Args:
  17. text: input to parse
  18. Returns an empty dict if no match was found.
  19. """
  20. match = self.regex.search(text)
  21. if match is None:
  22. return {}
  23. return match.groupdict()
  24. def extract_last_occurence(self, text):
  25. """Returns tuple of extracted outputs.
  26. Args:
  27. text: input to parse
  28. Returns the information extracted from the last match. Returns
  29. None if no match was found.
  30. """
  31. matches = self.regex.findall(text)
  32. if not matches:
  33. return None
  34. return matches[-1]
# BigQuery table schema
# NOTE: the field order below must match the tuple order built in
# publish_stats_to_big_query, which inserts rows positionally.
SCHEMA = [
    # Run metadata.
    bigquery.SchemaField('date', 'DATE'),
    bigquery.SchemaField('commit_sha1', 'STRING'),
    bigquery.SchemaField('job_id', 'INTEGER'),
    bigquery.SchemaField('rosbag', 'STRING'),
    # Resource usage, scraped from '/usr/bin/time -v' output.
    bigquery.SchemaField('user_time_secs', 'FLOAT'),
    bigquery.SchemaField('system_time_secs', 'FLOAT'),
    bigquery.SchemaField('wall_time_secs', 'FLOAT'),
    bigquery.SchemaField('max_set_size_kbytes', 'INTEGER'),
    # Constraint score histogram statistics from the SLAM run.
    bigquery.SchemaField('constraints_count', 'INTEGER'),
    bigquery.SchemaField('constraints_score_minimum', 'FLOAT'),
    bigquery.SchemaField('constraints_score_maximum', 'FLOAT'),
    bigquery.SchemaField('constraints_score_mean', 'FLOAT'),
    # Ground truth comparison metrics (translational/rotational errors
    # and their deviations).
    bigquery.SchemaField('ground_truth_abs_trans_err', 'FLOAT'),
    bigquery.SchemaField('ground_truth_abs_trans_err_dev', 'FLOAT'),
    bigquery.SchemaField('ground_truth_sqr_trans_err', 'FLOAT'),
    bigquery.SchemaField('ground_truth_sqr_trans_err_dev', 'FLOAT'),
    bigquery.SchemaField('ground_truth_abs_rot_err', 'FLOAT'),
    bigquery.SchemaField('ground_truth_abs_rot_err_dev', 'FLOAT'),
    bigquery.SchemaField('ground_truth_sqr_rot_err', 'FLOAT'),
    bigquery.SchemaField('ground_truth_sqr_rot_err_dev', 'FLOAT')
]
  58. # Pattern matchers for the various fields of the '/usr/bin/time -v' output
  59. USER_TIME_PATTERN = Pattern(
  60. r'^\s*User time \(seconds\): (?P<user_time>\d+.\d+|\d+)')
  61. SYSTEM_TIME_PATTERN = Pattern(
  62. r'^\s*System time \(seconds\): (?P<system_time>\d+.\d+|\d+)')
  63. WALL_TIME_PATTERN = Pattern(
  64. r'^\s*Elapsed \(wall clock\) time \(h:mm:ss or m:ss\): '
  65. r'((?P<hours>\d{1,2}):|)(?P<minutes>\d{1,2}):(?P<seconds>\d{2}\.\d{2})')
  66. MAX_RES_SET_SIZE_PATTERN = Pattern(
  67. r'^\s*Maximum resident set size \(kbytes\): (?P<max_set_size>\d+)')
  68. CONSTRAINT_STATS_PATTERN = Pattern(
  69. r'Score histogram:[\n\r]+'
  70. r'Count:\s+(?P<constraints_count>\d+)\s+'
  71. r'Min:\s+(?P<constraints_score_min>\d+\.\d+)\s+'
  72. r'Max:\s+(?P<constraints_score_max>\d+\.\d+)\s+'
  73. r'Mean:\s+(?P<constraints_score_mean>\d+\.\d+)')
  74. GROUND_TRUTH_STATS_PATTERN = Pattern(
  75. r'Result:[\n\r]+'
  76. r'Abs translational error (?P<abs_trans_err>\d+\.\d+) '
  77. r'\+/- (?P<abs_trans_err_dev>\d+\.\d+) m[\n\r]+'
  78. r'Sqr translational error (?P<sqr_trans_err>\d+\.\d+) '
  79. r'\+/- (?P<sqr_trans_err_dev>\d+\.\d+) m\^2[\n\r]+'
  80. r'Abs rotational error (?P<abs_rot_err>\d+\.\d+) '
  81. r'\+/- (?P<abs_rot_err_dev>\d+\.\d+) deg[\n\r]+'
  82. r'Sqr rotational error (?P<sqr_rot_err>\d+\.\d+) '
  83. r'\+/- (?P<sqr_rot_err_dev>\d+\.\d+) deg\^2')
  84. # Pattern matcher for extracting the HEAD commit SHA-1 hash.
  85. GIT_SHA1_PATTERN = Pattern(r'^(?P<sha1>[0-9a-f]{40})\s+HEAD')
  86. def get_head_git_sha1():
  87. """Returns the SHA-1 hash of the commit tagged HEAD."""
  88. output = subprocess.check_output([
  89. 'git', 'ls-remote',
  90. 'https://github.com/googlecartographer/cartographer.git'
  91. ])
  92. parsed = GIT_SHA1_PATTERN.extract(output)
  93. return parsed['sha1']
  94. def extract_stats(inp):
  95. """Returns a dictionary of stats."""
  96. result = {}
  97. parsed = USER_TIME_PATTERN.extract(inp)
  98. result['user_time_secs'] = float(parsed['user_time'])
  99. parsed = SYSTEM_TIME_PATTERN.extract(inp)
  100. result['system_time_secs'] = float(parsed['system_time'])
  101. parsed = WALL_TIME_PATTERN.extract(inp)
  102. result['wall_time_secs'] = float(parsed['hours'] or 0.) * 3600 + float(
  103. parsed['minutes']) * 60 + float(parsed['seconds'])
  104. parsed = MAX_RES_SET_SIZE_PATTERN.extract(inp)
  105. result['max_set_size_kbytes'] = int(parsed['max_set_size'])
  106. parsed = CONSTRAINT_STATS_PATTERN.extract_last_occurence(inp)
  107. print parsed
  108. result['constraints_count'] = int(parsed[0])
  109. result['constraints_score_min'] = float(parsed[1])
  110. result['constraints_score_max'] = float(parsed[2])
  111. result['constraints_score_mean'] = float(parsed[3])
  112. return result
  113. def extract_ground_truth_stats(input_text):
  114. """Returns a dictionary of ground truth stats."""
  115. result = {}
  116. parsed = GROUND_TRUTH_STATS_PATTERN.extract(input_text)
  117. for name in ('abs_trans_err', 'sqr_trans_err', 'abs_rot_err', 'sqr_rot_err'):
  118. result['ground_truth_{}'.format(name)] = float(parsed[name])
  119. result['ground_truth_{}_dev'.format(name)] = float(
  120. parsed['{}_dev'.format(name)])
  121. return result
  122. def retrieve_entity(datastore_client, kind, identifier):
  123. """Convenience function for Datastore entity retrieval."""
  124. key = datastore_client.key(kind, identifier)
  125. return datastore_client.get(key)
  126. def create_job_selector(worker_id, num_workers):
  127. """Constructs a round-robin job selector."""
  128. return lambda job_id: job_id % num_workers == worker_id
  129. def run_cmd(cmd):
  130. """Runs command both printing its stdout output and returning it as string."""
  131. print cmd
  132. p = subprocess.Popen(
  133. cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
  134. run_cmd.output = []
  135. def process(line):
  136. run_cmd.output.append(line)
  137. print line.rstrip()
  138. while p.poll() is None:
  139. process(p.stdout.readline())
  140. process(p.stdout.read())
  141. return '\n'.join(run_cmd.output)
  142. def run_ros_cmd(ros_distro, ros_cmd):
  143. """Runs command similar to run_cmd but sets ROS environment variables."""
  144. cmd = ('/bin/bash -c \"source /opt/ros/{}/setup.bash && source '
  145. '/opt/cartographer_ros/setup.bash && {}\"').format(
  146. ros_distro, ros_cmd)
  147. return run_cmd(cmd)
class Job(object):
  """Represents a single job to be executed.

  A job consists of a combination of rosbag and configuration and launch files.
  """

  def __init__(self, datastore_client, job_id):
    """Loads the job definition from its Datastore 'Job' entity.

    Args:
      datastore_client: Datastore client used to look up the entity.
      job_id: identifier of the 'Job' entity to load.
    """
    self.id = job_id
    entity = retrieve_entity(datastore_client, 'Job', job_id)
    self.launch_file = entity['launch_file']
    self.assets_writer_launch_file = entity['assets_writer_launch_file']
    self.assets_writer_config_file = entity['assets_writer_config_file']
    self.rosbag = entity['rosbag']
    self.ros_package = entity['ros_package']

  def __repr__(self):
    return 'Job: id : {} launch_file: {} rosbag: {}'.format(
        self.id, self.launch_file, self.rosbag)

  def run(self, ros_distro, run_id):
    """Runs the job with ROS distro 'ros_distro'.

    Runs SLAM on the rosbag, writes assets, uploads artifacts to GCS and
    computes ground-truth metrics.

    Args:
      ros_distro: name of the ROS distribution to source.
      run_id: identifier of this pipeline run; used in the artifact paths.

    Returns:
      Dict of metrics combining timing/constraint stats of the SLAM run
      with the ground-truth comparison stats.
    """
    print 'running job {}'.format(self.id)
    # Garbage collect any left-overs from previous runs.
    run_cmd('rm -rf /data/*')
    # Copy the rosbag to scratch space
    scratch_dir = '/data/{}'.format(self.id)
    rosbag_filename = basename(self.rosbag)
    run_cmd('mkdir {}'.format(scratch_dir))
    run_cmd('gsutil cp gs://{} {}/{}'.format(self.rosbag, scratch_dir,
                                             rosbag_filename))
    # Creates pbstream
    output = run_ros_cmd(ros_distro,
                         '/usr/bin/time -v roslaunch {} {} '
                         'bag_filenames:={}/{} no_rviz:=true'.format(
                             self.ros_package, self.launch_file, scratch_dir,
                             rosbag_filename))
    # Timing and constraint statistics are scraped from the SLAM run output.
    info = extract_stats(output)
    # Creates assets.
    run_ros_cmd(
        ros_distro, '/usr/bin/time -v roslaunch {} {} '
        'bag_filenames:={}/{} pose_graph_filename:='
        '{}/{}.pbstream config_file:={}'.format(
            self.ros_package, self.assets_writer_launch_file, scratch_dir,
            rosbag_filename, scratch_dir, rosbag_filename,
            self.assets_writer_config_file))
    # Copies assets to bucket.
    run_cmd('gsutil cp {}/{}.pbstream '
            'gs://cartographer-ci-artifacts/{}/{}/{}.pbstream'.format(
                scratch_dir, rosbag_filename, run_id, self.id,
                rosbag_filename))
    run_cmd('gsutil cp {}/{}_* gs://cartographer-ci-artifacts/{}/{}/'.format(
        scratch_dir, rosbag_filename, run_id, self.id))
    # Download ground truth relations file.
    run_cmd('gsutil cp gs://cartographer-ci-ground-truth/{}/relations.pb '
            '{}/relations.pb'.format(self.id, scratch_dir))
    # Calculate metrics.
    output = run_ros_cmd(ros_distro, 'cartographer_compute_relations_metrics '
                         '-relations_filename {}/relations.pb '
                         '-pose_graph_filename {}/{}.pbstream'.format(
                             scratch_dir, scratch_dir, rosbag_filename))
    # Add ground truth stats.
    info.update(extract_ground_truth_stats(output))
    info['rosbag'] = rosbag_filename
    return info
  207. class Worker(object):
  208. """Represents a single worker that executes a sequence of Jobs."""
  209. def __init__(self, datastore_client, pipeline_id, run_id):
  210. entity = retrieve_entity(datastore_client, 'PipelineConfig', pipeline_id)
  211. self.pipeline_id = pipeline_id
  212. self.jobs = [Job(datastore_client, job_id) for job_id in entity['jobs']]
  213. self.scratch_dir = entity['scratch_dir']
  214. self.ros_distro = entity['ros_distro']
  215. self.run_id = run_id
  216. def __repr__(self):
  217. result = 'Worker: pipeline_id: {}\n'.format(self.pipeline_id)
  218. for job in self.jobs:
  219. result += '{}\n'.format(str(job))
  220. return result
  221. def run_jobs(self, selector):
  222. outputs = {}
  223. for idx, job in enumerate(self.jobs):
  224. if selector(idx):
  225. output = job.run(self.ros_distro, self.run_id)
  226. outputs[job.id] = output
  227. else:
  228. print 'job {}: skip'.format(job.id)
  229. return outputs
  230. def publish_stats_to_big_query(stats_dict, now, head_sha1):
  231. """Publishes metrics to BigQuery."""
  232. bigquery_client = bigquery.Client()
  233. dataset = bigquery_client.dataset('Cartographer')
  234. table = dataset.table('metrics')
  235. rows_to_insert = []
  236. for job_identifier, job_info in stats_dict.iteritems():
  237. print job_info
  238. data = ('{}-{}-{}'.format(
  239. now.year, now.month,
  240. now.day), head_sha1, job_identifier, job_info['rosbag'],
  241. job_info['user_time_secs'], job_info['system_time_secs'],
  242. job_info['wall_time_secs'], job_info['max_set_size_kbytes'],
  243. job_info['constraints_count'], job_info['constraints_score_min'],
  244. job_info['constraints_score_max'],
  245. job_info['constraints_score_mean'],
  246. job_info['ground_truth_abs_trans_err'],
  247. job_info['ground_truth_abs_trans_err_dev'],
  248. job_info['ground_truth_sqr_trans_err'],
  249. job_info['ground_truth_sqr_trans_err_dev'],
  250. job_info['ground_truth_abs_rot_err'],
  251. job_info['ground_truth_abs_rot_err_dev'],
  252. job_info['ground_truth_sqr_rot_err'],
  253. job_info['ground_truth_sqr_rot_err_dev'])
  254. rows_to_insert.append(data)
  255. errors = bigquery_client.create_rows(
  256. table, rows_to_insert, selected_fields=SCHEMA)
  257. if not errors:
  258. print 'Pushed {} row(s) into Cartographer:metrics'.format(
  259. len(rows_to_insert))
  260. else:
  261. print 'Errors:'
  262. pprint(errors)
  263. def parse_arguments():
  264. """Parses the command line arguments."""
  265. parser = argparse.ArgumentParser(
  266. description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
  267. parser.add_argument('--worker_id', type=int)
  268. parser.add_argument('--num_workers', type=int)
  269. parser.add_argument('--pipeline_id', type=str)
  270. return parser.parse_args()
  271. def main():
  272. args = parse_arguments()
  273. ds_client = datastore.Client()
  274. job_selector = create_job_selector(int(args.worker_id), int(args.num_workers))
  275. head_sha1 = get_head_git_sha1()
  276. now = datetime.datetime.now()
  277. pipeline_run_id = '{}-{}-{}_{}'.format(now.year, now.month, now.day,
  278. head_sha1)
  279. worker = Worker(ds_client, args.pipeline_id, pipeline_run_id)
  280. stats_dict = worker.run_jobs(job_selector)
  281. publish_stats_to_big_query(stats_dict, now, head_sha1)
  282. if __name__ == '__main__':
  283. main()