- """This is the script executed by workers of the quality control pipline."""
- import argparse
- import datetime
- from os.path import basename
- from pprint import pprint
- import re
- import subprocess
- from google.cloud import bigquery
- from google.cloud import datastore


class Pattern(object):
  """Defines a pattern for regular expression matching."""

  def __init__(self, pattern):
    self.regex = re.compile(pattern, re.MULTILINE)

  def extract(self, text):
    """Returns a dictionary of named capture groups to extracted output.

    Args:
      text: input to parse

    Returns an empty dict if no match was found.
    """
    match = self.regex.search(text)
    if match is None:
      return {}
    return match.groupdict()

  def extract_last_occurence(self, text):
    """Returns a tuple of extracted outputs.

    Args:
      text: input to parse

    Returns the information extracted from the last match. Returns
    None if no match was found.
    """
    matches = self.regex.findall(text)
    if not matches:
      return None
    return matches[-1]
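
# Illustrative example (the pattern and input below are made up, not part of
# the pipeline): Pattern(r'(?P<count>\d+) items').extract('3 items') returns
# {'count': '3'}, while extract_last_occurence returns the capture groups of
# the last match (a tuple when the pattern defines several groups), or None
# if nothing matched.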

# BigQuery table schema.
SCHEMA = [
    bigquery.SchemaField('date', 'DATE'),
    bigquery.SchemaField('commit_sha1', 'STRING'),
    bigquery.SchemaField('job_id', 'INTEGER'),
    bigquery.SchemaField('rosbag', 'STRING'),
    bigquery.SchemaField('user_time_secs', 'FLOAT'),
    bigquery.SchemaField('system_time_secs', 'FLOAT'),
    bigquery.SchemaField('wall_time_secs', 'FLOAT'),
    bigquery.SchemaField('max_set_size_kbytes', 'INTEGER'),
    bigquery.SchemaField('constraints_count', 'INTEGER'),
    bigquery.SchemaField('constraints_score_minimum', 'FLOAT'),
    bigquery.SchemaField('constraints_score_maximum', 'FLOAT'),
    bigquery.SchemaField('constraints_score_mean', 'FLOAT'),
    bigquery.SchemaField('ground_truth_abs_trans_err', 'FLOAT'),
    bigquery.SchemaField('ground_truth_abs_trans_err_dev', 'FLOAT'),
    bigquery.SchemaField('ground_truth_sqr_trans_err', 'FLOAT'),
    bigquery.SchemaField('ground_truth_sqr_trans_err_dev', 'FLOAT'),
    bigquery.SchemaField('ground_truth_abs_rot_err', 'FLOAT'),
    bigquery.SchemaField('ground_truth_abs_rot_err_dev', 'FLOAT'),
    bigquery.SchemaField('ground_truth_sqr_rot_err', 'FLOAT'),
    bigquery.SchemaField('ground_truth_sqr_rot_err_dev', 'FLOAT')
]
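
# Note: publish_stats_to_big_query() below builds each row as a plain tuple,
# so the tuple's element order must match the field order of this schema.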

# Pattern matchers for the various fields of the '/usr/bin/time -v' output.
USER_TIME_PATTERN = Pattern(
    r'^\s*User time \(seconds\): (?P<user_time>\d+\.\d+|\d+)')
SYSTEM_TIME_PATTERN = Pattern(
    r'^\s*System time \(seconds\): (?P<system_time>\d+\.\d+|\d+)')
WALL_TIME_PATTERN = Pattern(
    r'^\s*Elapsed \(wall clock\) time \(h:mm:ss or m:ss\): '
    r'((?P<hours>\d{1,2}):|)(?P<minutes>\d{1,2}):(?P<seconds>\d{2}\.\d{2})')
MAX_RES_SET_SIZE_PATTERN = Pattern(
    r'^\s*Maximum resident set size \(kbytes\): (?P<max_set_size>\d+)')
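# For reference, the matched lines of '/usr/bin/time -v' output look roughly
# like this (the values are made up):
#   User time (seconds): 123.45
#   System time (seconds): 6.78
#   Elapsed (wall clock) time (h:mm:ss or m:ss): 2:03.45
#   Maximum resident set size (kbytes): 456789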
CONSTRAINT_STATS_PATTERN = Pattern(
    r'Score histogram:[\n\r]+'
    r'Count:\s+(?P<constraints_count>\d+)\s+'
    r'Min:\s+(?P<constraints_score_min>\d+\.\d+)\s+'
    r'Max:\s+(?P<constraints_score_max>\d+\.\d+)\s+'
    r'Mean:\s+(?P<constraints_score_mean>\d+\.\d+)')
GROUND_TRUTH_STATS_PATTERN = Pattern(
    r'Result:[\n\r]+'
    r'Abs translational error (?P<abs_trans_err>\d+\.\d+) '
    r'\+/- (?P<abs_trans_err_dev>\d+\.\d+) m[\n\r]+'
    r'Sqr translational error (?P<sqr_trans_err>\d+\.\d+) '
    r'\+/- (?P<sqr_trans_err_dev>\d+\.\d+) m\^2[\n\r]+'
    r'Abs rotational error (?P<abs_rot_err>\d+\.\d+) '
    r'\+/- (?P<abs_rot_err_dev>\d+\.\d+) deg[\n\r]+'
    r'Sqr rotational error (?P<sqr_rot_err>\d+\.\d+) '
    r'\+/- (?P<sqr_rot_err_dev>\d+\.\d+) deg\^2')

# Pattern matcher for extracting the HEAD commit SHA-1 hash.
GIT_SHA1_PATTERN = Pattern(r'^(?P<sha1>[0-9a-f]{40})\s+HEAD')
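# 'git ls-remote' prints one '<sha1><TAB><ref>' pair per line; the pattern
# above picks out the 40-character hash on the line whose ref is HEAD.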


def get_head_git_sha1():
  """Returns the SHA-1 hash of the commit tagged HEAD."""
  output = subprocess.check_output([
      'git', 'ls-remote',
      'https://github.com/googlecartographer/cartographer.git'
  ])
  parsed = GIT_SHA1_PATTERN.extract(output)
  return parsed['sha1']


def extract_stats(inp):
  """Returns a dictionary of stats."""
  result = {}

  parsed = USER_TIME_PATTERN.extract(inp)
  result['user_time_secs'] = float(parsed['user_time'])

  parsed = SYSTEM_TIME_PATTERN.extract(inp)
  result['system_time_secs'] = float(parsed['system_time'])

  # When the output used the m:ss format there is no hours field; the
  # 'or 0.' falls back to zero hours in that case.
  parsed = WALL_TIME_PATTERN.extract(inp)
  result['wall_time_secs'] = float(parsed['hours'] or 0.) * 3600 + float(
      parsed['minutes']) * 60 + float(parsed['seconds'])

  parsed = MAX_RES_SET_SIZE_PATTERN.extract(inp)
  result['max_set_size_kbytes'] = int(parsed['max_set_size'])

  # Only the last score histogram printed in the log is used.
  parsed = CONSTRAINT_STATS_PATTERN.extract_last_occurence(inp)
  print parsed
  result['constraints_count'] = int(parsed[0])
  result['constraints_score_min'] = float(parsed[1])
  result['constraints_score_max'] = float(parsed[2])
  result['constraints_score_mean'] = float(parsed[3])
  return result


def extract_ground_truth_stats(input_text):
  """Returns a dictionary of ground truth stats."""
  result = {}
  parsed = GROUND_TRUTH_STATS_PATTERN.extract(input_text)
  for name in ('abs_trans_err', 'sqr_trans_err', 'abs_rot_err', 'sqr_rot_err'):
    result['ground_truth_{}'.format(name)] = float(parsed[name])
    result['ground_truth_{}_dev'.format(name)] = float(
        parsed['{}_dev'.format(name)])
  return result


def retrieve_entity(datastore_client, kind, identifier):
  """Convenience function for Datastore entity retrieval."""
  key = datastore_client.key(kind, identifier)
  return datastore_client.get(key)


def create_job_selector(worker_id, num_workers):
  """Constructs a round-robin job selector."""
  return lambda job_id: job_id % num_workers == worker_id
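
# For example (illustrative numbers): with num_workers=3 the selector for
# worker_id=1 accepts job indices 1, 4, 7, ..., so every job is handled by
# exactly one worker.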


def run_cmd(cmd):
  """Runs a command, echoing its output and returning it as a string."""
  print cmd
  p = subprocess.Popen(
      cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
  run_cmd.output = []

  def process(line):
    run_cmd.output.append(line)
    print line.rstrip()

  while p.poll() is None:
    process(p.stdout.readline())
  process(p.stdout.read())
  return '\n'.join(run_cmd.output)


def run_ros_cmd(ros_distro, ros_cmd):
  """Like run_cmd, but sources the ROS environment first."""
  cmd = ('/bin/bash -c \"source /opt/ros/{}/setup.bash && source '
         '/opt/cartographer_ros/setup.bash && {}\"').format(
             ros_distro, ros_cmd)
  return run_cmd(cmd)
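
# As an illustration (hypothetical arguments), run_ros_cmd('melodic', 'ls')
# would run: /bin/bash -c "source /opt/ros/melodic/setup.bash &&
# source /opt/cartographer_ros/setup.bash && ls" (wrapped here for readability).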


class Job(object):
  """Represents a single job to be executed.

  A job consists of a rosbag together with configuration and launch files.
  """

  def __init__(self, datastore_client, job_id):
    self.id = job_id
    entity = retrieve_entity(datastore_client, 'Job', job_id)
    self.launch_file = entity['launch_file']
    self.assets_writer_launch_file = entity['assets_writer_launch_file']
    self.assets_writer_config_file = entity['assets_writer_config_file']
    self.rosbag = entity['rosbag']
    self.ros_package = entity['ros_package']

  def __repr__(self):
    return 'Job: id: {} launch_file: {} rosbag: {}'.format(
        self.id, self.launch_file, self.rosbag)

  def run(self, ros_distro, run_id):
    """Runs the job with ROS distro 'ros_distro'."""
    print 'running job {}'.format(self.id)

    # Garbage collect any left-overs from previous runs.
    run_cmd('rm -rf /data/*')

    # Copy the rosbag to scratch space.
    scratch_dir = '/data/{}'.format(self.id)
    rosbag_filename = basename(self.rosbag)
    run_cmd('mkdir {}'.format(scratch_dir))
    run_cmd('gsutil cp gs://{} {}/{}'.format(self.rosbag, scratch_dir,
                                             rosbag_filename))

    # Create the pbstream.
    output = run_ros_cmd(ros_distro,
                         '/usr/bin/time -v roslaunch {} {} '
                         'bag_filenames:={}/{} no_rviz:=true'.format(
                             self.ros_package, self.launch_file, scratch_dir,
                             rosbag_filename))
    info = extract_stats(output)

    # Create assets.
    run_ros_cmd(
        ros_distro, '/usr/bin/time -v roslaunch {} {} '
        'bag_filenames:={}/{} pose_graph_filename:='
        '{}/{}.pbstream config_file:={}'.format(
            self.ros_package, self.assets_writer_launch_file, scratch_dir,
            rosbag_filename, scratch_dir, rosbag_filename,
            self.assets_writer_config_file))

    # Copy assets to the bucket.
    run_cmd('gsutil cp {}/{}.pbstream '
            'gs://cartographer-ci-artifacts/{}/{}/{}.pbstream'.format(
                scratch_dir, rosbag_filename, run_id, self.id,
                rosbag_filename))
    run_cmd('gsutil cp {}/{}_* gs://cartographer-ci-artifacts/{}/{}/'.format(
        scratch_dir, rosbag_filename, run_id, self.id))

    # Download the ground truth relations file.
    run_cmd('gsutil cp gs://cartographer-ci-ground-truth/{}/relations.pb '
            '{}/relations.pb'.format(self.id, scratch_dir))

    # Calculate metrics.
    output = run_ros_cmd(ros_distro, 'cartographer_compute_relations_metrics '
                         '-relations_filename {}/relations.pb '
                         '-pose_graph_filename {}/{}.pbstream'.format(
                             scratch_dir, scratch_dir, rosbag_filename))

    # Add ground truth stats.
    info.update(extract_ground_truth_stats(output))
    info['rosbag'] = rosbag_filename
    return info


class Worker(object):
  """Represents a single worker that executes a sequence of Jobs."""

  def __init__(self, datastore_client, pipeline_id, run_id):
    entity = retrieve_entity(datastore_client, 'PipelineConfig', pipeline_id)
    self.pipeline_id = pipeline_id
    self.jobs = [Job(datastore_client, job_id) for job_id in entity['jobs']]
    self.scratch_dir = entity['scratch_dir']
    self.ros_distro = entity['ros_distro']
    self.run_id = run_id

  def __repr__(self):
    result = 'Worker: pipeline_id: {}\n'.format(self.pipeline_id)
    for job in self.jobs:
      result += '{}\n'.format(str(job))
    return result

  def run_jobs(self, selector):
    """Runs the selected jobs and returns their stats keyed by job id."""
    outputs = {}
    for idx, job in enumerate(self.jobs):
      if selector(idx):
        output = job.run(self.ros_distro, self.run_id)
        outputs[job.id] = output
      else:
        print 'job {}: skip'.format(job.id)
    return outputs


def publish_stats_to_big_query(stats_dict, now, head_sha1):
  """Publishes metrics to BigQuery."""
  bigquery_client = bigquery.Client()
  dataset = bigquery_client.dataset('Cartographer')
  table = dataset.table('metrics')
  rows_to_insert = []
  for job_identifier, job_info in stats_dict.iteritems():
    print job_info
    data = ('{}-{}-{}'.format(now.year, now.month, now.day), head_sha1,
            job_identifier, job_info['rosbag'],
            job_info['user_time_secs'], job_info['system_time_secs'],
            job_info['wall_time_secs'], job_info['max_set_size_kbytes'],
            job_info['constraints_count'], job_info['constraints_score_min'],
            job_info['constraints_score_max'],
            job_info['constraints_score_mean'],
            job_info['ground_truth_abs_trans_err'],
            job_info['ground_truth_abs_trans_err_dev'],
            job_info['ground_truth_sqr_trans_err'],
            job_info['ground_truth_sqr_trans_err_dev'],
            job_info['ground_truth_abs_rot_err'],
            job_info['ground_truth_abs_rot_err_dev'],
            job_info['ground_truth_sqr_rot_err'],
            job_info['ground_truth_sqr_rot_err_dev'])
    rows_to_insert.append(data)
  errors = bigquery_client.create_rows(
      table, rows_to_insert, selected_fields=SCHEMA)
  if not errors:
    print 'Pushed {} row(s) into Cartographer:metrics'.format(
        len(rows_to_insert))
  else:
    print 'Errors:'
    pprint(errors)


def parse_arguments():
  """Parses the command line arguments."""
  parser = argparse.ArgumentParser(
      description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
  parser.add_argument(
      '--worker_id', type=int, help='0-based index of this worker.')
  parser.add_argument(
      '--num_workers', type=int, help='Total number of workers.')
  parser.add_argument(
      '--pipeline_id', type=str,
      help='Identifier of the PipelineConfig entity in Datastore.')
  return parser.parse_args()
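
# Illustrative invocation (the script name and pipeline id are placeholders):
#   python quality_control_worker.py --worker_id=0 --num_workers=4 \
#     --pipeline_id=my-pipeline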


def main():
  args = parse_arguments()
  ds_client = datastore.Client()
  job_selector = create_job_selector(
      int(args.worker_id), int(args.num_workers))
  head_sha1 = get_head_git_sha1()
  now = datetime.datetime.now()
  pipeline_run_id = '{}-{}-{}_{}'.format(now.year, now.month, now.day,
                                         head_sha1)
  worker = Worker(ds_client, args.pipeline_id, pipeline_run_id)
  stats_dict = worker.run_jobs(job_selector)
  publish_stats_to_big_query(stats_dict, now, head_sha1)


if __name__ == '__main__':
  main()