1553 lines
58 KiB
Python
1553 lines
58 KiB
Python
# Lint as: python2, python3
|
|
# Copyright 2016 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
|
|
import functools
|
|
import logging
|
|
import math
|
|
import sys
|
|
import time
|
|
|
|
import common
|
|
from autotest_lib.client.common_lib import error
|
|
from autotest_lib.client.common_lib import hosts
|
|
from autotest_lib.client.common_lib import utils
|
|
from autotest_lib.server.cros.servo import servo
|
|
from autotest_lib.server.hosts import cros_constants
|
|
from autotest_lib.server.hosts import repair_utils
|
|
from autotest_lib.server.hosts import servo_constants
|
|
from autotest_lib.server.cros.servo.topology import servo_topology
|
|
from autotest_lib.site_utils.admin_audit import servo_updater
|
|
import six
|
|
|
|
try:
|
|
from autotest_lib.utils.frozen_chromite.lib import metrics
|
|
except ImportError:
|
|
metrics = utils.metrics_mock
|
|
|
|
from autotest_lib.utils.frozen_chromite.lib import timeout_util
|
|
|
|
def ignore_exception_for_non_cros_host(func):
|
|
"""
|
|
Decorator to ignore ControlUnavailableError if servo host is not cros host.
|
|
When using test_that command on a workstation, this enables usage of
|
|
additional servo devices such as servo micro and Sweetberry. This shall not
|
|
change any lab behavior.
|
|
"""
|
|
@functools.wraps(func)
|
|
def wrapper(self, host):
|
|
"""
|
|
Wrapper around func.
|
|
"""
|
|
try:
|
|
func(self, host)
|
|
except servo.ControlUnavailableError as e:
|
|
if host.is_cros_host():
|
|
raise
|
|
logging.warning("Servo host is not cros host, ignore %s: %s",
|
|
type(e).__name__, e)
|
|
return wrapper
|
|
|
|
|
|
class _UpdateVerifier(hosts.Verifier):
|
|
"""
|
|
Verifier to trigger a servo host update, if necessary.
|
|
|
|
The verifier works only for servo_v3.
|
|
The operation doesn't wait for the update to complete and is
|
|
considered a success whether or not the servo is currently
|
|
up-to-date.
|
|
"""
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.LONG_VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
try:
|
|
if (
|
|
not host.get_dut_host_info()
|
|
or not host.get_dut_host_info().servo_cros_stable_version):
|
|
logging.info('Servo stable version missed.'
|
|
' Skip update check action.')
|
|
return
|
|
# We have seen cases that invalid GPT headers/entries block
|
|
# v3s from been update, so always try to repair here.
|
|
# See crbug.com/994396, crbug.com/1057302.
|
|
host.run('cgpt repair /dev/mmcblk0', ignore_status=True)
|
|
host.update_image()
|
|
# We don't want failure from update block DUT repair action.
|
|
# See crbug.com/1029950.
|
|
except Exception as e:
|
|
six.reraise(hosts.AutoservNonCriticalVerifyError,
|
|
hosts.AutoservNonCriticalVerifyError(e),
|
|
sys.exc_info()[2])
|
|
|
|
def _is_applicable(self, host):
|
|
# Run only for servo_v3 host.
|
|
if host.is_labstation() or host.is_containerized_servod():
|
|
return False
|
|
# Only run if the host is in the physical lab.
|
|
return host.is_in_lab()
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Servo_v3 host software is up-to-date'
|
|
|
|
|
|
class _StartServodVerifier(hosts.Verifier):
|
|
"""First start of servod on the host.
|
|
|
|
Single running action to start servod in the first time.
|
|
This verifier created to fit current flow and will be revisited later.
|
|
Action never fails!
|
|
"""
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
if not hasattr(self, 'started'):
|
|
logging.info('Starting servod!')
|
|
host.restart_servod(quick_startup=True)
|
|
# caching the value to prevent restart service when trigger verifier.
|
|
self.started = True
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Initial servod start'
|
|
|
|
|
|
class _RootServoPresentVerifier(hosts.Verifier):
|
|
"""Verifier that first servo is present."""
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
device = None
|
|
topology = host.get_topology()
|
|
topology.read(host.get_dut_host_info())
|
|
try:
|
|
device = topology.get_root_servo()
|
|
except Exception as e:
|
|
if host.is_containerized_servod():
|
|
host.restart_servod()
|
|
logging.debug('Restarting servod container (Not critical) %s',
|
|
e)
|
|
else:
|
|
host.request_reboot()
|
|
logging.info(
|
|
'Reboot labstation requested, it will be handled'
|
|
' by labstation AdminRepair task.'
|
|
' Unable to detect root servo info from topology.')
|
|
logging.debug('(Not critical) %s', e)
|
|
if device:
|
|
logging.info('Root servo is present')
|
|
return
|
|
device = topology.get_root_servo_from_cache()
|
|
if device:
|
|
logging.debug('Found device: %s', device)
|
|
if device.get_serial_number() != host.servo_serial:
|
|
self.serial_mismatch = True
|
|
raise hosts.AutoservVerifyError('Serial mismatch detected')
|
|
logging.info('Root servo is present')
|
|
return
|
|
# Leaving error in case we got empty device.
|
|
raise hosts.AutoservVerifyError('Root servo not found!')
|
|
|
|
def _is_applicable(self, host):
|
|
if host.is_containerized_servod():
|
|
logging.info('Servod is running within a container.')
|
|
return True
|
|
if not host.is_labstation():
|
|
logging.info('Not supported for servo_v3.')
|
|
return False
|
|
# Only run if the host is in the physical lab.
|
|
return host.is_in_lab()
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Root servo is present'
|
|
|
|
|
|
class _RootServoV3PresentVerifier(hosts.Verifier):
|
|
"""Verifier that first servo is present."""
|
|
|
|
RETRY_COUNT = 3
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
for a in range(self.RETRY_COUNT):
|
|
logging.debug('Attempt: %s find servo board on servo_v3.', a + 1)
|
|
present = host.is_servo_board_present_on_servo_v3()
|
|
if present == False:
|
|
raise hosts.AutoservVerifyError('Servo board not found!')
|
|
elif present == True:
|
|
logging.debug('Servo board is present')
|
|
return
|
|
raise hosts.AutoservVerifyError('Fail to find servo board!')
|
|
|
|
def _is_applicable(self, host):
|
|
if host.is_containerized_servod():
|
|
logging.info('Servod is running within a container.')
|
|
return False
|
|
# Do not run for servos under labstations.
|
|
if host.is_labstation():
|
|
logging.info('Servod is running on labstation.')
|
|
return False
|
|
# Only run if the host is in the physical lab.
|
|
return host.is_in_lab()
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Servo board on servo_v3 is present'
|
|
|
|
|
|
class _ServoFwVerifier(hosts.Verifier):
|
|
"""Verifier to check is a servo fw is up-to-date."""
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
try:
|
|
if servo_updater.any_servo_needs_firmware_update(host):
|
|
raise hosts.AutoservVerifyError(
|
|
'Some servo requires firmware update')
|
|
except servo_updater.ServoFwVersionMissedError as e:
|
|
# Do not fail as it will trigger re-flash fw on the servo
|
|
logging.info(
|
|
'Issue with detect new version of firmware for servo.'
|
|
' Please file a bug agains Fleet Automation team (go/fleet-bug)'
|
|
)
|
|
|
|
def _is_applicable(self, host):
|
|
if host.is_containerized_servod():
|
|
logging.info('Servod is running within a container.')
|
|
return True
|
|
# Run only for servos under labstations.
|
|
if not host.is_labstation():
|
|
logging.info('Not supported for servo_v3.')
|
|
return False
|
|
# Only run if the host is in the physical lab.
|
|
return host.is_in_lab()
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Servo fw is up-to-date'
|
|
|
|
|
|
class _ConfigVerifier(hosts.Verifier):
|
|
"""
|
|
Base verifier for the servo config file verifiers.
|
|
"""
|
|
|
|
CONFIG_FILE = '/var/lib/servod/config'
|
|
ATTR = ''
|
|
|
|
@staticmethod
|
|
def _get_config_val(host, config_file, attr):
|
|
"""
|
|
Get the `attr` for `host` from `config_file`.
|
|
|
|
@param host Host to be checked for `config_file`.
|
|
@param config_file Path to the config file to be tested.
|
|
@param attr Attribute to get from config file.
|
|
|
|
@return The attr val as set in the config file, or `None` if
|
|
the file was absent.
|
|
"""
|
|
getboard = ('CONFIG=%s ; [ -f $CONFIG ] && '
|
|
'. $CONFIG && echo $%s' % (config_file, attr))
|
|
attr_val = host.run(getboard, ignore_status=True).stdout
|
|
return attr_val.strip('\n') if attr_val else None
|
|
|
|
@staticmethod
|
|
def _validate_attr(host, val, expected_val, attr, config_file):
|
|
"""
|
|
Check that the attr setting is valid for the host.
|
|
|
|
This presupposes that a valid config file was found. Raise an
|
|
execption if:
|
|
* There was no attr setting from the file (i.e. the setting
|
|
is an empty string), or
|
|
* The attr setting is valid, the attr is known,
|
|
and the setting doesn't match the DUT.
|
|
|
|
@param host Host to be checked for `config_file`.
|
|
@param val Value to be tested.
|
|
@param expected_val Expected value.
|
|
@param attr Attribute we're validating.
|
|
@param config_file Path to the config file to be tested.
|
|
"""
|
|
if not val:
|
|
raise hosts.AutoservVerifyError(
|
|
'config file %s exists, but %s '
|
|
'is not set' % (attr, config_file))
|
|
if expected_val is not None and val != expected_val:
|
|
raise hosts.AutoservVerifyError(
|
|
'%s is %s; it should be %s' % (attr, val, expected_val))
|
|
|
|
|
|
def _get_config(self, host):
|
|
"""
|
|
Return the config file to check.
|
|
|
|
@param host Host object.
|
|
|
|
@return The config file to check.
|
|
"""
|
|
return '%s_%d' % (self.CONFIG_FILE, host.servo_port)
|
|
|
|
@property
|
|
def description(self):
|
|
return 'servo %s setting is correct' % self.ATTR
|
|
|
|
|
|
class _SerialConfigVerifier(_ConfigVerifier):
|
|
"""
|
|
Verifier for the servo SERIAL configuration.
|
|
"""
|
|
|
|
ATTR = 'SERIAL'
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.SHORT_VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
"""
|
|
Test whether the `host` has a `SERIAL` setting configured.
|
|
|
|
This tests the config file names used by the `servod` upstart
|
|
job for a valid setting of the `SERIAL` variable. The following
|
|
conditions raise errors:
|
|
* The SERIAL setting doesn't match the DUT's entry in the AFE
|
|
database.
|
|
* There is no config file.
|
|
"""
|
|
if not host.is_cros_host():
|
|
return
|
|
# Not all servo hosts will have a servo serial so don't verify if it's
|
|
# not set.
|
|
if host.servo_serial is None:
|
|
return
|
|
config = self._get_config(host)
|
|
serialval = self._get_config_val(host, config, self.ATTR)
|
|
if serialval is None:
|
|
raise hosts.AutoservVerifyError(
|
|
'Servo serial is unconfigured; should be %s'
|
|
% host.servo_serial
|
|
)
|
|
|
|
self._validate_attr(host, serialval, host.servo_serial, self.ATTR,
|
|
config)
|
|
|
|
|
|
|
|
class _BoardConfigVerifier(_ConfigVerifier):
|
|
"""
|
|
Verifier for the servo BOARD configuration.
|
|
"""
|
|
|
|
ATTR = 'BOARD'
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.SHORT_VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
"""
|
|
Test whether the `host` has a `BOARD` setting configured.
|
|
|
|
This tests the config file names used by the `servod` upstart
|
|
job for a valid setting of the `BOARD` variable. The following
|
|
conditions raise errors:
|
|
* A config file exists, but the content contains no setting
|
|
for BOARD.
|
|
* The BOARD setting doesn't match the DUT's entry in the AFE
|
|
database.
|
|
* There is no config file.
|
|
"""
|
|
if not host.is_cros_host():
|
|
return
|
|
config = self._get_config(host)
|
|
boardval = self._get_config_val(host, config, self.ATTR)
|
|
if boardval is None:
|
|
msg = 'Servo board is unconfigured'
|
|
if host.servo_board is not None:
|
|
msg += '; should be %s' % host.servo_board
|
|
raise hosts.AutoservVerifyError(msg)
|
|
|
|
self._validate_attr(host, boardval, host.servo_board, self.ATTR,
|
|
config)
|
|
|
|
|
|
class _ServodJobVerifier(hosts.Verifier):
|
|
"""
|
|
Verifier to check that the `servod` upstart job is running.
|
|
"""
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.SHORT_VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
if not host.is_cros_host():
|
|
return
|
|
status_cmd = 'status servod PORT=%d' % host.servo_port
|
|
job_status = host.run(status_cmd, ignore_status=True).stdout
|
|
if 'start/running' not in job_status:
|
|
raise hosts.AutoservVerifyError(
|
|
'servod not running on %s port %d' %
|
|
(host.hostname, host.servo_port))
|
|
|
|
@property
|
|
def description(self):
|
|
return 'servod upstart job is running'
|
|
|
|
|
|
class _ServodEchoVerifier(hosts.Verifier):
|
|
"""
|
|
Verifier to check that the `servod` upstart job is responsible.
|
|
"""
|
|
|
|
SERVOD_INITIALIZED = 'servodtool instance wait-for-active -p %d --timeout 60'
|
|
SERVOD_RESPONSIVE = 'dut-control -p %d serialname'
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
self._verify_servod_initialized(host)
|
|
self._verify_servod_responsive(host)
|
|
|
|
def _verify_servod_initialized(self, host):
|
|
# Verify that servod initialized.
|
|
cmd = self.SERVOD_INITIALIZED % host.servo_port
|
|
res = host.run(cmd, ignore_status=True, timeout=120)
|
|
if res.exit_status != 0:
|
|
raise hosts.AutoservVerifyError(
|
|
'Servod instance is not initialized')
|
|
logging.debug("Presented instance: %s", res.stdout.strip())
|
|
|
|
def _verify_servod_responsive(self, host):
|
|
# Verify if servod started and process is responsible.
|
|
cmd = self.SERVOD_RESPONSIVE % host.servo_port
|
|
res = host.run(cmd, ignore_status=True, timeout=120)
|
|
if res.exit_status != 0:
|
|
raise hosts.AutoservVerifyError(
|
|
'Servod is not responsive for dut-control commands')
|
|
logging.info('Servod responsive: %s', res.stdout)
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Servod is running and responsive to dut-control run.'
|
|
|
|
|
|
class _DiskSpaceVerifier(hosts.Verifier):
|
|
"""
|
|
Verifier to make sure there is enough disk space left on servohost.
|
|
"""
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.SHORT_VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
# Check available space of stateful is greater than threshold, in Gib.
|
|
host.check_diskspace('/mnt/stateful_partition', 0.5)
|
|
|
|
@property
|
|
def description(self):
|
|
return 'servohost has enough disk space.'
|
|
|
|
def _is_applicable(self, host):
|
|
if host.is_containerized_servod():
|
|
logging.info('Servod is running within a container.')
|
|
return False
|
|
return True
|
|
|
|
|
|
class _ServodConnectionVerifier(hosts.Verifier):
|
|
"""
|
|
Verifier to check that we can connect to servod server.
|
|
|
|
If this verifier failed, it most likely servod was crashed or in a
|
|
crashing loop. For servo_v4 it's usually caused by not able to detect
|
|
CCD or servo_micro.
|
|
"""
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
host.initialize_servo()
|
|
|
|
@property
|
|
def description(self):
|
|
return 'servod service is taking calls'
|
|
|
|
|
|
class _ServodControlVerifier(hosts.Verifier):
|
|
"""
|
|
Verifier to check basic servo control functionality.
|
|
|
|
This tests the connection to the target servod service with a simple
|
|
method call. As a side-effect, all servo signals are initialized to
|
|
default values.
|
|
|
|
N.B. Initializing servo signals is necessary because the power
|
|
button and lid switch verifiers both test against expected initial
|
|
values.
|
|
"""
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
try:
|
|
host.initialize_dut_for_servo()
|
|
except Exception as e:
|
|
six.reraise(hosts.AutoservNonCriticalVerifyError,
|
|
hosts.AutoservNonCriticalVerifyError(e),
|
|
sys.exc_info()[2])
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Basic servod control is working'
|
|
|
|
|
|
class _Cr50ConsoleVerifier(hosts.Verifier):
|
|
"""Verifier to check if cr50 console is present and working.
|
|
|
|
Validating based by running commands and expect they will not fail.
|
|
If any command fail then console is not working as expected.
|
|
"""
|
|
|
|
COMMAND_TO_CHECK_CONSOLE = (
|
|
'gsc_ccd_level',
|
|
'cr50_testlab',
|
|
'cr50_ccd_state_flags',
|
|
)
|
|
|
|
@ignore_exception_for_non_cros_host
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
try:
|
|
for command in self.COMMAND_TO_CHECK_CONSOLE:
|
|
if host.get_servo().has_control(command):
|
|
# Response of command is not important.
|
|
host.get_servo().get(command)
|
|
except Exception as e:
|
|
six.reraise(hosts.AutoservNonCriticalVerifyError,
|
|
hosts.AutoservNonCriticalVerifyError(e),
|
|
sys.exc_info()[2])
|
|
|
|
def _is_applicable(self, host):
|
|
# Only when DUT is running through ccd or c2d2.
|
|
# TODO(coconutruben): replace with ccd API when available in servo.py
|
|
return host.get_servo() and host.get_servo().main_device_uses_gsc_drv()
|
|
|
|
@property
|
|
def description(self):
|
|
return 'CR50 console is working'
|
|
|
|
|
|
class _CCDTestlabVerifier(hosts.Verifier):
|
|
"""
|
|
Verifier to check that ccd testlab is enabled.
|
|
|
|
All DUT connected by ccd has to supported cr50 with enabled testlab
|
|
to allow manipulation by servo. The flag testlab is sticky and will
|
|
stay enabled if was set up. The testlab can be enabled when ccd is
|
|
open. (go/ccd-setup)
|
|
"""
|
|
@ignore_exception_for_non_cros_host
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
if not host.get_servo().has_control('cr50_testlab'):
|
|
raise hosts.AutoservVerifyError(
|
|
'gsc has to be supported when use servo with '
|
|
'ccd_*/type-c connection')
|
|
|
|
status = host.get_servo().get('cr50_testlab')
|
|
# check by 'on' to fail when get unexpected value
|
|
if status == 'on':
|
|
# If servo uses cr50 to control the dut, open ccd so repair actions
|
|
# that reset the dut will work (cr50_reboot, cold_reset, warm_reset)
|
|
if host.get_servo().main_device_uses_gsc_drv():
|
|
host.get_servo().set_nocheck('cr50_testlab', 'open')
|
|
# ccd testlab enabled
|
|
return
|
|
raise hosts.AutoservNonCriticalVerifyError(
|
|
'The ccd testlab is disabled; DUT requires manual work '
|
|
'to enable it (go/ccd-setup).')
|
|
|
|
def _is_applicable(self, host):
|
|
# Only when DUT is running through ccd.
|
|
# TODO(coconutruben): replace with ccd API when available in servo.py
|
|
return host.get_servo() and host.get_servo().main_device_is_ccd()
|
|
|
|
@property
|
|
def description(self):
|
|
return 'ccd testlab enabled'
|
|
|
|
class _CCDPowerDeliveryVerifier(hosts.Verifier):
|
|
"""Verifier to check and reset servo_v4_role for servos that support
|
|
power delivery feature(a.k.a power pass through).
|
|
|
|
There are currently two position of servo_v4_role, src and snk:
|
|
src -- servo in power delivery mode and passes power to the DUT.
|
|
snk -- servo in normal mode and not passes power to DUT.
|
|
We want to ensure that servo_v4_role is set to src.
|
|
"""
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
if host.get_servo():
|
|
self._printControl(host.get_servo(), 'ppdut5_mv')
|
|
self._printControl(host.get_servo(), 'ppchg5_mv')
|
|
if host.get_servo().get('servo_pd_role') == 'snk':
|
|
raise hosts.AutoservNonCriticalVerifyError(
|
|
'Power delivery not in src role.')
|
|
|
|
def _printControl(self, servo, control):
|
|
if servo.has_control(control):
|
|
logging.info("%s: %s", control, servo.get(control))
|
|
|
|
def _is_applicable(self, host):
|
|
return (host.is_in_lab() and
|
|
host.get_servo().supports_built_in_pd_control())
|
|
|
|
@property
|
|
def description(self):
|
|
return 'ensure applicable servo is in "src" mode for power delivery'
|
|
|
|
|
|
class _BaseDUTConnectionVerifier(hosts.Verifier):
|
|
"""Verifier to check connection between DUT and servo."""
|
|
|
|
# Bus voltage on ppdut5. Value can be:
|
|
# - less than 500 - DUT is likely not connected
|
|
# - between 500 and 4000 - unexpected value
|
|
# - more than 4000 - DUT is likely connected
|
|
MAX_PPDUT5_MV_WHEN_NOT_CONNECTED = 500
|
|
MIN_PPDUT5_MV_WHEN_CONNECTED = 4000
|
|
|
|
def _is_usb_hub_connected(self, host):
|
|
"""Checking bus voltage on ppdut5.
|
|
|
|
Supported only on servo_v4 boards.
|
|
If voltage value is lower than 500 then device is not connected.
|
|
When value higher 4000 means the device is connected. If value
|
|
between 500 and 4000 is not expected and will be marked as connected
|
|
and collected information which DUT has this exception.
|
|
|
|
@returns: bool
|
|
"""
|
|
logging.debug('Started check by ppdut5_mv:on')
|
|
try:
|
|
val = host.get_servo().get('ppdut5_mv')
|
|
logging.info('ppdut5_mv=%s', val)
|
|
if val < self.MAX_PPDUT5_MV_WHEN_NOT_CONNECTED:
|
|
# servo is not connected to the DUT
|
|
return False
|
|
if val < self.MIN_PPDUT5_MV_WHEN_CONNECTED:
|
|
# is unexpected value.
|
|
# collecting metrics to look case by case
|
|
# TODO(otabek) for analysis b:163845694
|
|
data = host._get_host_metrics_data()
|
|
metrics.Counter('chromeos/autotest/repair/ppdut5_mv_case'
|
|
).increment(fields=data)
|
|
# else:
|
|
# servo is physical connected to the DUT
|
|
except Exception as e:
|
|
logging.debug('(Not critical) %s', e)
|
|
return True
|
|
|
|
def _is_ribbon_cable_connected(self, host):
|
|
"""Check if ribbon cable is connected to the DUT.
|
|
|
|
The servo_micro/flex - can be checked by `cold_reset` signal.
|
|
When `cold_reset` is `on` it commonly indicates that the DUT
|
|
is disconnected. To avoid mistake of real signal we try
|
|
switch it off and if is cannot then servo is not connected.
|
|
|
|
@returns: bool
|
|
"""
|
|
logging.debug('Started check by cold_reset:on')
|
|
try:
|
|
val = host.get_servo().get('cold_reset')
|
|
logging.info('cold_reset=%s', val)
|
|
if val == 'on':
|
|
# If cold_reset has is on can be right signal
|
|
# or caused by missing connection between servo_micro and DUT.
|
|
# if we can switch it to the off then it was signal.
|
|
host.get_servo().set('cold_reset', 'off')
|
|
except error.TestFail:
|
|
logging.debug('Ribbon cable is not connected to the DUT.')
|
|
return False
|
|
except Exception as e:
|
|
logging.debug('(Not critical) %s', e)
|
|
return True
|
|
|
|
def _is_dut_power_on(self, host):
|
|
# DUT is running in normal state.
|
|
# if EC not supported by board then we expect error
|
|
try:
|
|
return host.get_servo().get('ec_system_powerstate') == 'S0'
|
|
except Exception as e:
|
|
logging.debug('(Not critical) %s', e)
|
|
return False
|
|
|
|
def _is_servo_v4_type_a(self, host):
|
|
return host.is_labstation() and host.get_servo().is_servo_v4_type_a()
|
|
|
|
def _is_servo_v4_type_c(self, host):
|
|
return host.is_labstation() and host.get_servo().is_servo_v4_type_c()
|
|
|
|
def _is_servo_v3(self, host):
|
|
return not host.is_labstation()
|
|
|
|
|
|
class _DUTConnectionVerifier(_BaseDUTConnectionVerifier):
|
|
"""Verifier to check connection Servo to the DUT.
|
|
|
|
Servo_v4 type-a connected to the DUT by:
|
|
1) servo_micro - checked by `cold_reset`.
|
|
Servo_v4 type-c connected to the DUT by:
|
|
1) ccd - checked by ppdut5_mv.
|
|
Servo_v3 connected to the DUT by:
|
|
1) legacy servo header - can be checked by `cold_reset`.
|
|
"""
|
|
|
|
@ignore_exception_for_non_cros_host
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
if self._is_servo_v4_type_a(host):
|
|
if not self._is_ribbon_cable_connected(host):
|
|
raise hosts.AutoservVerifyError(
|
|
'Servo_micro is likely not connected to the DUT.')
|
|
elif self._is_servo_v4_type_c(host):
|
|
if (host.get_servo().supports_built_in_pd_control()
|
|
and not self._is_usb_hub_connected(host)):
|
|
raise hosts.AutoservVerifyError(
|
|
'Servo_v4 is likely not connected to the DUT.')
|
|
elif self._is_servo_v3(host):
|
|
if not self._is_ribbon_cable_connected(host):
|
|
raise hosts.AutoservVerifyError(
|
|
'Servo_v3 is likely not connected to the DUT.')
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Ensure the Servo connected to the DUT.'
|
|
|
|
|
|
class _ServoHubConnectionVerifier(_BaseDUTConnectionVerifier):
|
|
"""Verifier to check connection ServoHub to DUT.
|
|
|
|
Servo_v4 type-a connected to the DUT by:
|
|
1) USB hub - checked by ppdut5_mv.
|
|
"""
|
|
|
|
@ignore_exception_for_non_cros_host
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
if self._is_servo_v4_type_a(host):
|
|
if (self._is_dut_power_on(host)
|
|
and not self._is_usb_hub_connected(host)):
|
|
raise hosts.AutoservVerifyError(
|
|
'Servo USB hub is likely not connected to the DUT.')
|
|
|
|
def _is_applicable(self, host):
|
|
if host.is_ec_supported():
|
|
return True
|
|
logging.info('Host does not support EC.')
|
|
return False
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Ensure the Servo HUB connected to the DUT.'
|
|
|
|
|
|
class _BaseCr50SBUVerifier(_BaseDUTConnectionVerifier):
|
|
"""Check servod issue related to SBU voltage."""
|
|
|
|
# Min SBU voltage to detect usb-device
|
|
SBU_THRESHOLD = 2500.0
|
|
# How many times collect SBU voltage to calc AVG value.
|
|
_TOTAL_CHECK_SBU_VOLTAGE = 10
|
|
|
|
def _is_applicable(self, host):
|
|
if host.is_localhost():
|
|
logging.info('Target servo is not in a lab,'
|
|
' action is not applicable.')
|
|
return False
|
|
if not self._is_servo_v4_type_c(host):
|
|
logging.info('Check support only servo-v4 (type-c),'
|
|
' action is not applicable.')
|
|
return False
|
|
return True
|
|
|
|
def _is_sbu_voltage_issue(self, host):
|
|
"""Check if servo does not detected by SBU voltage issue."""
|
|
command = 'dut_sbu_voltage_float_fault'
|
|
if host.get_servo().has_control(command):
|
|
if host.get_servo().get(command) == 'on':
|
|
return True
|
|
return False
|
|
|
|
def _get_max_sbu_value(self, host):
|
|
"""Get average voltage on SBU lines."""
|
|
servo = host.get_servo()
|
|
if not servo.has_control('servo_dut_sbu1_mv'):
|
|
return -1
|
|
s1 = 0
|
|
s2 = 0
|
|
for i in range(self._TOTAL_CHECK_SBU_VOLTAGE):
|
|
try:
|
|
sbu1 = int(servo.get('servo_dut_sbu1_mv'))
|
|
sbu2 = int(servo.get('servo_dut_sbu2_mv'))
|
|
logging.debug('Attempt:%2d, sbu1 %4d sbu2 %4d', i, sbu1, sbu2)
|
|
s1 += sbu1
|
|
s2 += sbu2
|
|
except error.TestFail as e:
|
|
# This is a nice to have but if reading this fails, it
|
|
# shouldn't interfere with the test.
|
|
logging.exception(e)
|
|
logging.debug('Total: sbu1 %4d sbu2 %4d', s1, s2)
|
|
# Use float to get values with changes
|
|
s1 = s1 / float(self._TOTAL_CHECK_SBU_VOLTAGE)
|
|
s2 = s2 / float(self._TOTAL_CHECK_SBU_VOLTAGE)
|
|
logging.debug('Avg: sbu1 %7.2f sbu2 %7.2f', s1, s2)
|
|
max_sbu = max(s1, s2)
|
|
logging.info('Max sbu: %7.2f', max_sbu)
|
|
return max_sbu
|
|
|
|
|
|
class _Cr50OffVerifier(_BaseCr50SBUVerifier):
|
|
"""Check if CR50 is in deep sleep and fail to detected.
|
|
|
|
If SBU voltage is higher threshold but still cannot be detected
|
|
as usb device then probably CR50 is in deep sleep.
|
|
Threshold is 2500 mV on any SBU lines.
|
|
"""
|
|
|
|
@ignore_exception_for_non_cros_host
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
if self._is_sbu_voltage_issue(host):
|
|
if self._get_max_sbu_value(host) > self.SBU_THRESHOLD:
|
|
raise hosts.AutoservVerifyError(
|
|
'CR50 voltage detected but usb device not enumerated')
|
|
|
|
@property
|
|
def description(self):
|
|
return 'CR50 voltage detected but not enumerated.'
|
|
|
|
|
|
class _Cr50LowSBUVerifier(_BaseCr50SBUVerifier):
|
|
"""Check if servod fail to detect CR50 due low voltage.
|
|
|
|
CR50 cannot be enumerated as SBU voltage line lower then
|
|
threshold.
|
|
Threshold is 2500 mV on any SBU lines.
|
|
"""
|
|
|
|
@ignore_exception_for_non_cros_host
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
if self._is_sbu_voltage_issue(host):
|
|
v = self._get_max_sbu_value(host)
|
|
if v > 1 and v <= self.SBU_THRESHOLD:
|
|
raise hosts.AutoservVerifyError(
|
|
'Cr50 is not detected due to SBU voltages'
|
|
' being below %dmV' % self.SBU_THRESHOLD)
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Cr50 not detected as both SBU voltages are below threshold.'
|
|
|
|
|
|
class _TopologyVerifier(hosts.Verifier):
|
|
"""Verifier that all servo component is presented."""
|
|
|
|
@ignore_exception_for_non_cros_host
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
topology = host.get_topology()
|
|
topology.read(host.get_dut_host_info())
|
|
try:
|
|
# Linux takes 1 second to detect and enumerate USB device since
|
|
# 2010 year. We take 10 seconds to be sure as old standard was
|
|
# 5 seconds.
|
|
time.sleep(10)
|
|
topology.validate(raise_error=True,
|
|
dual_set=host.is_dual_setup(),
|
|
compare=True)
|
|
except servo_topology.ServoTopologyError as e:
|
|
six.reraise(hosts.AutoservVerifyError,
|
|
hosts.AutoservVerifyError(e),
|
|
sys.exc_info()[2])
|
|
|
|
def _is_applicable(self, host):
|
|
if host.is_localhost():
|
|
logging.info('Target servo is not in a lab,'
|
|
' action is not applicable.')
|
|
return False
|
|
if not host.is_servo_topology_supported():
|
|
logging.info('Target servo-topology is not supported,'
|
|
' action is not applicable.')
|
|
return False
|
|
return True
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Ensure all Servo component present.'
|
|
|
|
|
|
class _PowerButtonVerifier(hosts.Verifier):
|
|
"""
|
|
Verifier to check the `pwr_button` signal.
|
|
|
|
Tests that the `pwr_button` signal shows the power button has been
|
|
released. When `pwr_button` is stuck at `press`, it commonly
|
|
indicates that the ribbon cable is disconnected.
|
|
"""
|
|
# TODO (crbug.com/646593) - Remove list below once servo has been updated
|
|
# with a fake pwr_button signal.
|
|
_BOARDS_WO_PWR_BUTTON = ['arkham', 'gale', 'mistral', 'storm', 'whirlwind']
|
|
|
|
@ignore_exception_for_non_cros_host
|
|
@timeout_util.TimeoutDecorator(cros_constants.SHORT_VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
if host.servo_board in self._BOARDS_WO_PWR_BUTTON:
|
|
return
|
|
try:
|
|
button = host.get_servo().get('pwr_button')
|
|
except Exception as e:
|
|
six.reraise(hosts.AutoservNonCriticalVerifyError,
|
|
hosts.AutoservNonCriticalVerifyError(e),
|
|
sys.exc_info()[2])
|
|
|
|
if button != 'release':
|
|
raise hosts.AutoservNonCriticalVerifyError(
|
|
'Check ribbon cable: \'pwr_button\' is stuck')
|
|
|
|
def _is_applicable(self, host):
|
|
return (host.get_servo() and host.get_servo().main_device_is_flex())
|
|
|
|
@property
|
|
def description(self):
|
|
return 'pwr_button control is normal'
|
|
|
|
|
|
class _BatteryVerifier(hosts.Verifier):
|
|
"""Collect battery info for analysis."""
|
|
|
|
@ignore_exception_for_non_cros_host
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
try:
|
|
servo = host.get_servo()
|
|
charging = False
|
|
if servo.has_control('battery_is_charging'):
|
|
charging = servo.get('battery_is_charging')
|
|
level = -1
|
|
if servo.has_control('battery_charge_percent'):
|
|
level = servo.get('battery_charge_percent')
|
|
design_mah = servo.get('battery_full_design_mah')
|
|
charge_mah = servo.get('battery_full_charge_mah')
|
|
logging.info('Charging: %s', charging)
|
|
logging.info('Percentage: %s', level)
|
|
logging.info('Full charge max: %s', charge_mah)
|
|
logging.info('Full design max: %s', design_mah)
|
|
# based on analysis of ratio we can find out what is
|
|
# the level when we can say that battery is dead
|
|
ratio = int(math.floor(charge_mah / design_mah * 100.0))
|
|
logging.info('Ratio: %s', ratio)
|
|
data = {
|
|
'board': host.servo_board or 'unknown',
|
|
'model': host.servo_model or 'unknown',
|
|
'ratio': ratio
|
|
}
|
|
metrics.Counter('chromeos/autotest/battery/ratio').increment(
|
|
fields=data)
|
|
except Exception as e:
|
|
# Keeping it with info level because we do not expect it.
|
|
logging.info('(Not critical) %s', e)
|
|
|
|
def _is_applicable(self, host):
|
|
if not host.is_ec_supported():
|
|
logging.info('The board not support EC')
|
|
return False
|
|
dut_info = host.get_dut_host_info()
|
|
if dut_info:
|
|
host_info = host.get_dut_host_info()
|
|
if host_info.get_label_value('power') != 'battery':
|
|
logging.info('The board does not have battery')
|
|
return False
|
|
servo = host.get_servo()
|
|
if (not servo.has_control('battery_full_design_mah')
|
|
or not servo.has_control('battery_full_charge_mah')):
|
|
logging.info('The board is not supported battery controls...')
|
|
return False
|
|
return True
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Logs battery levels'
|
|
|
|
|
|
class _LidVerifier(hosts.Verifier):
|
|
"""
|
|
Verifier to check the `lid_open` signal.
|
|
"""
|
|
|
|
@ignore_exception_for_non_cros_host
|
|
@timeout_util.TimeoutDecorator(cros_constants.SHORT_VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
try:
|
|
lid_open = host.get_servo().get('lid_open')
|
|
except Exception as e:
|
|
six.reraise(hosts.AutoservNonCriticalVerifyError,
|
|
hosts.AutoservNonCriticalVerifyError(e),
|
|
sys.exc_info()[2])
|
|
|
|
if lid_open != 'yes' and lid_open != 'not_applicable':
|
|
raise hosts.AutoservNonCriticalVerifyError(
|
|
'Check lid switch: lid_open is %s' % lid_open)
|
|
|
|
@property
|
|
def description(self):
|
|
return 'lid_open control is normal'
|
|
|
|
|
|
class ECConsoleVerifier(hosts.Verifier):
|
|
"""
|
|
Verifier response from the EC console.
|
|
"""
|
|
|
|
COMMAND_TO_CHECK_CONSOLE = (
|
|
'ec_system_powerstate',
|
|
'ec_board',
|
|
)
|
|
|
|
@ignore_exception_for_non_cros_host
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
if not host.is_ec_supported():
|
|
logging.info('The board does not support EC')
|
|
return
|
|
|
|
for command in self.COMMAND_TO_CHECK_CONSOLE:
|
|
if host.get_servo().has_control(command):
|
|
try:
|
|
# Response of command is not important.
|
|
r = host.get_servo().get(command)
|
|
logging.debug('Result %s:%s', command, r)
|
|
# Exiting as we confirmed that console is working.
|
|
return
|
|
except Exception as e:
|
|
logging.error('Fail to read %s control. Error: %s',
|
|
command, e)
|
|
# If we reached this point then no command succeeded.
|
|
raise hosts.AutoservNonCriticalVerifyError(
|
|
'EC console is not responding; '
|
|
'may be caused of broken EC firmware')
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Check EC console'
|
|
|
|
|
|
class ServodDutControllerMissingVerifier(hosts.Verifier):
|
|
"""Verifier to check whether the servod dut controller is missing or not.
|
|
|
|
When servod is initializing, it checks if DUT controller is
|
|
missing. If yes,then it sets 'dut_controller_missing_fault' to
|
|
'on', otherwise, to 'off'. Missing controller means servo
|
|
component connected to the DUT is missing, or is not responsive.
|
|
"""
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC)
|
|
def verify(self, host):
|
|
logging.debug('ServodDutControllerMissingVerifier: Starting verifier.')
|
|
if host.get_servo().get('dut_controller_missing_fault') == 'on':
|
|
logging.debug('ServodDutControllerMissingVerifier: DUT Controller missing fault is on.')
|
|
raise hosts.AutoservVerifyError('Servod is missing dut controller')
|
|
else:
|
|
logging.debug('ServodDutControllerMissingVerifier: DUT Controller missing fault is not on.')
|
|
|
|
def _is_applicable(self, host):
|
|
if host.is_containerized_servod():
|
|
logging.debug('ServodDutControllerMissingVerifier: Detected containerized servod.')
|
|
logging.info('Servod is running within a container')
|
|
return True
|
|
if not host.is_labstation():
|
|
logging.debug('ServodDutControllerMissingVerifier: Detected non-labstation.')
|
|
logging.info('Not supported for servo_v3.')
|
|
return False
|
|
return host.is_in_lab()
|
|
|
|
@property
|
|
def description(self):
|
|
return 'ensure servod does not have missing dut controller'
|
|
|
|
|
|
class _ConnectionVerifier(repair_utils.SshVerifier):
|
|
"""
|
|
Ensure the servo host container is up.
|
|
"""
|
|
|
|
def verify(self, host):
|
|
if host.is_containerized_servod():
|
|
# We need start servod container first before check it-is present
|
|
host.start_containerized_servod()
|
|
return super(_ConnectionVerifier, self).verify(host)
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Check the connection to the machine or container running servod.'
|
|
|
|
|
|
class _RestartServod(hosts.RepairAction):
|
|
"""Restart `servod` with the proper BOARD setting."""
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC)
|
|
def repair(self, host):
|
|
if host.is_containerized_servod():
|
|
logging.debug('Restarting servod container')
|
|
elif not host.is_cros_host():
|
|
raise hosts.AutoservRepairError(
|
|
'Can\'t restart servod: not running '
|
|
'embedded ChromeOS.',
|
|
'servo_not_applicable_to_non_cros_host')
|
|
host.restart_servod()
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Start servod with the proper config settings.'
|
|
|
|
|
|
class _ServoRebootRepair(repair_utils.RebootRepair):
|
|
"""Try repair servo by reboot servohost.
|
|
|
|
This is the same as the standard `RebootRepair`, for servo_v3 it will
|
|
reboot the beaglebone board immediately while for labstation it will
|
|
request a reboot by touch a flag file on its labstation, then
|
|
labstation reboot will be handled by labstation AdminRepair task as
|
|
labstation host multiple servos and need do an synchronized reboot.
|
|
"""
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC)
|
|
def repair(self, host):
|
|
super(_ServoRebootRepair, self).repair(host)
|
|
# restart servod for v3 after reboot.
|
|
host.restart_servod()
|
|
|
|
def _is_applicable(self, host):
|
|
if host.is_localhost() or not host.is_cros_host():
|
|
logging.info('Target servo is not in a lab, the reboot repair'
|
|
' action is not applicable.')
|
|
return False
|
|
|
|
if host.is_labstation():
|
|
host.request_reboot()
|
|
logging.info('Reboot labstation requested, it will be handled'
|
|
' by labstation AdminRepair task.')
|
|
return False
|
|
return True
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Reboot the servo host.'
|
|
|
|
|
|
class _ToggleCCLineRepair(hosts.RepairAction):
|
|
"""Try repair servod by toggle cc.
|
|
|
|
When cr50 is not enumerated we can try to recover it by toggle cc line.
|
|
"""
|
|
# Timeout for shut down configuration channel.
|
|
CC_OFF_TIMEOUT = 10
|
|
# Timeout for initialize configuration channel.
|
|
CC_ON_TIMEOUT = 30
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC)
|
|
def repair(self, host):
|
|
logging.info('Turn off configuration channel and wait 10 seconds.')
|
|
servo_uart_cmd = 'servo_v4_uart_cmd'
|
|
if not host.get_servo().has_control(servo_uart_cmd):
|
|
servo_uart_cmd = 'servo_v4p1_uart_cmd'
|
|
host.get_servo().set_nocheck(servo_uart_cmd, 'cc off')
|
|
# wait till command will be effected
|
|
time.sleep(self.CC_OFF_TIMEOUT)
|
|
|
|
logging.info('Turn on configuration channel and wait 30 seconds.')
|
|
# alternative option to turn line on is by `cc srcdts`
|
|
host.get_servo().set_nocheck('servo_pd_role', 'src')
|
|
host.get_servo().set_nocheck('servo_dts_mode', 'on')
|
|
# wait till command will be effected
|
|
time.sleep(self.CC_ON_TIMEOUT)
|
|
host.restart_servod()
|
|
|
|
def _is_applicable(self, host):
|
|
if host.is_localhost():
|
|
logging.debug('Not supported for localhost.')
|
|
return False
|
|
if not host.servo_serial:
|
|
logging.debug('Servod does not have serial.')
|
|
return False
|
|
if not host.servo_recovery:
|
|
logging.debug('Servod is not running in recovery mode.')
|
|
return False
|
|
if not (host.is_labstation() or host.is_containerized_servod()):
|
|
logging.debug('Not supported for servo_v3.')
|
|
return False
|
|
if not host.get_servo():
|
|
logging.debug('Servo is not initialized.')
|
|
return False
|
|
return self._is_type_c(host)
|
|
|
|
def _is_type_c(self, host):
|
|
if host.get_dut_host_info():
|
|
servo_type = host.get_dut_host_info().get_label_value(
|
|
servo_constants.SERVO_TYPE_LABEL_PREFIX)
|
|
return 'ccd' in servo_type
|
|
return False
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Toggle cc lines'
|
|
|
|
|
|
class _FakedisconnectRepair(hosts.RepairAction):
|
|
"""Try repair servod by mimic reconnection of servo.
|
|
|
|
When cr50 is not enumerated as we can try to recover it by reconnect to DUT.
|
|
"""
|
|
# Delay to disconnect.
|
|
DISC_DELAY_MS = 100
|
|
# Timeout to wait to restore the connection.
|
|
DISC_TIMEOUT_MS = 2000
|
|
# Timeout to wait to execute the command and apply effect.
|
|
EXEC_TIMEOUT = (DISC_DELAY_MS + DISC_TIMEOUT_MS) / 1000 + 2
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC)
|
|
def repair(self, host):
|
|
disc_cmd = ('fakedisconnect %d %d' %
|
|
(self.DISC_DELAY_MS, self.DISC_TIMEOUT_MS))
|
|
# cannot use 'set' as control is not returned executed commands
|
|
servo_uart_cmd = 'servo_v4_uart_cmd'
|
|
if not host.get_servo().has_control(servo_uart_cmd):
|
|
servo_uart_cmd = 'servo_v4p1_uart_cmd'
|
|
host.get_servo().set_nocheck(servo_uart_cmd, disc_cmd)
|
|
logging.debug('Waiting %ss for affect of action', self.EXEC_TIMEOUT)
|
|
time.sleep(self.EXEC_TIMEOUT)
|
|
host.restart_servod()
|
|
|
|
def _is_applicable(self, host):
|
|
if host.is_localhost():
|
|
logging.debug('Not supported for localhost.')
|
|
return False
|
|
if not host.servo_serial:
|
|
logging.debug('Servod does not have serial.')
|
|
return False
|
|
if not host.servo_recovery:
|
|
logging.debug('Servod is not running in recovery mode.')
|
|
return False
|
|
if not (host.is_labstation() or host.is_containerized_servod()):
|
|
logging.debug('Not supported for servo_v3.')
|
|
return False
|
|
if not host.get_servo():
|
|
logging.debug('Servo is not initialized.')
|
|
return False
|
|
return self._is_type_c(host)
|
|
|
|
def _is_type_c(self, host):
|
|
if host.get_dut_host_info():
|
|
servo_type = host.get_dut_host_info().get_label_value(
|
|
servo_constants.SERVO_TYPE_LABEL_PREFIX)
|
|
return 'ccd' in servo_type
|
|
return False
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Fake reconnect to DUT'
|
|
|
|
|
|
class _PowerDeliveryRepair(hosts.RepairAction):
|
|
"""Repair to check servo_v4_role for servos that support
|
|
power delivery feature(a.k.a power pass through).
|
|
|
|
There are currently two position of servo_v4_role, src and snk:
|
|
src -- servo in power delivery mode and passes power to the DUT.
|
|
snk -- servo in normal mode and not passes power to DUT.
|
|
"""
|
|
# How many time retry to set PD in correct mode and verify that is stay.
|
|
# Set 5 as each attempt has 10 attempts inside 'set' method.
|
|
_SET_ATTEMPT_COUNT = 5
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC)
|
|
def repair(self, host):
|
|
host.get_servo().set_nocheck('servo_pd_role', 'snk')
|
|
time.sleep(1)
|
|
for x in range(self._SET_ATTEMPT_COUNT):
|
|
logging.debug('Try set servo_v4_role to src.'
|
|
' Attempt: %s', x + 1)
|
|
try:
|
|
host.get_servo().set('servo_pd_role', 'src')
|
|
# Waiting a few seconds as it can be change to snk if PD
|
|
# on servo has issue.
|
|
time.sleep(5)
|
|
except BaseException as e:
|
|
logging.debug('Setting PD with retries failed %s', e)
|
|
if host.get_servo().get('servo_pd_role') == 'src':
|
|
break
|
|
if host.get_servo().get('servo_pd_role') == 'snk':
|
|
raise hosts.AutoservNonCriticalVerifyError(
|
|
'Cannot switch power delivery to the src role')
|
|
# Restart servod to re-initialize servos.
|
|
# In some cases if device did not receive power can block detection
|
|
# of servo components.
|
|
host.restart_servod()
|
|
|
|
def _is_type_c(self, host):
|
|
return (host.is_in_lab() and host.get_servo()
|
|
and host.get_servo().supports_built_in_pd_control())
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Recover power delivery on servo'
|
|
|
|
|
|
class _ECRebootRepair(hosts.RepairAction):
|
|
"""
|
|
Reboot EC on DUT from servo.
|
|
"""
|
|
|
|
def _is_applicable(self, host):
|
|
return (not host.is_localhost()) and host.is_ec_supported()
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC)
|
|
def repair(self, host):
|
|
host.get_servo().ec_reboot()
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Reboot EC'
|
|
|
|
|
|
class _DutRebootRepair(hosts.RepairAction):
|
|
"""
|
|
Reboot DUT to recover some servo controls depending on EC console.
|
|
|
|
Some servo controls, like lid_open, requires communicating with DUT through
|
|
EC UART console. Failure of this kinds of controls can be recovered by
|
|
rebooting the DUT.
|
|
"""
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC)
|
|
def repair(self, host):
|
|
host.get_servo().get_power_state_controller().reset()
|
|
# Get the lid_open value which requires EC console.
|
|
lid_open = host.get_servo().get('lid_open')
|
|
if lid_open != 'yes' and lid_open != 'not_applicable':
|
|
raise hosts.AutoservVerifyError(
|
|
'Still fail to contact EC console after rebooting DUT')
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Reset the DUT via servo'
|
|
|
|
|
|
class _DiskCleanupRepair(hosts.RepairAction):
|
|
"""
|
|
Remove old logs/metrics/crash_dumps on servohost to free up disk space.
|
|
"""
|
|
KEEP_LOGS_MAX_DAYS = 5
|
|
|
|
FILE_TO_REMOVE = [
|
|
'/var/lib/metrics/uma-events', '/var/spool/crash/*',
|
|
'/var/log/chrome/*', '/var/log/ui/*',
|
|
'/home/chronos/BrowserMetrics/*'
|
|
]
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.SHORT_REPAIR_TIMEOUT_SEC)
|
|
def repair(self, host):
|
|
if host.is_localhost():
|
|
# we don't want to remove anything from local testing.
|
|
return
|
|
|
|
# Remove old servod logs.
|
|
host.run('/usr/bin/find /var/log/servod_* -mtime +%d -print -delete'
|
|
% self.KEEP_LOGS_MAX_DAYS, ignore_status=True)
|
|
|
|
# Remove pre-defined metrics and crash dumps.
|
|
for path in self.FILE_TO_REMOVE:
|
|
host.run('rm %s' % path, ignore_status=True)
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Clean up old logs/metrics on servohost to free up disk space.'
|
|
|
|
|
|
class _ServoFwUpdateRepair(hosts.RepairAction):
|
|
"""Update firmware for servos.
|
|
|
|
We try to update servo 3 times and then try to force update it.
|
|
"""
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC)
|
|
def repair(self, host):
|
|
try:
|
|
servo_updater.update_servo_firmware(host,
|
|
try_attempt_count=3,
|
|
force_update=False,
|
|
try_force_update=True)
|
|
except servo_updater.ServoUpdaterError as er:
|
|
# Catch servo_updater issue to cache it.
|
|
self.servo_updater_issue_detected = True
|
|
raise hosts.AutoservVerifyError('ServoUpdater issue detected')
|
|
|
|
def _is_applicable(self, host):
|
|
# Run only for servo_v4 and servo_v4p1.
|
|
return host.is_labstation() or host.is_containerized_servod()
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Update servo-fw if required.'
|
|
|
|
|
|
class _ServoMicroFlashRepair(hosts.RepairAction):
|
|
"""
|
|
Remove old logs/metrics/crash_dumps on servohost to free up disk space.
|
|
"""
|
|
_TARGET_SERVO = 'servo_micro'
|
|
|
|
@timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC)
|
|
def repair(self, host):
|
|
if not host.is_cros_host():
|
|
raise hosts.AutoservRepairError(
|
|
'Can\'t restart servod: not running '
|
|
'embedded ChromeOS.',
|
|
'servo_not_applicable_to_non_cros_host')
|
|
servo = host.get_servo()
|
|
if not servo or self._TARGET_SERVO not in servo.get_servo_type():
|
|
logging.info("Servo-micro is not present on set-up")
|
|
return
|
|
|
|
try:
|
|
servo_updater.update_servo_firmware(host,
|
|
boards=(self._TARGET_SERVO, ),
|
|
force_update=True,
|
|
ignore_version=True)
|
|
except Exception as e:
|
|
logging.debug("(Not critical) Servo device update error: %s", e)
|
|
raise hosts.AutoservVerifyError(
|
|
'Still fail to contact EC console after rebooting DUT')
|
|
# Update time when we reflashed the fw on the device
|
|
dhp = host.get_dut_health_profile()
|
|
dhp.refresh_servo_miro_fw_update_run_time()
|
|
host.restart_servod()
|
|
|
|
def is_time_to_try(self, dhp):
|
|
"""Verify that it is time when we can try to re-flash fw on servo_micro.
|
|
|
|
Re-flashing limited to once per 2 weeks to avoid over-flashing
|
|
the servo device.
|
|
"""
|
|
today_time = int(time.time())
|
|
last_check = dhp.get_servo_micro_fw_update_time_epoch()
|
|
can_run = today_time > (last_check + (14 * 24 * 60 * 60))
|
|
if not can_run:
|
|
logging.info("The servo_micro fw updated in las 2 weeks ago.")
|
|
return can_run
|
|
|
|
def _is_applicable(self, host):
|
|
return (not host.is_localhost() and host.get_dut_health_profile()
|
|
and self.is_time_to_try(host.get_dut_health_profile()))
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Re-flash servo_micro firmware.'
|
|
|
|
|
|
def _servo_verifier_actions():
|
|
"""
|
|
Return a verifiers for a `ServoHost`.
|
|
"""
|
|
return (
|
|
(_ConnectionVerifier, 'connection', []),
|
|
(_RootServoPresentVerifier, 'servo_root_present', ['connection']),
|
|
(_RootServoV3PresentVerifier, 'servo_v3_root_present',
|
|
['connection']),
|
|
(_ServoFwVerifier, 'servo_fw', ['servo_root_present']),
|
|
(_StartServodVerifier, 'start_servod',
|
|
['servo_fw', 'servo_v3_root_present']),
|
|
(_DiskSpaceVerifier, 'servo_disk_space', ['connection']),
|
|
(_UpdateVerifier, 'servo_update', ['servo_v3_root_present']),
|
|
(_BoardConfigVerifier, 'servo_config_board', ['connection']),
|
|
(_SerialConfigVerifier, 'servo_config_serial', ['connection']),
|
|
(_ServodJobVerifier, 'servod_started', [
|
|
'start_servod', 'servo_config_board',
|
|
'servo_config_serial', 'servo_disk_space'
|
|
]),
|
|
(_ServodEchoVerifier, 'servod_echo', ['servod_started']),
|
|
(_TopologyVerifier, 'servo_topology', ['servod_echo']),
|
|
(_ServodConnectionVerifier, 'servod_connection', ['servod_echo']),
|
|
(_Cr50LowSBUVerifier, 'servo_cr50_low_sbu', ['servod_connection']),
|
|
(ServodDutControllerMissingVerifier,
|
|
'servod_dut_controller_missing', ['servod_connection']),
|
|
(_Cr50OffVerifier, 'servo_cr50_off', ['servod_connection']),
|
|
(_ServodControlVerifier, 'servod_control', ['servod_connection']),
|
|
(_DUTConnectionVerifier, 'servo_dut_connected',
|
|
['servod_connection']),
|
|
(_ServoHubConnectionVerifier, 'servo_hub_connected',
|
|
['servo_dut_connected']),
|
|
(_PowerButtonVerifier, 'servo_pwr_button', ['servo_hub_connected'
|
|
]),
|
|
(_BatteryVerifier, 'servo_battery', ['servo_hub_connected']),
|
|
(_LidVerifier, 'servo_lid_open', ['servo_hub_connected']),
|
|
(ECConsoleVerifier, 'servo_ec_console', ['servo_dut_connected']),
|
|
(_Cr50ConsoleVerifier, 'servo_cr50_console',
|
|
['servo_dut_connected']),
|
|
(_CCDTestlabVerifier, 'servo_ccd_testlab', ['servo_cr50_console']),
|
|
(_CCDPowerDeliveryVerifier, 'servo_power_delivery',
|
|
['servod_connection']),
|
|
)
|
|
|
|
|
|
def _servo_repair_actions():
|
|
"""
|
|
Return a `RepairStrategy` for a `ServoHost`.
|
|
"""
|
|
config = ['servo_config_board', 'servo_config_serial', 'start_servod']
|
|
base_triggers = [
|
|
'servod_started', 'servo_topology', 'servod_connection',
|
|
'servod_echo', 'servod_control', 'servo_dut_connected',
|
|
'servo_hub_connected', 'servo_pwr_button', 'servo_cr50_console',
|
|
'servo_cr50_low_sbu', 'servo_cr50_off', 'servo_power_delivery',
|
|
'servod_dut_controller_missing'
|
|
]
|
|
dut_triggers = [
|
|
'servod_control', 'servo_lid_open', 'servo_ec_console',
|
|
'servo_topology', 'servo_dut_connected', 'servo_hub_connected',
|
|
'servo_cr50_low_sbu', 'servo_cr50_off', 'servo_cr50_console',
|
|
'servo_power_delivery', 'servod_dut_controller_missing'
|
|
]
|
|
reboot_triggers = [
|
|
'servo_topology', 'servo_root_present', 'servo_disk_space',
|
|
'servo_power_delivery'
|
|
]
|
|
return (
|
|
(_ServoFwUpdateRepair, 'servo_fw_update', ['connection'],
|
|
['servo_fw']),
|
|
(_DiskCleanupRepair, 'servo_disk_cleanup', ['connection'],
|
|
['servo_disk_space']),
|
|
(_ServoMicroFlashRepair, 'servo_micro_flash',
|
|
['connection', 'servo_topology'], ['servo_dut_connected']),
|
|
(_RestartServod, 'servod_restart', ['connection', 'servo_fw'],
|
|
config + base_triggers),
|
|
(_ServoRebootRepair, 'servo_reboot', ['connection'],
|
|
reboot_triggers),
|
|
(_PowerDeliveryRepair, 'servo_pd_recover', ['servod_connection'],
|
|
base_triggers),
|
|
(_FakedisconnectRepair, 'servo_fakedisconnect',
|
|
['servod_connection'], base_triggers),
|
|
(_ToggleCCLineRepair, 'servo_cc', ['servod_connection'],
|
|
base_triggers),
|
|
(_DutRebootRepair, 'servo_dut_reboot', ['servod_connection'],
|
|
dut_triggers),
|
|
(_ECRebootRepair, 'servo_ec_reboot', ['servod_connection'],
|
|
dut_triggers),
|
|
)
|
|
|
|
|
|
def create_servo_repair_strategy():
|
|
"""
|
|
Return a `RepairStrategy` for a `ServoHost`.
|
|
"""
|
|
return hosts.RepairStrategy(_servo_verifier_actions(),
|
|
_servo_repair_actions(), 'servo')
|