756 lines
28 KiB
Python
Executable File
756 lines
28 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
# Copyright 2020 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
import argparse
|
|
import logging as log
|
|
import os
|
|
import re
|
|
import shlex
|
|
import shutil
|
|
import subprocess
|
|
import multiprocessing
|
|
import sys
|
|
import time
|
|
import uuid
|
|
import json
|
|
import functools
|
|
import glob
|
|
|
|
from google.cloud import storage
|
|
from google.api_core import exceptions as cloud_exceptions
|
|
# pylint: disable=no-name-in-module, import-error
|
|
|
|
import common
|
|
from autotest_lib.client.common_lib import global_config
|
|
from autotest_lib.client.common_lib import mail, pidfile
|
|
from autotest_lib.tko.parse import parse_one, export_tko_job_to_file
|
|
|
|
# Appends the moblab source paths for the pubsub wrapper
|
|
sys.path.append('/mnt/host/source/src/platform/moblab/src')
|
|
from moblab_common import pubsub_client
|
|
|
|
STATUS_FILE = "status"
|
|
STATUS_LOG_FILE = "status.log"
|
|
KEYVAL_FILE = "keyval"
|
|
NEW_KEYVAL_FILE = "new_keyval"
|
|
UPLOADED_STATUS_FILE = ".uploader_status"
|
|
STATUS_GOOD = "PUBSUB_SENT"
|
|
FAKE_MOBLAB_ID_FILE = "fake_moblab_id_do_not_delete.txt"
|
|
GIT_HASH_FILE = "git_hash.txt"
|
|
GIT_COMMAND = ("git log --pretty=format:'%h -%d %s (%ci) <%an>'"
|
|
" --abbrev-commit -20")
|
|
AUTOTEST_DIR = "/mnt/host/source/src/third_party/autotest/files/"
|
|
DEFAULT_SUITE_NAME = "default_suite"
|
|
SUITE_NAME_REGEX = "Fetching suite for suite named (.+?)\.\.\."
|
|
DEBUG_FILE_PATH = "debug/test_that.DEBUG"
|
|
CONFIG_DIR = os.path.dirname(os.path.abspath(__file__)) + "/config/"
|
|
DEFAULT_BOTO_CONFIG = CONFIG_DIR + ".boto_upload_utils"
|
|
UPLOAD_CONFIG = CONFIG_DIR + "upload_config.json"
|
|
SERVICE_ACCOUNT_CONFIG = CONFIG_DIR + ".service_account.json"
|
|
|
|
logging = log.getLogger(__name__)
|
|
|
|
|
|
def parse_arguments(argv):
|
|
"""Creates the argument parser.
|
|
|
|
Args:
|
|
argv: A list of input arguments.
|
|
|
|
Returns:
|
|
A parser object for input arguments.
|
|
"""
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
subparsers = parser.add_subparsers(
|
|
help='select sub option for test result utility',
|
|
dest='subcommand')
|
|
subparsers.required = True
|
|
parser.add_argument("-v",
|
|
"--verbose",
|
|
dest='verbose',
|
|
action='store_true',
|
|
help="Enable verbose (debug) logging.")
|
|
parser.add_argument("-q",
|
|
"--quiet",
|
|
dest='quiet',
|
|
action='store_true',
|
|
help="Quiet mode for background call")
|
|
def_logfile = "/tmp/" + os.path.basename(
|
|
sys.argv[0].split(".")[0]) + ".log"
|
|
parser.add_argument("-l",
|
|
"--logfile",
|
|
type=str,
|
|
required=False,
|
|
default=def_logfile,
|
|
help="Full path to logfile. Default: " + def_logfile)
|
|
|
|
# configuration subcommand to create config file and populate environment
|
|
config_parser = subparsers.add_parser(name="config",
|
|
help='upload test results to CPCon')
|
|
config_parser.add_argument(
|
|
"-b",
|
|
"--bucket",
|
|
type=str,
|
|
required=True,
|
|
help="The GCS bucket that test results are uploaded to, e.g."
|
|
"'gs://xxxx'.")
|
|
config_parser.add_argument("-f",
|
|
"--force",
|
|
dest='force',
|
|
action="store_true",
|
|
help="Force overwrite of previous config files")
|
|
|
|
upload_parser = subparsers.add_parser(name="upload",
|
|
help='upload test results to CPCon')
|
|
upload_parser.add_argument(
|
|
"--bug",
|
|
type=_valid_bug_id,
|
|
required=False,
|
|
help=
|
|
"Write bug id to the test results. Each test entry can only have "
|
|
"at most 1 bug id. Optional.")
|
|
upload_parser.add_argument(
|
|
"-d",
|
|
"--directory",
|
|
type=str,
|
|
required=True,
|
|
help="The directory of non-Moblab test results.")
|
|
upload_parser.add_argument(
|
|
"--parse_only",
|
|
action='store_true',
|
|
help="Generate job.serialize locally but do not upload test "
|
|
"directories and not send pubsub messages.")
|
|
upload_parser.add_argument(
|
|
"--upload_only",
|
|
action='store_true',
|
|
help="Leave existing protobuf files as-is, only upload "
|
|
"directories and send pubsub messages.")
|
|
upload_parser.add_argument(
|
|
"-f",
|
|
"--force",
|
|
dest='force',
|
|
action='store_true',
|
|
help=
|
|
"force re-upload of results even if results were already successfully uploaded."
|
|
)
|
|
upload_parser.add_argument(
|
|
"-s",
|
|
"--suite",
|
|
type=str,
|
|
default=None,
|
|
help="The suite is used to identify the type of test results,"
|
|
"e.g. 'power' for platform power team. If not specific, the "
|
|
"default value is 'default_suite'.")
|
|
return parser.parse_args(argv)
|
|
|
|
|
|
def _confirm_option(question):
|
|
"""
|
|
Get a yes/no answer from the user via command line.
|
|
|
|
Args:
|
|
question: string, question to ask the user.
|
|
|
|
Returns:
|
|
A boolean. True if yes; False if no.
|
|
"""
|
|
expected_answers = ['y', 'yes', 'n', 'no']
|
|
answer = ''
|
|
while answer not in expected_answers:
|
|
answer = input(question + "(y/n): ").lower().strip()
|
|
return answer[0] == "y"
|
|
|
|
|
|
def _read_until_string(pipe, stop_string):
|
|
lines = [""]
|
|
while True:
|
|
c = pipe.read(1)
|
|
lines[-1] = lines[-1] + c.decode("utf-8")
|
|
if stop_string == lines[-1]:
|
|
return lines
|
|
if c.decode("utf-8") == "\n":
|
|
lines.append("")
|
|
|
|
|
|
def _configure_environment(parsed_args):
|
|
# create config directory if not exists
|
|
os.makedirs(CONFIG_DIR, exist_ok=True)
|
|
|
|
if os.path.exists(UPLOAD_CONFIG) and not parsed_args.force:
|
|
logging.error("Environment already configured, run with --force")
|
|
exit(1)
|
|
|
|
# call the gsutil config tool to set up accounts
|
|
if os.path.exists(DEFAULT_BOTO_CONFIG + ".bak"):
|
|
os.remove(DEFAULT_BOTO_CONFIG + ".bak")
|
|
|
|
if os.path.exists(DEFAULT_BOTO_CONFIG):
|
|
os.remove(DEFAULT_BOTO_CONFIG)
|
|
os.mknod(DEFAULT_BOTO_CONFIG)
|
|
os.environ["BOTO_CONFIG"] = DEFAULT_BOTO_CONFIG
|
|
os.environ[
|
|
"GOOGLE_APPLICATION_CREDENTIALS"] = CONFIG_DIR + ".service_account.json"
|
|
with subprocess.Popen(["gsutil", "config"],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
stdin=subprocess.PIPE) as sp:
|
|
lines = _read_until_string(sp.stdout, "Enter the authorization code: ")
|
|
code = input("enter auth code from " + str(lines[1]) + ": ")
|
|
sp.stdin.write(bytes(code + '\n', "utf-8"))
|
|
sp.stdin.flush()
|
|
lines = _read_until_string(sp.stdout, "What is your project-id? ")
|
|
sp.stdin.write(bytes(parsed_args.bucket + '\n', "utf-8"))
|
|
sp.stdin.flush()
|
|
|
|
subprocess.run([
|
|
"gsutil", "cp",
|
|
"gs://" + parsed_args.bucket + "/.service_account.json", CONFIG_DIR
|
|
])
|
|
subprocess.run([
|
|
"gsutil", "cp",
|
|
"gs://" + parsed_args.bucket + "/pubsub-key-do-not-delete.json",
|
|
CONFIG_DIR
|
|
])
|
|
|
|
sa_filename = ""
|
|
if os.path.exists(CONFIG_DIR + "/.service_account.json"):
|
|
sa_filename = ".service_account.json"
|
|
elif os.path.exists(CONFIG_DIR + "/pubsub-key-do-not-delete.json"):
|
|
sa_filename = "pubsub-key-do-not-delete.json"
|
|
else:
|
|
logging.error("No pubsub key found in bucket, failed config!")
|
|
exit(1)
|
|
|
|
# deposit parsed_args.bucket to the json file
|
|
with open(UPLOAD_CONFIG, "w") as cf:
|
|
settings = {}
|
|
settings["bucket"] = parsed_args.bucket
|
|
settings["service_account"] = CONFIG_DIR + sa_filename
|
|
settings["boto_key"] = DEFAULT_BOTO_CONFIG
|
|
|
|
cf.write(json.dumps(settings))
|
|
|
|
|
|
def _load_config():
|
|
mandatory_keys = ["bucket", "service_account", "boto_key"]
|
|
|
|
if not os.path.exists(UPLOAD_CONFIG):
|
|
logging.error("Missing mandatory config file, run config command")
|
|
exit(1)
|
|
with open(UPLOAD_CONFIG, "r") as cf:
|
|
settings = json.load(cf)
|
|
|
|
for key in mandatory_keys:
|
|
if key not in settings:
|
|
logging.error("Missing mandatory setting " + str(key) +
|
|
", run config command")
|
|
exit()
|
|
|
|
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = settings["service_account"]
|
|
os.environ["BOTO_CONFIG"] = settings["boto_key"]
|
|
return argparse.Namespace(**settings)
|
|
|
|
|
|
class ResultsManager:
|
|
def __init__(self, results_parser, results_sender):
|
|
self.parent_directories = []
|
|
self.result_directories = set()
|
|
self.results = []
|
|
self.results_parser = results_parser
|
|
self.results_sender = results_sender
|
|
self.bug_id = None
|
|
self.suite_name = ""
|
|
|
|
self.moblab_id = self.get_fake_moblab_id()
|
|
|
|
def new_directory(self, parent_dir: str):
|
|
self.parent_directories.append(parent_dir)
|
|
|
|
def enumerate_all_directories(self):
|
|
self.result_directories = set()
|
|
for parent_dir in self.parent_directories:
|
|
self.enumerate_result_directories(parent_dir)
|
|
|
|
def enumerate_result_directories(self, parent_dir):
|
|
""" Gets all test directories.
|
|
|
|
Args:
|
|
parent_dir: The parent directory of one or multiple test directories
|
|
|
|
Creates a local_result for all directories with a status.log file
|
|
and appends to local_results
|
|
"""
|
|
if not os.path.exists(parent_dir) or not os.path.isdir(parent_dir):
|
|
logging.warning('Test directory does not exist: %s' % parent_dir)
|
|
return
|
|
|
|
status_log_file = os.path.join(parent_dir, STATUS_LOG_FILE)
|
|
if os.path.exists(status_log_file):
|
|
self.result_directories.add(parent_dir)
|
|
return
|
|
|
|
for dir_name in os.listdir(parent_dir):
|
|
subdir = os.path.join(parent_dir, dir_name)
|
|
if os.path.isdir(subdir):
|
|
self.enumerate_result_directories(subdir)
|
|
|
|
def set_destination(self, destination):
|
|
self.results_sender.set_destination(destination)
|
|
|
|
def get_fake_moblab_id(self):
|
|
"""Get or generate a fake moblab id.
|
|
|
|
Moblab id is the unique id to a moblab device. Since the upload script runs
|
|
from the chroot instead of a moblab device, we need to generate a fake
|
|
moblab id to comply with the CPCon backend. If there is a previously saved
|
|
fake moblab id, read and use it. Otherwise, generate a uuid to fake a moblab
|
|
device, and store it in the same directory as the upload script.
|
|
|
|
Returns:
|
|
A string representing a fake moblab id.
|
|
"""
|
|
script_dir = os.path.dirname(__file__)
|
|
fake_moblab_id_path = os.path.join(script_dir, "config",
|
|
FAKE_MOBLAB_ID_FILE)
|
|
|
|
# Migrate from prior moblab ID location into config directory if possible
|
|
old_moblab_id_file = os.path.join(script_dir, FAKE_MOBLAB_ID_FILE)
|
|
if os.path.exists(old_moblab_id_file):
|
|
logging.info(
|
|
'Found an existing moblab ID outside config directory, migrating now'
|
|
)
|
|
os.rename(old_moblab_id_file, fake_moblab_id_path)
|
|
try:
|
|
with open(fake_moblab_id_path, "r") as fake_moblab_id_file:
|
|
fake_moblab_id = str(fake_moblab_id_file.read())[0:32]
|
|
if fake_moblab_id:
|
|
return fake_moblab_id
|
|
except IOError as e:
|
|
logging.info(
|
|
'Cannot find a fake moblab id at %s, creating a new one.',
|
|
fake_moblab_id_path)
|
|
fake_moblab_id = uuid.uuid4().hex
|
|
try:
|
|
with open(fake_moblab_id_path, "w") as fake_moblab_id_file:
|
|
fake_moblab_id_file.write(fake_moblab_id)
|
|
except IOError as e:
|
|
logging.warning('Unable to write the fake moblab id to %s: %s',
|
|
fake_moblab_id_path, e)
|
|
return fake_moblab_id
|
|
|
|
def overwrite_suite_name(self, suite_name):
|
|
self.suite_name = suite_name
|
|
|
|
def annotate_results_with_bugid(self, bug_id):
|
|
self.bug_id = bug_id
|
|
|
|
def parse_all_results(self, upload_only: bool = False):
|
|
self.results = []
|
|
self.enumerate_all_directories()
|
|
|
|
for result_dir in self.result_directories:
|
|
if self.bug_id is not None:
|
|
self.results_parser.write_bug_id(result_dir, self.bug_id)
|
|
self.results.append(
|
|
(result_dir,
|
|
self.results_parser.parse(result_dir,
|
|
upload_only,
|
|
suite_name=self.suite_name)))
|
|
|
|
def upload_all_results(self, force):
|
|
for result in self.results:
|
|
self.results_sender.upload_result_and_notify(
|
|
result[0], self.moblab_id, result[1], force)
|
|
|
|
|
|
class FakeTkoDb:
|
|
def find_job(self, tag):
|
|
return None
|
|
|
|
def run_with_retry(self, fn, *args):
|
|
fn(*args)
|
|
|
|
|
|
class ResultsParserClass:
|
|
def __init__(self):
|
|
pass
|
|
|
|
def job_tag(self, job_id, machine):
|
|
return str(job_id) + "-moblab/" + str(machine)
|
|
|
|
def parse(self, path, upload_only: bool, suite_name=""):
|
|
#temporarily assign a fake job id until parsed
|
|
fake_job_id = 1234
|
|
fake_machine = "localhost"
|
|
name = self.job_tag(fake_job_id, fake_machine)
|
|
parse_options = argparse.Namespace(
|
|
**{
|
|
"suite_report": False,
|
|
"dry_run": True,
|
|
"reparse": False,
|
|
"mail_on_failure": False
|
|
})
|
|
pid_file_manager = pidfile.PidFileManager("parser", path)
|
|
self.print_autotest_git_history(path)
|
|
job = parse_one(FakeTkoDb(), pid_file_manager, name, path,
|
|
parse_options)
|
|
job.board = job.tests[0].attributes['host-board']
|
|
job_id = int(job.started_time.timestamp() * 1000)
|
|
job.afe_parent_job_id = job_id + 1
|
|
if suite_name == "":
|
|
job.suite = self.parse_suite_name(path)
|
|
else:
|
|
job.suite = suite_name
|
|
job.build_version = self.get_build_version(job.tests)
|
|
name = self.job_tag(job_id, job.machine)
|
|
if not upload_only:
|
|
export_tko_job_to_file(job, name, path + "/job.serialize")
|
|
|
|
# autotest_lib appends additional global logger handlers
|
|
# remove these handlers to avoid affecting logging for the google
|
|
# storage library
|
|
for handler in log.getLogger().handlers:
|
|
log.getLogger().removeHandler(handler)
|
|
return job
|
|
|
|
def print_autotest_git_history(self, path):
|
|
"""
|
|
Print the hash of the latest git commit of the autotest directory.
|
|
|
|
Args:
|
|
path: The test directory for non-moblab test results.
|
|
"""
|
|
git_hash = subprocess.check_output(shlex.split(GIT_COMMAND),
|
|
cwd=AUTOTEST_DIR)
|
|
git_hash_path = os.path.join(path, GIT_HASH_FILE)
|
|
with open(git_hash_path, "w") as git_hash_file:
|
|
git_hash_file.write(git_hash.decode("utf-8"))
|
|
|
|
def parse_suite_name(self, path):
|
|
"""Get the suite name from a results directory.
|
|
|
|
If we don't find the suite name in the first ten lines of test_that.DEBUG
|
|
then return None.
|
|
|
|
Args:
|
|
path: The directory specified on the command line.
|
|
"""
|
|
path = path.split('/')[:-1]
|
|
path = '/'.join(path)
|
|
|
|
debug_file = os.path.join(path, DEBUG_FILE_PATH)
|
|
if not os.path.exists(debug_file) or not os.path.isfile(debug_file):
|
|
return None
|
|
exp = re.compile(SUITE_NAME_REGEX)
|
|
try:
|
|
with open(debug_file) as f:
|
|
line_count = 0
|
|
for line in f:
|
|
line_count += 1
|
|
if line_count > 10:
|
|
break
|
|
result = exp.search(line)
|
|
if not result:
|
|
continue
|
|
else:
|
|
return result.group(1)
|
|
except IOError as e:
|
|
logging.warning('Error trying to read test_that.DEBUG: %s', e)
|
|
return DEFAULT_SUITE_NAME
|
|
|
|
def get_build_version(self, tests):
|
|
release_version_label = "CHROMEOS_RELEASE_VERSION"
|
|
milestone_label = "CHROMEOS_RELEASE_CHROME_MILESTONE"
|
|
for test in tests:
|
|
if not test.subdir:
|
|
continue
|
|
|
|
release = None
|
|
milestone = None
|
|
if release_version_label in test.attributes:
|
|
release = test.attributes[release_version_label]
|
|
if milestone_label in test.attributes:
|
|
milestone = test.attributes[milestone_label]
|
|
if release and milestone:
|
|
return "R%s-%s" % (milestone, release)
|
|
|
|
return ""
|
|
|
|
def valid_bug_id(self, v):
|
|
"""Check if user input bug id is in valid format.
|
|
|
|
Args:
|
|
v: User input bug id in string.
|
|
Returns:
|
|
An int representing the bug id.
|
|
Raises:
|
|
argparse.ArgumentTypeError: if user input bug id has wrong format.
|
|
"""
|
|
try:
|
|
bug_id = int(v)
|
|
except ValueError as e:
|
|
raise argparse.ArgumentTypeError(
|
|
"Bug id %s is not a positive integer: "
|
|
"%s" % (v, e))
|
|
if bug_id <= 0:
|
|
raise argparse.ArgumentTypeError(
|
|
"Bug id %s is not a positive integer" % v)
|
|
return bug_id
|
|
|
|
def write_bug_id(self, test_dir, bug_id):
|
|
"""
|
|
Write the bug id to the test results.
|
|
|
|
Args:
|
|
test_dir: The test directory for non-moblab test results.
|
|
bug_id: The bug id to write to the test results.
|
|
Returns:
|
|
A boolean. True if the bug id is written successfully or is the same as
|
|
the old bug id already in test results; False if failed to write the
|
|
bug id, or if the user decides not to overwrite the old bug id already
|
|
in test results.
|
|
"""
|
|
old_bug_id = None
|
|
new_keyval = list()
|
|
|
|
keyval_file = os.path.join(test_dir, KEYVAL_FILE)
|
|
try:
|
|
with open(keyval_file, 'r') as keyval_raw:
|
|
for line in keyval_raw.readlines():
|
|
match = re.match(r'bug_id=(\d+)', line)
|
|
if match:
|
|
old_bug_id = self.valid_bug_id(match.group(1))
|
|
else:
|
|
new_keyval.append(line)
|
|
except IOError as e:
|
|
logging.error(
|
|
'Cannot read keyval file from %s, skip writing the bug '
|
|
'id %s: %s', test_dir, bug_id, e)
|
|
return False
|
|
|
|
if old_bug_id:
|
|
if old_bug_id == bug_id:
|
|
return True
|
|
overwrite_bug_id = _confirm_option(
|
|
'Would you like to overwrite bug id '
|
|
'%s with new bug id %s?' % (old_bug_id, bug_id))
|
|
if not overwrite_bug_id:
|
|
return False
|
|
|
|
new_keyval.append('bug_id=%s' % bug_id)
|
|
new_keyval_file = os.path.join(test_dir, NEW_KEYVAL_FILE)
|
|
try:
|
|
with open(new_keyval_file, 'w') as new_keyval_raw:
|
|
for line in new_keyval:
|
|
new_keyval_raw.write(line)
|
|
new_keyval_raw.write('\n')
|
|
shutil.move(new_keyval_file, keyval_file)
|
|
return True
|
|
except Exception as e:
|
|
logging.error(
|
|
'Cannot write bug id to keyval file in %s, skip writing '
|
|
'the bug id %s: %s', test_dir, bug_id, e)
|
|
return False
|
|
|
|
|
|
ResultsParser = ResultsParserClass()
|
|
_valid_bug_id = functools.partial(ResultsParserClass.valid_bug_id,
|
|
ResultsParser)
|
|
|
|
|
|
class ResultsSenderClass:
|
|
def __init__(self):
|
|
self.gcs_bucket = ""
|
|
|
|
def set_destination(self, destination):
|
|
self.gcs_bucket = destination
|
|
|
|
def upload_result_and_notify(self, test_dir, moblab_id, job, force):
|
|
job_id = str(int(job.started_time.timestamp() * 1000))
|
|
if self.uploaded(test_dir) and not force:
|
|
return
|
|
self.upload_result(test_dir, moblab_id, job_id, job.machine)
|
|
self.send_pubsub_message(test_dir, moblab_id, job_id)
|
|
|
|
def upload_batch_files(self, gs_path, test_dir, files):
|
|
for file in files:
|
|
if not os.path.isfile(file):
|
|
continue
|
|
gs_client_bucket = storage.Client().bucket(self.gcs_bucket)
|
|
# remove trailing slash to ensure dest_file path gets created properly
|
|
test_dir = test_dir.rstrip('/')
|
|
dest_file = gs_path + file.replace(test_dir, "", 1)
|
|
logging.info("uploading file: %s", dest_file)
|
|
blob = gs_client_bucket.blob(dest_file)
|
|
blob.upload_from_filename(file)
|
|
|
|
def upload_result(self, test_dir, moblab_id, job_id, hostname):
|
|
"""
|
|
Upload the test directory with job.serialize to GCS bucket.
|
|
|
|
Args:
|
|
args: A list of input arguments.
|
|
test_dir: The test directory for non-moblab test results.
|
|
job_keyval: The key-value object of the job.
|
|
moblab_id: A string that represents the unique id of a moblab device.
|
|
job_id: A job id.
|
|
"""
|
|
upload_status_file = os.path.join(test_dir, UPLOADED_STATUS_FILE)
|
|
with open(upload_status_file, "w") as upload_status:
|
|
upload_status.write("UPLOADING")
|
|
|
|
fake_moblab_id = moblab_id
|
|
fake_moblab_install_id = moblab_id
|
|
|
|
gcs_bucket_path = os.path.join("results", fake_moblab_id,
|
|
fake_moblab_install_id,
|
|
"%s-moblab" % job_id, hostname)
|
|
|
|
try:
|
|
logging.info(
|
|
"Start to upload test directory: %s to GCS bucket path: %s",
|
|
test_dir, gcs_bucket_path)
|
|
with open(upload_status_file, "w") as upload_status:
|
|
upload_status.write("UPLOADED")
|
|
|
|
files_to_upload = glob.glob(test_dir + "/**", recursive=True)
|
|
batch_size = 8
|
|
with multiprocessing.Pool(4) as p:
|
|
files_to_upload_batch = [
|
|
files_to_upload[i:i + batch_size]
|
|
for i in range(0, len(files_to_upload), batch_size)
|
|
]
|
|
p.map(
|
|
functools.partial(
|
|
ResultsSenderClass.upload_batch_files, self,
|
|
gcs_bucket_path, test_dir),
|
|
files_to_upload_batch)
|
|
|
|
logging.info(
|
|
"Successfully uploaded test directory: %s to GCS bucket path: %s",
|
|
test_dir, gcs_bucket_path)
|
|
except Exception as e:
|
|
with open(upload_status_file, "w") as upload_status:
|
|
upload_status.write("UPLOAD_FAILED")
|
|
raise Exception(
|
|
"Failed to upload test directory: %s to GCS bucket "
|
|
"path: %s for the error: %s" %
|
|
(test_dir, gcs_bucket_path, e))
|
|
|
|
def send_pubsub_message(self, test_dir, moblab_id, job_id):
|
|
"""
|
|
Send pubsub messages to trigger CPCon pipeline to process non-moblab
|
|
test results in the specific GCS bucket path.
|
|
|
|
Args:
|
|
bucket: The GCS bucket.
|
|
moblab_id: A moblab id.
|
|
job_id: A job id.
|
|
"""
|
|
moblab_install_id = moblab_id
|
|
console_client = pubsub_client.PubSubBasedClient()
|
|
gsuri = "gs://%s/results/%s/%s/%s-moblab" % (
|
|
self.gcs_bucket, moblab_id, moblab_install_id, job_id)
|
|
|
|
try:
|
|
logging.info("Start to send the pubsub message to GCS path: %s",
|
|
gsuri)
|
|
message_id = \
|
|
console_client.send_test_job_offloaded_message(gsuri,
|
|
moblab_id,
|
|
moblab_install_id)
|
|
upload_status_file = os.path.join(test_dir, UPLOADED_STATUS_FILE)
|
|
with open(upload_status_file, "w") as upload_status:
|
|
upload_status.write(STATUS_GOOD)
|
|
|
|
logging.info(
|
|
"Successfully sent the pubsub message with message id: %s to GCS "
|
|
"path: %s", message_id[0], gsuri)
|
|
except Exception as e:
|
|
raise Exception(
|
|
"Failed to send the pubsub message with moblab id: %s "
|
|
"and job id: %s to GCS path: %s for the error: %s" %
|
|
(moblab_id, job_id, gsuri, e))
|
|
|
|
def uploaded(self, test_dir):
|
|
"""
|
|
Checks if the message for the uploaded bucket has been sent.
|
|
|
|
Args:
|
|
test_dir: The test directory for non-moblab test results.
|
|
"""
|
|
upload_status_file = os.path.join(test_dir, UPLOADED_STATUS_FILE)
|
|
if not os.path.exists(upload_status_file):
|
|
logging.debug("The upload status file %s does not exist.",
|
|
upload_status_file)
|
|
return False
|
|
|
|
with open(upload_status_file, "r") as upload_status:
|
|
if upload_status.read() == STATUS_GOOD:
|
|
logging.warn(
|
|
"The test directory: %s status has already been "
|
|
"sent to CPCon and the .upload_status file has "
|
|
"been set to PUBSUB_SENT.", test_dir)
|
|
return True
|
|
else:
|
|
logging.debug("The pubsub message was not successful")
|
|
return False
|
|
|
|
|
|
ResultsSender = ResultsSenderClass()
|
|
|
|
|
|
def main(args):
|
|
parsed_args = parse_arguments(args)
|
|
|
|
fmt = log.Formatter('%(asctime)s :: %(levelname)-8s :: %(message)s')
|
|
logging.propagate = False
|
|
|
|
log_level = log.INFO
|
|
if parsed_args.verbose:
|
|
log_level = log.DEBUG
|
|
if not parsed_args.quiet:
|
|
stream_handler = log.StreamHandler(sys.stdout)
|
|
stream_handler.setFormatter(fmt)
|
|
stream_handler.setLevel(log_level)
|
|
logging.addHandler(stream_handler)
|
|
|
|
logging.info("logging to %s", parsed_args.logfile)
|
|
file_handler = log.FileHandler(parsed_args.logfile, mode='w')
|
|
file_handler.setFormatter(fmt)
|
|
file_handler.setLevel(log.DEBUG)
|
|
logging.addHandler(file_handler)
|
|
|
|
if parsed_args.subcommand == "config":
|
|
_configure_environment(parsed_args)
|
|
return
|
|
|
|
persistent_settings = _load_config()
|
|
|
|
results_manager = ResultsManager(ResultsParser, ResultsSender)
|
|
results_manager.set_destination(persistent_settings.bucket)
|
|
results_manager.new_directory(parsed_args.directory)
|
|
|
|
if parsed_args.bug:
|
|
results_manager.annotate_results_with_bugid(parsed_args.bug)
|
|
if parsed_args.suite:
|
|
results_manager.overwrite_suite_name(parsed_args.suite)
|
|
if parsed_args.parse_only:
|
|
results_manager.parse_all_results()
|
|
elif parsed_args.upload_only:
|
|
results_manager.parse_all_results(upload_only=True)
|
|
results_manager.upload_all_results(force=parsed_args.force)
|
|
else:
|
|
results_manager.parse_all_results()
|
|
results_manager.upload_all_results(force=parsed_args.force)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main(sys.argv[1:])
|
|
except KeyboardInterrupt:
|
|
sys.exit(0)
|