360 lines
12 KiB
Python
Executable File
360 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
# Copyright 2021 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
import argparse
|
|
import ast
|
|
from functools import partial
|
|
import os
|
|
import subprocess
|
|
import graphviz
|
|
import common
|
|
|
|
from server.cros.dynamic_suite.control_file_getter import FileSystemGetter
|
|
from server.cros.dynamic_suite.suite_common import retrieve_for_suite
|
|
|
|
class TestSuite:
    """A suite control file plus the tests that have been attached to it.

    Attributes:
        name: Suite name supplied by the caller.
        cf_object: The control-file object supplied by the caller.
        file_path: Path of the suite's control file.
        tests: List of whatever was passed to add_test(), in call order.
    """

    def __init__(self, cf_object, name, file_path):
        """Store the constructor arguments and start with no tests."""
        self.cf_object, self.file_path = cf_object, file_path
        self.name = name
        self.tests = []

    def add_test(self, test_object):
        """Append test_object to this suite's test list."""
        self.tests += [test_object]

    def get_tests(self):
        """Return the suite's test list (the live list, not a copy)."""
        return self.tests
|
|
|
|
|
|
class TestObject(object):
    """A single non-suite control file and, for tast wrappers, its test exprs.

    Attributes:
        name: cf_object.name.
        type: 'tast' when 'tast' appears in the name, otherwise 'tauto'.
        cf_object: The control-file object supplied by the caller.
        file_path: Path of the control file; read by
            parse_cf_for_tast_string().
        tast_string: The parsed test expressions joined into one string
            ('' until a matching run_test call has been parsed).
        tast_exprs: tast_string split on ', ' ('' until
            enumerate_tast_from_test_expr() has run).
    """

    # Extra arguments for each `tast list` invocation made per expression:
    # first with no extra flag, then with -buildbundle=crosint.
    _TAST_LIST_VARIANTS = ((), ('-buildbundle=crosint', ))

    def __init__(self, cf_object, file_path):
        self.name = cf_object.name
        self.type = 'tast' if ('tast' in self.name) else 'tauto'
        self.cf_object = cf_object
        self.file_path = file_path
        self.tast_exprs = ''
        self.tast_string = ''

    def get_attributes(self):
        """Return cf_object.attributes."""
        return self.cf_object.attributes

    def is_tast(self):
        """Return True when this object was classified as a tast wrapper."""
        return self.type == 'tast'

    @staticmethod
    def _string_literal(node):
        """Return the str held by a literal AST node, or None otherwise."""
        if isinstance(node, ast.Constant):
            value = node.value
        else:
            # Older interpreters parse string literals as ast.Str, which
            # exposes the text as `.s`; other node types have no `.s`.
            value = getattr(node, 's', None)
        return value if isinstance(value, str) else None

    @staticmethod
    def _iter_test_exprs_lists(mod):
        """Yield each list literal passed as test_exprs= to job.run_test().

        Only expression statements directly inside a top-level function
        named 'run' are considered.
        """
        for node in mod.body:
            if not isinstance(node, ast.FunctionDef) or node.name != 'run':
                continue
            for stmt in node.body:
                if not isinstance(stmt, ast.Expr):
                    continue
                call = stmt.value
                if not isinstance(call, ast.Call):
                    continue
                func = call.func
                is_job_run_test = (isinstance(func, ast.Attribute)
                                   and isinstance(func.value, ast.Name)
                                   and func.value.id == 'job'
                                   and func.attr == 'run_test')
                if not is_job_run_test:
                    continue
                for keyword in call.keywords:
                    if (keyword.arg == 'test_exprs'
                                and isinstance(keyword.value, ast.List)):
                        yield keyword.value

    def _join_test_exprs(self, list_node):
        """Join the string elements of a test_exprs list into one string.

        Stops at the first element that is not a plain string literal, after
        printing a warning; the elements collected so far are still joined.
        """
        exprs = []
        for elem in list_node.elts:
            text = self._string_literal(elem)
            if text is None:
                print('WARNING: Non-standard test found, check ' +
                      self.file_path + ' manually')
                break
            exprs.append(text)
        # Any expression containing '(' makes the whole list space-joined;
        # otherwise elements are joined with ', ' so they can be split
        # again by enumerate_tast_from_test_expr().
        is_regex_list = any('(' in expr for expr in exprs)
        return (' ' if is_regex_list else ', ').join(exprs)

    def parse_cf_for_tast_string(self):
        """Parse the control file and store its test_exprs in tast_string.

        Uses the python syntax tree library to find the run function and
        grab test_exprs from the job.run_test call. When several matching
        calls exist, the last one wins. tast_string is left unchanged when
        no matching call is found.
        """
        with open(self.file_path, 'r') as cf:
            mod = ast.parse(cf.read())
        for list_node in self._iter_test_exprs_lists(mod):
            self.tast_string = self._join_test_exprs(list_node)

    def enumerate_tast_from_test_expr(self):
        """Parse the control file, then split tast_string into tast_exprs."""
        self.parse_cf_for_tast_string()
        try:
            self.tast_exprs = self.tast_string.split(', ')
        except AttributeError:
            # Only reachable if tast_string was replaced by a non-string.
            print('WARNING: Non-standard test found, check ' +
                  self.file_path + ' manually')

    def enumerate_tests_from_tast_exprs(self, dut):
        """Run `tast list` for every expression and collect the output lines.

        Each non-empty expression is listed once per entry in
        _TAST_LIST_VARIANTS, in that order.

        Args:
            dut: Target passed to `tast list` (converted with str()).

        Returns:
            List of non-empty output lines, in invocation order.

        Raises:
            subprocess.CalledProcessError: if a `tast list` call exits
                non-zero (raised by subprocess.check_output).
        """
        tests = []
        print(self.tast_exprs)
        for expr in self.tast_exprs:
            # ''.split(', ') yields [''], so a control file without any
            # test_exprs would otherwise invoke tast with an empty pattern.
            if not expr:
                continue
            for extra_args in self._TAST_LIST_VARIANTS:
                cmd = (['tast', 'list'] + list(extra_args) +
                       [str(dut), str(expr)])
                listing = subprocess.check_output(cmd, encoding='utf-8')
                tests.extend(line for line in listing.split('\n') if line)

        return tests

    def describe(self):
        """Return a one-line human-readable description of this test."""
        return 'test named ' + self.name + ' of type ' + self.type
|
|
|
|
|
|
class TestParser:
    """Turns control files found on disk into TestObject/TestSuite entries."""

    def get_all_test_objects(self, locations):
        """Collect every test and suite reachable from `locations`.

        The control files are whatever
        retrieve_for_suite(FileSystemGetter(locations), '') returns; each
        entry is classified by its test_class attribute. Tast wrappers have
        their test expressions parsed immediately.

        Args:
            locations: Passed straight to FileSystemGetter.

        Returns:
            (tests, suites): two dicts keyed by control-file name, holding
            TestObject and TestSuite instances respectively.
        """
        found_tests, found_suites = {}, {}

        control_files = retrieve_for_suite(FileSystemGetter(locations), '')
        for path, control in control_files.items():
            name = control.name
            if control.test_class == 'suite':
                found_suites[name] = TestSuite(control, name, path)
                continue

            test = TestObject(control, path)
            found_tests[name] = test
            if test.is_tast():
                test.enumerate_tast_from_test_expr()

        return found_tests, found_suites
|
|
|
|
|
|
class TestManager(object):
    """Index of tests and suites with find/list/diff/graph query helpers.

    Attributes:
        tests: dict mapping test name to test object.
        suites: dict mapping suite name to suite object.
        dut: Target used for tast enumeration, or None when unset.
        log_functions: Callables invoked by log(); starts with print.
        test_parser: TestParser used by initialize_from_fs().
    """

    # Prefixes accepted (and stripped) in front of a suite name.
    _SUITE_PREFIXES = ('suite.', 'suite:')
    # Number of test nodes placed in each child graph by graph_suite_named.
    _NODES_PER_GROUP = 7

    def __init__(self):
        self.tests = {}
        self.suites = {}
        self.dut = None
        self.log_functions = [partial(print)]
        self.test_parser = TestParser()

    def log(self, log_text, *args):
        """Pass log_text and any extra positional args to every logger."""
        for fn in self.log_functions:
            fn(log_text, *args)

    def csv_logger(self, log_text, file_path):
        """Append log_text verbatim (no newline added) to file_path."""
        with open(file_path, 'a') as log:
            log.write(log_text)

    def register_csv_logger(self, file_path):
        """Start logging to file_path as well, beginning with a CSV header.

        Any existing file at file_path is removed first.
        """
        # EAFP instead of exists()+remove(): no window between the check
        # and the removal.
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass
        print_to_csv = partial(self.csv_logger, file_path=file_path)
        self.log_functions.append(print_to_csv)
        print_to_csv('suite,test\n')

    def initialize_from_fs(self, locations):
        """Populate tests and suites from control files under locations."""
        self.tests, self.suites = self.test_parser.get_all_test_objects(
                locations)

    def process_all_tests(self):
        """Attach each test's name to every known suite in its attributes.

        Attributes that do not resolve to a known suite are ignored.
        """
        for test, test_object in self.tests.items():
            for suite in test_object.get_attributes():
                target_suite = self.find_suite_named(suite)
                if target_suite is not None:
                    target_suite.add_test(test)

    def set_dut(self, target):
        """Remember target as the DUT for tast enumeration."""
        self.dut = target

    def get_dut(self):
        """Return the DUT.

        Raises:
            AttributeError: if no DUT has been set.
        """
        if self.dut is None:
            raise AttributeError(
                    'DUT Address not set, please use the --dut flag to indicate the ip address of the DUT'
            )
        return self.dut

    def find_test_named(self, test_name):
        """Return the test registered as test_name, or None."""
        return self.tests.get(test_name)

    def find_suite_named(self, suite_name):
        """Return the suite called suite_name, or None.

        A single leading 'suite.' or 'suite:' is stripped before the lookup.
        """
        for prefix in self._SUITE_PREFIXES:
            if suite_name.startswith(prefix):
                suite_name = suite_name[len(prefix):]
                break
        return self.suites.get(suite_name)

    def list_suite_named(self, suite_name, pretty=False):
        """List the tests in a suite, expanding tast wrappers.

        Tast wrappers are replaced by the tests they enumerate on the DUT,
        each prefixed with 'tast.'; other tests are listed by name.

        Args:
            suite_name: Suite to list (prefix handling as in
                find_suite_named).
            pretty: When True, return 'suite_name,test' lines as one string
                instead of a list.

        Returns:
            A list of test names, or with pretty=True a string of
            newline-terminated CSV rows. An unknown suite yields [] (or
            '\\n' with pretty=True).

        Raises:
            AttributeError: if a tast wrapper must be expanded and no DUT
                has been set.
        """
        suite = self.find_suite_named(suite_name)
        if suite is None:
            return '\n' if pretty else []

        suite_tests = []
        for test in suite.get_tests():
            test_object = self.tests[test]
            if test_object.is_tast():
                found_tests = test_object.enumerate_tests_from_tast_exprs(
                        self.get_dut())
                suite_tests.extend('tast.' + str(t) for t in found_tests
                                   if t != '')
            else:
                suite_tests.append(test)

        if pretty:
            return ''.join(suite_name + ',' + str(test) + '\n'
                           for test in suite_tests)
        return suite_tests

    def gs_query_link(self, suite_name):
        """Return a dashboard URL filtered to the tests of suite_name."""
        test_names = ','.join(test
                              for test in self.list_suite_named(suite_name)
                              if test != '')

        query = 'https://dashboards.corp.google.com/'
        query += '_86acf8a8_50a5_48e0_829e_fbf1033d3ac6'
        query += '?f=test_name:in:' + test_names
        query += '&f=create_date_7_day_filter:in:Past%207%20Days'
        query += '&f=test_type:in:Tast,Autotest'

        return query

    def graph_suite_named(self, suite_name, dot_graph=None):
        """Add suite_name and its tests to a graphviz graph.

        Every test gets an edge from the suite node. Tests are placed in
        child graphs of _NODES_PER_GROUP nodes; the first test of each group
        additionally gets an edge from the first test of the previous group
        (or from the suite node for the first group).

        Args:
            suite_name: Suite to graph.
            dot_graph: Graph to extend; a new graphviz.Digraph is created
                when omitted.

        Returns:
            The graph that was extended.
        """
        suite_tests = self.list_suite_named(suite_name)

        if dot_graph is None:
            dot_graph = graphviz.Digraph(comment=suite_name)

        dot_graph.node(suite_name, suite_name)
        last_level = suite_name
        child_graph = None

        for index, test_name in enumerate(suite_tests):
            position = index % self._NODES_PER_GROUP
            if position == 0:
                child_graph = graphviz.Digraph()
                dot_graph.edge(last_level, test_name)
                last_level = test_name

            child_graph.node(test_name, test_name)
            dot_graph.edge(suite_name, test_name)

            if position == self._NODES_PER_GROUP - 1:
                dot_graph.subgraph(child_graph)
                # Mark the group as flushed so it is not added a second
                # time after the loop.
                child_graph = None

        # Flush a trailing, partially filled group (none exists for an
        # empty suite or when the count is a multiple of the group size).
        if child_graph is not None:
            dot_graph.subgraph(child_graph)

        return dot_graph

    def diff_test_suites(self, suite_a, suite_b):
        """Describe which tests suite_b adds to and drops from suite_a.

        Returns:
            Two lines: 'Suite B (+)' followed by the tests only in suite_b,
            and 'Suite B (-)' followed by the tests only in suite_a. Each
            list is sorted so the output is reproducible.
        """
        suite_a_set = set(self.list_suite_named(suite_a))
        suite_b_set = set(self.list_suite_named(suite_b))
        added = 'Suite B (+)' + str(sorted(suite_b_set - suite_a_set))
        removed = 'Suite B (-)' + str(sorted(suite_a_set - suite_b_set))
        return added + '\n' + removed
|
|
|
|
|
|
def main(args):
    """Load all control files next to this script and run the queries in args.

    Args:
        args: Parsed command-line namespace providing csv, dut, find_test,
            find_suite, list_suite, list_multiple_suites, diff, graph_suite
            and gs_dashboard; an option left as None is skipped.
    """
    manager = TestManager()

    basepath = os.path.dirname(os.path.abspath(__file__))
    relative_roots = ('/../test_suites', '/../server/site_tests',
                      '/../client/site_tests')
    manager.initialize_from_fs([basepath + root for root in relative_roots])
    manager.process_all_tests()

    # Output/target configuration comes first so later queries can use it.
    if args.csv is not None:
        manager.register_csv_logger(args.csv)
    if args.dut is not None:
        manager.set_dut(args.dut)

    if args.find_test is not None:
        found_test = manager.find_test_named(args.find_test)
        if found_test is None:
            manager.log('Queried test not found')
        else:
            manager.log(found_test.file_path)

    if args.find_suite is not None:
        found_suite = manager.find_suite_named(args.find_suite)
        if found_suite is None:
            manager.log('Queried suite not found')
        else:
            manager.log(found_suite.file_path)

    if args.list_suite is not None:
        listing = manager.list_suite_named(args.list_suite, pretty=True)
        manager.log(listing)

    if args.list_multiple_suites is not None:
        for suite_name in args.list_multiple_suites:
            listing = manager.list_suite_named(suite_name, pretty=True)
            manager.log(listing)

    if args.diff is not None:
        first, second = args.diff[0], args.diff[1]
        manager.log(manager.diff_test_suites(first, second))

    if args.graph_suite is not None:
        # Rendered relative to the current working directory.
        manager.graph_suite_named(args.graph_suite).render(
                './suite_data/suite_viz.gv', format='png')

    if args.gs_dashboard is not None:
        manager.log(manager.gs_query_link(args.gs_dashboard))
|
|
|
|
|
|
if __name__ == '__main__':
    # pass in the url for the DUT via ssh
    parser = argparse.ArgumentParser()

    # (flag, add_argument keyword options), in the order they are registered
    # and therefore shown by --help.
    _option_table = (
            ('--csv', {
                    'help': 'supply csv file path for logging output'
            }),
            ('--diff', {
                    'nargs': 2,
                    'help': 'show diff between two suites. Ex: --diff bvt-tast-cq pvs-tast-cq'
            }),
            ('--find_test', {
                    'help': 'find control file for test_name'
            }),
            ('--find_suite', {
                    'help': 'find control file for suite_name'
            }),
            ('--graph_suite', {
                    'help': 'graph test dependencies of suite_name, will output to contrib/suite_data'
            }),
            ('--list_suite', {
                    'help': 'list units in suite_name'
            }),
            ('--list_multiple_suites', {
                    'nargs': '*',
                    'help': 'list units in suite_name_1 suite_name_2 suite_name_n'
            }),
            ('--dut', {
                    'help': 'ip address and port for tast enumeration'
            }),
            ('--gs_dashboard', {
                    'help': 'generate green stainless dashboard for suite_name'
            }),
    )
    for _flag, _options in _option_table:
        parser.add_argument(_flag, **_options)

    parsed_args = parser.parse_args()

    main(parsed_args)
|