980 lines
41 KiB
Python
980 lines
41 KiB
Python
#!/usr/bin/env python3.4
|
|
#
|
|
# Copyright 2017 - The Android Open Source Project
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the 'License');
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an 'AS IS' BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import collections
|
|
import csv
|
|
import itertools
|
|
import json
|
|
import logging
|
|
import math
|
|
import os
|
|
import re
|
|
import scipy.stats
|
|
import time
|
|
from acts import asserts
|
|
from acts import context
|
|
from acts import base_test
|
|
from acts import utils
|
|
from acts.controllers.utils_lib import ssh
|
|
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
|
|
from acts_contrib.test_utils.wifi import ota_sniffer
|
|
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
|
|
from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
|
|
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
|
|
from functools import partial
|
|
|
|
|
|
class WifiTxPowerCheckTest(base_test.BaseTestClass):
|
|
"""Class for ping-based Wifi performance tests.
|
|
|
|
This class implements WiFi ping performance tests such as range and RTT.
|
|
The class setups up the AP in the desired configurations, configures
|
|
and connects the phone to the AP, and runs For an example config file to
|
|
run this test class see example_connectivity_performance_ap_sta.json.
|
|
"""
|
|
|
|
TEST_TIMEOUT = 10
|
|
RSSI_POLL_INTERVAL = 0.2
|
|
SHORT_SLEEP = 1
|
|
MED_SLEEP = 5
|
|
MAX_CONSECUTIVE_ZEROS = 5
|
|
DISCONNECTED_PING_RESULT = {
|
|
'connected': 0,
|
|
'rtt': [],
|
|
'time_stamp': [],
|
|
'ping_interarrivals': [],
|
|
'packet_loss_percentage': 100
|
|
}
|
|
|
|
BRCM_SAR_MAPPING = {
|
|
0: 'disable',
|
|
1: 'head',
|
|
2: 'grip',
|
|
16: 'bt',
|
|
32: 'hotspot'
|
|
}
|
|
|
|
BAND_TO_CHANNEL_MAP = {
|
|
('2g', 1): [1, 6, 11],
|
|
('5g', 1): [36, 40, 44, 48],
|
|
('5g', 2): [52, 56, 60, 64],
|
|
('5g', 3): range(100, 148, 4),
|
|
('5g', 4): [149, 153, 157, 161],
|
|
('6g', 1): ['6g{}'.format(channel) for channel in range(1, 46, 4)],
|
|
('6g', 2): ['6g{}'.format(channel) for channel in range(49, 94, 4)],
|
|
('6g', 3): ['6g{}'.format(channel) for channel in range(97, 114, 4)],
|
|
('6g', 4): ['6g{}'.format(channel) for channel in range(117, 158, 4)],
|
|
('6g', 5): ['6g{}'.format(channel) for channel in range(161, 186, 4)],
|
|
('6g', 6): ['6g{}'.format(channel) for channel in range(189, 234, 4)]
|
|
}
|
|
|
|
def __init__(self, controllers):
|
|
base_test.BaseTestClass.__init__(self, controllers)
|
|
self.testcase_metric_logger = (
|
|
BlackboxMappedMetricLogger.for_test_case())
|
|
self.testclass_metric_logger = (
|
|
BlackboxMappedMetricLogger.for_test_class())
|
|
self.publish_testcase_metrics = True
|
|
self.tests = self.generate_test_cases(
|
|
ap_power='standard',
|
|
channels=[6, 36, 52, 100, 149, '6g37', '6g117', '6g213'],
|
|
modes=['bw20', 'bw40', 'bw80', 'bw160'],
|
|
test_types=[
|
|
'test_tx_power',
|
|
],
|
|
country_codes=['US', 'GB', 'JP', 'CA', 'AU'],
|
|
sar_states=range(-1, 13))
|
|
|
|
def setup_class(self):
|
|
self.dut = self.android_devices[-1]
|
|
req_params = [
|
|
'tx_power_test_params', 'testbed_params', 'main_network',
|
|
'RetailAccessPoints', 'RemoteServer'
|
|
]
|
|
opt_params = ['OTASniffer']
|
|
self.unpack_userparams(req_params, opt_params)
|
|
self.testclass_params = self.tx_power_test_params
|
|
self.num_atten = self.attenuators[0].instrument.num_atten
|
|
self.ping_server = ssh.connection.SshConnection(
|
|
ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
|
|
self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
|
|
if hasattr(self,
|
|
'OTASniffer') and self.testbed_params['sniffer_enable']:
|
|
try:
|
|
self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
|
|
except:
|
|
self.log.warning('Could not start sniffer. Disabling sniffs.')
|
|
self.testbed_params['sniffer_enable'] = 0
|
|
self.log.info('Access Point Configuration: {}'.format(
|
|
self.access_point.ap_settings))
|
|
self.log_path = os.path.join(logging.log_path, 'results')
|
|
os.makedirs(self.log_path, exist_ok=True)
|
|
self.atten_dut_chain_map = {}
|
|
self.testclass_results = []
|
|
|
|
# Turn WiFi ON
|
|
if self.testclass_params.get('airplane_mode', 1):
|
|
self.log.info('Turning on airplane mode.')
|
|
asserts.assert_true(utils.force_airplane_mode(self.dut, True),
|
|
'Can not turn on airplane mode.')
|
|
wutils.wifi_toggle_state(self.dut, True)
|
|
self.dut.droid.wifiEnableVerboseLogging(1)
|
|
asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
|
|
"Failed to enable WiFi verbose logging.")
|
|
|
|
# decode nvram
|
|
self.nvram_sar_data = self.read_nvram_sar_data()
|
|
self.csv_sar_data = self.read_sar_csv(self.testclass_params['sar_csv'])
|
|
|
|
# Configure test retries
|
|
self.user_params['retry_tests'] = [self.__class__.__name__]
|
|
|
|
def teardown_class(self):
|
|
# Turn WiFi OFF and reset AP
|
|
self.access_point.teardown()
|
|
for dev in self.android_devices:
|
|
wutils.wifi_toggle_state(dev, False)
|
|
dev.go_to_sleep()
|
|
self.process_testclass_results()
|
|
|
|
def setup_test(self):
|
|
self.retry_flag = False
|
|
|
|
def teardown_test(self):
|
|
self.retry_flag = False
|
|
|
|
def on_retry(self):
|
|
"""Function to control test logic on retried tests.
|
|
|
|
This function is automatically executed on tests that are being
|
|
retried. In this case the function resets wifi, toggles it off and on
|
|
and sets a retry_flag to enable further tweaking the test logic on
|
|
second attempts.
|
|
"""
|
|
self.retry_flag = True
|
|
for dev in self.android_devices:
|
|
wutils.reset_wifi(dev)
|
|
wutils.toggle_wifi_off_and_on(dev)
|
|
|
|
def read_sar_csv(self, sar_csv):
|
|
"""Reads SAR powers from CSV.
|
|
|
|
This function reads SAR powers from a CSV and generate a dictionary
|
|
with all programmed TX powers on a per band and regulatory domain
|
|
basis.
|
|
|
|
Args:
|
|
sar_csv: path to SAR data file.
|
|
Returns:
|
|
sar_powers: dict containing all SAR data
|
|
"""
|
|
|
|
sar_powers = {}
|
|
sar_csv_data = []
|
|
with open(sar_csv, mode='r') as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
row['Sub-band Powers'] = [
|
|
float(val) for key, val in row.items()
|
|
if 'Sub-band' in key and val != ''
|
|
]
|
|
sar_csv_data.append(row)
|
|
|
|
for row in sar_csv_data:
|
|
sar_powers.setdefault(int(row['Scenario Index']), {})
|
|
sar_powers[int(row['Scenario Index'])].setdefault('SAR Powers', {})
|
|
sar_row_key = (row['Regulatory Domain'], row['Mode'], row['Band'])
|
|
sar_powers[int(row['Scenario Index'])]['SAR Powers'].setdefault(
|
|
sar_row_key, {})
|
|
sar_powers[int(
|
|
row['Scenario Index'])]['SAR Powers'][sar_row_key][int(
|
|
row['Chain'])] = row['Sub-band Powers']
|
|
return sar_powers
|
|
|
|
def read_nvram_sar_data(self):
|
|
"""Reads SAR powers from NVRAM.
|
|
|
|
This function reads SAR powers from the NVRAM found on the DUT and
|
|
generates a dictionary with all programmed TX powers on a per band and
|
|
regulatory domain basis. NThe NVRAM file is chosen based on the build,
|
|
but if no NVRAM file is found matching the expected name, the default
|
|
NVRAM will be loaded. The choice of NVRAM is not guaranteed to be
|
|
correct.
|
|
|
|
Returns:
|
|
nvram_sar_data: dict containing all SAR data
|
|
"""
|
|
|
|
self._read_sar_config_info()
|
|
try:
|
|
hardware_version = self.dut.adb.shell(
|
|
'getprop ro.boot.hardware.revision')
|
|
nvram_path = '/vendor/firmware/bcmdhd.cal_{}'.format(
|
|
hardware_version)
|
|
nvram = self.dut.adb.shell('cat {}'.format(nvram_path))
|
|
except:
|
|
nvram = self.dut.adb.shell('cat /vendor/firmware/bcmdhd.cal')
|
|
current_context = context.get_current_context().get_full_output_path()
|
|
file_path = os.path.join(current_context, 'nvram_file')
|
|
with open(file_path, 'w') as file:
|
|
file.write(nvram)
|
|
nvram_sar_data = {}
|
|
for line in nvram.splitlines():
|
|
if 'dynsar' in line:
|
|
sar_config, sar_powers = self._parse_nvram_sar_line(line)
|
|
nvram_sar_data[sar_config] = sar_powers
|
|
file_path = os.path.join(current_context, 'nvram_sar_data')
|
|
with open(file_path, 'w') as file:
|
|
json.dump(wputils.serialize_dict(nvram_sar_data), file, indent=4)
|
|
|
|
return nvram_sar_data
|
|
|
|
def _read_sar_config_info(self):
|
|
"""Function to read SAR scenario mapping,
|
|
|
|
This function reads sar_config.info file which contains the mapping
|
|
of SAR scenarios to NVRAM data tables.
|
|
"""
|
|
|
|
self.sar_state_mapping = collections.OrderedDict([(-2, {
|
|
"google_name":
|
|
'WIFI_POWER_SCENARIO_INVALID'
|
|
}), (-1, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_DISABLE'
|
|
}), (0, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_VOICE_CALL'
|
|
}), (1, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_CELL_OFF'
|
|
}), (2, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_CELL_ON'
|
|
}), (3, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_OFF'
|
|
}), (4, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON'
|
|
}), (5, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_BT'
|
|
}), (6, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT'
|
|
}), (7, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT_MMW'
|
|
}), (8, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON_BT'
|
|
}), (9, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT'
|
|
}), (10, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT'
|
|
}), (11, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_MMW'
|
|
}), (12, {
|
|
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT_MMW'
|
|
})])
|
|
sar_config_path = '/vendor/firmware/sarconfig.info'
|
|
sar_config = self.dut.adb.shell(
|
|
'cat {}'.format(sar_config_path)).splitlines()
|
|
sar_config = [line.split(',') for line in sar_config]
|
|
sar_config = [[int(x) for x in line] for line in sar_config]
|
|
|
|
for sar_state in sar_config:
|
|
self.sar_state_mapping[sar_state[0]]['brcm_index'] = (
|
|
self.BRCM_SAR_MAPPING[sar_state[1]], bool(sar_state[2]))
|
|
current_context = context.get_current_context().get_full_output_path()
|
|
file_path = os.path.join(current_context, 'sarconfig')
|
|
with open(file_path, 'w') as file:
|
|
json.dump(wputils.serialize_dict(self.sar_state_mapping),
|
|
file,
|
|
indent=4)
|
|
|
|
def _parse_nvram_sar_line(self, sar_line):
|
|
"""Helper function to decode SAR NVRAM data lines.
|
|
|
|
Args:
|
|
sar_line: single line of text from NVRAM file containing SAR data.
|
|
Returns:
|
|
sar_config: sar config referenced in this line
|
|
decoded_values: tx powers configured in this line
|
|
"""
|
|
|
|
sar_config = collections.OrderedDict()
|
|
list_of_countries = ['fcc', 'jp', 'ca']
|
|
try:
|
|
sar_config['country'] = next(country
|
|
for country in list_of_countries
|
|
if country in sar_line.split('=')[0])
|
|
except:
|
|
sar_config['country'] = 'row'
|
|
|
|
list_of_sar_states = ['grip', 'bt', 'hotspot']
|
|
try:
|
|
sar_config['state'] = next(state for state in list_of_sar_states
|
|
if state in sar_line.split('=')[0])
|
|
except:
|
|
sar_config['state'] = 'head'
|
|
|
|
list_of_bands = ['2g', '5g', '6g']
|
|
sar_config['band'] = next(band for band in list_of_bands
|
|
if band in sar_line.split('=')[0])
|
|
|
|
sar_config['rsdb'] = 'rsdb' if 'rsdb' in sar_line else 'mimo'
|
|
sar_config['airplane_mode'] = '_2=' in sar_line
|
|
|
|
sar_powers = sar_line.split('=')[1].split(',')
|
|
decoded_powers = []
|
|
for sar_power in sar_powers:
|
|
# Note that core 0 and 1 are flipped in the NVRAM entries
|
|
decoded_powers.append([
|
|
(int(sar_power[4:], 16) & int('7f', 16)) / 4,
|
|
(int(sar_power[2:4], 16) & int('7f', 16)) / 4
|
|
])
|
|
|
|
return tuple(sar_config.values()), decoded_powers
|
|
|
|
def get_sar_power_from_nvram(self, testcase_params):
|
|
"""Function to get current expected SAR power from nvram
|
|
|
|
This functions gets the expected SAR TX power from the DUT NVRAM data.
|
|
The SAR power is looked up based on the current channel and regulatory
|
|
domain,
|
|
|
|
Args:
|
|
testcase_params: dict containing channel, sar state, country code
|
|
Returns:
|
|
sar_config: current expected sar config
|
|
sar_powers: current expected sar powers
|
|
"""
|
|
|
|
if testcase_params['country_code'] == 'US':
|
|
reg_domain = 'fcc'
|
|
elif testcase_params['country_code'] == 'JP':
|
|
reg_domain = 'jp'
|
|
elif testcase_params['country_code'] == 'CA':
|
|
reg_domain = 'ca'
|
|
else:
|
|
reg_domain = 'row'
|
|
for band, channels in self.BAND_TO_CHANNEL_MAP.items():
|
|
if testcase_params['channel'] in channels:
|
|
current_band = band[0]
|
|
sub_band_idx = band[1]
|
|
break
|
|
sar_config = (reg_domain, self.sar_state_mapping[
|
|
testcase_params['sar_state']]['brcm_index'][0], current_band,
|
|
'mimo', self.sar_state_mapping[
|
|
testcase_params['sar_state']]['brcm_index'][1])
|
|
sar_powers = self.nvram_sar_data[sar_config][sub_band_idx - 1]
|
|
return sar_config, sar_powers
|
|
|
|
def get_sar_power_from_csv(self, testcase_params):
|
|
"""Function to get current expected SAR power from CSV.
|
|
|
|
This functions gets the expected SAR TX power from the DUT NVRAM data.
|
|
The SAR power is looked up based on the current channel and regulatory
|
|
domain,
|
|
|
|
Args:
|
|
testcase_params: dict containing channel, sar state, country code
|
|
Returns:
|
|
sar_config: current expected sar config
|
|
sar_powers: current expected sar powers
|
|
"""
|
|
|
|
if testcase_params['country_code'] == 'US':
|
|
reg_domain = 'fcc'
|
|
elif testcase_params['country_code'] == 'JP':
|
|
reg_domain = 'jp'
|
|
elif testcase_params['country_code'] == 'CA':
|
|
reg_domain = 'ca'
|
|
else:
|
|
reg_domain = 'row'
|
|
for band, channels in self.BAND_TO_CHANNEL_MAP.items():
|
|
if testcase_params['channel'] in channels:
|
|
current_band = band[0]
|
|
sub_band_idx = band[1]
|
|
break
|
|
sar_config = (reg_domain, 'mimo', current_band)
|
|
sar_powers = [
|
|
self.csv_sar_data[testcase_params['sar_state']]['SAR Powers']
|
|
[sar_config][0][sub_band_idx - 1],
|
|
self.csv_sar_data[testcase_params['sar_state']]['SAR Powers']
|
|
[sar_config][1][sub_band_idx - 1]
|
|
]
|
|
return sar_config, sar_powers
|
|
|
|
def process_wl_curpower(self, wl_curpower_file, testcase_params):
|
|
"""Function to parse wl_curpower output.
|
|
|
|
Args:
|
|
wl_curpower_file: path to curpower output file.
|
|
testcase_params: dict containing channel, sar state, country code
|
|
Returns:
|
|
wl_curpower_dict: dict formatted version of curpower data.
|
|
"""
|
|
|
|
with open(wl_curpower_file, 'r') as file:
|
|
wl_curpower_out = file.read()
|
|
|
|
channel_regex = re.compile(r'Current Channel:\s+(?P<channel>[0-9]+)')
|
|
bandwidth_regex = re.compile(
|
|
r'Channel Width:\s+(?P<bandwidth>\S+)MHz\n')
|
|
|
|
channel = int(
|
|
re.search(channel_regex, wl_curpower_out).group('channel'))
|
|
bandwidth = int(
|
|
re.search(bandwidth_regex, wl_curpower_out).group('bandwidth'))
|
|
|
|
regulatory_limits = self.generate_regulatory_table(
|
|
wl_curpower_out, channel, bandwidth)
|
|
board_limits = self.generate_board_limit_table(wl_curpower_out,
|
|
channel, bandwidth)
|
|
wl_curpower_dict = {
|
|
'channel': channel,
|
|
'bandwidth': bandwidth,
|
|
'country': testcase_params['country_code'],
|
|
'regulatory_limits': regulatory_limits,
|
|
'board_limits': board_limits
|
|
}
|
|
return wl_curpower_dict
|
|
|
|
def generate_regulatory_table(self, wl_curpower_out, channel, bw):
|
|
""""Helper function to generate regulatory limit table from curpower.
|
|
|
|
Args:
|
|
wl_curpower_out: curpower output
|
|
channel: current channel
|
|
bw: current bandwidth
|
|
Returns:
|
|
regulatory_table: dict with regulatory limits for current config
|
|
"""
|
|
|
|
regulatory_group_map = {
|
|
'DSSS':
|
|
[('CCK', rate, 1)
|
|
for rate in ['{}Mbps'.format(mbps) for mbps in [1, 2, 5.5, 11]]],
|
|
'OFDM_CDD1': [('LEGACY', rate, 1) for rate in [
|
|
'{}Mbps'.format(mbps)
|
|
for mbps in [6, 9, 12, 18, 24, 36, 48, 54]
|
|
]],
|
|
'MCS0_7_CDD1':
|
|
[(mode, rate, 1)
|
|
for (mode,
|
|
rate) in itertools.product(['HT' + str(bw), 'VHT' +
|
|
str(bw)], range(0, 8))],
|
|
'VHT8_9SS1_CDD1': [('VHT' + str(bw), 8, 1),
|
|
('VHT' + str(bw), 9, 1)],
|
|
'VHT10_11SS1_CDD1': [('VHT' + str(bw), 10, 1),
|
|
('VHT' + str(bw), 11, 1)],
|
|
'MCS8_15':
|
|
[(mode, rate - 8 * ('VHT' in mode), 2)
|
|
for (mode,
|
|
rate) in itertools.product(['HT' + str(bw), 'VHT' +
|
|
str(bw)], range(8, 16))],
|
|
'VHT8_9SS2': [('VHT' + str(bw), 8, 2), ('VHT' + str(bw), 9, 2)],
|
|
'VHT10_11SS2': [('VHT' + str(bw), 10, 2),
|
|
('VHT' + str(bw), 11, 2)],
|
|
'HE_MCS0-11_CDD1': [('HE' + str(bw), rate, 1)
|
|
for rate in range(0, 12)],
|
|
'HE_MCS0_11SS2': [('HE' + str(bw), rate, 2)
|
|
for rate in range(0, 12)],
|
|
}
|
|
tx_power_regex = re.compile(
|
|
'(?P<mcs>\S+)\s+(?P<chain>[2])\s+(?P<power_1>[0-9.-]+)\s*(?P<power_2>[0-9.-]*)\s*(?P<power_3>[0-9.-]*)\s*(?P<power_4>[0-9.-]*)'
|
|
)
|
|
|
|
regulatory_section_regex = re.compile(
|
|
r'Regulatory Limits:(?P<regulatory_limits>[\S\s]+)Board Limits:')
|
|
regulatory_list = re.search(regulatory_section_regex,
|
|
wl_curpower_out).group('regulatory_limits')
|
|
regulatory_list = re.findall(tx_power_regex, regulatory_list)
|
|
regulatory_dict = {entry[0]: entry[2:] for entry in regulatory_list}
|
|
|
|
bw_index = int(math.log(bw / 10, 2)) - 1
|
|
regulatory_table = collections.OrderedDict()
|
|
for regulatory_group, rates in regulatory_group_map.items():
|
|
for rate in rates:
|
|
reg_power = regulatory_dict.get(regulatory_group,
|
|
['0', '0', '0', '0'])[bw_index]
|
|
regulatory_table[rate] = float(
|
|
reg_power) if reg_power != '-' else 0
|
|
return regulatory_table
|
|
|
|
def generate_board_limit_table(self, wl_curpower_out, channel, bw):
|
|
""""Helper function to generate board limit table from curpower.
|
|
|
|
Args:
|
|
wl_curpower_out: curpower output
|
|
channel: current channel
|
|
bw: current bandwidth
|
|
Returns:
|
|
board_limit_table: dict with board limits for current config
|
|
"""
|
|
|
|
tx_power_regex = re.compile(
|
|
'(?P<mcs>\S+)\s+(?P<chain>[2])\s+(?P<power_1>[0-9.-]+)\s*(?P<power_2>[0-9.-]*)\s*(?P<power_3>[0-9.-]*)\s*(?P<power_4>[0-9.-]*)'
|
|
)
|
|
|
|
board_section_regex = re.compile(
|
|
r'Board Limits:(?P<board_limits>[\S\s]+)Power Targets:')
|
|
board_limits_list = re.search(board_section_regex,
|
|
wl_curpower_out).group('board_limits')
|
|
board_limits_list = re.findall(tx_power_regex, board_limits_list)
|
|
board_limits_dict = {
|
|
entry[0]: entry[2:]
|
|
for entry in board_limits_list
|
|
}
|
|
|
|
mcs_regex_list = [[
|
|
re.compile('DSSS'),
|
|
[('CCK', rate, 1)
|
|
for rate in ['{}Mbps'.format(mbps) for mbps in [1, 2, 5.5, 11]]]
|
|
], [re.compile('OFDM(?P<mcs>[0-9]+)_CDD1'), [('LEGACY', '{}Mbps', 1)]],
|
|
[
|
|
re.compile('MCS(?P<mcs>[0-7])_CDD1'),
|
|
[('HT{}'.format(bw), '{}', 1),
|
|
('VHT{}'.format(bw), '{}', 1)]
|
|
],
|
|
[
|
|
re.compile('VHT(?P<mcs>[8-9])SS1_CDD1'),
|
|
[('VHT{}'.format(bw), '{}', 1)]
|
|
],
|
|
[
|
|
re.compile('VHT10_11SS1_CDD1'),
|
|
[('VHT{}'.format(bw), '10', 1),
|
|
('VHT{}'.format(bw), '11', 1)]
|
|
],
|
|
[
|
|
re.compile('MCS(?P<mcs>[0-9]{2})'),
|
|
[('HT{}'.format(bw), '{}', 2)]
|
|
],
|
|
[
|
|
re.compile('VHT(?P<mcs>[0-9])SS2'),
|
|
[('VHT{}'.format(bw), '{}', 2)]
|
|
],
|
|
[
|
|
re.compile('VHT10_11SS2'),
|
|
[('VHT{}'.format(bw), '10', 2),
|
|
('VHT{}'.format(bw), '11', 2)]
|
|
],
|
|
[
|
|
re.compile('HE_MCS(?P<mcs>[0-9]+)_CDD1'),
|
|
[('HE{}'.format(bw), '{}', 1)]
|
|
],
|
|
[
|
|
re.compile('HE_MCS(?P<mcs>[0-9]+)SS2'),
|
|
[('HE{}'.format(bw), '{}', 2)]
|
|
]]
|
|
|
|
bw_index = int(math.log(bw / 10, 2)) - 1
|
|
board_limit_table = collections.OrderedDict()
|
|
for mcs, board_limit in board_limits_dict.items():
|
|
for mcs_regex_tuple in mcs_regex_list:
|
|
mcs_match = re.match(mcs_regex_tuple[0], mcs)
|
|
if mcs_match:
|
|
for possible_mcs in mcs_regex_tuple[1]:
|
|
try:
|
|
curr_mcs = (possible_mcs[0],
|
|
possible_mcs[1].format(
|
|
mcs_match.group('mcs')),
|
|
possible_mcs[2])
|
|
except:
|
|
curr_mcs = (possible_mcs[0], possible_mcs[1],
|
|
possible_mcs[2])
|
|
board_limit_table[curr_mcs] = float(
|
|
board_limit[bw_index]
|
|
) if board_limit[bw_index] != '-' else 0
|
|
break
|
|
return board_limit_table
|
|
|
|
def pass_fail_check(self, result):
|
|
"""Function to evaluate if current TX powqe matches CSV/NVRAM settings.
|
|
|
|
This function assesses whether the current TX power reported by the
|
|
DUT matches the powers programmed in NVRAM and CSV after applying the
|
|
correct TX power backoff used to account for CLPC errors.
|
|
"""
|
|
|
|
if isinstance(result['testcase_params']['channel'],
|
|
str) and '6g' in result['testcase_params']['channel']:
|
|
mode = 'HE' + str(result['testcase_params']['bandwidth'])
|
|
else:
|
|
mode = 'HE' + str(result['testcase_params']['bandwidth'])
|
|
regulatory_power = result['wl_curpower']['regulatory_limits'][(mode, 0,
|
|
2)]
|
|
board_power = result['wl_curpower']['board_limits'][(mode, str(0), 2)]
|
|
# try:
|
|
sar_config, nvram_powers = self.get_sar_power_from_nvram(
|
|
result['testcase_params'])
|
|
# except:
|
|
# nvram_powers = [99, 99]
|
|
# sar_config = 'SAR DISABLED'
|
|
try:
|
|
csv_config, csv_powers = self.get_sar_power_from_csv(
|
|
result['testcase_params'])
|
|
except:
|
|
#get from wl_curpower
|
|
csv_powers = [99, 99]
|
|
self.log.info("SAR state: {} ({})".format(
|
|
result['testcase_params']['sar_state'],
|
|
self.sar_state_mapping[result['testcase_params']['sar_state']],
|
|
))
|
|
self.log.info("Country Code: {}".format(
|
|
result['testcase_params']['country_code']))
|
|
self.log.info('BRCM SAR Table: {}'.format(sar_config))
|
|
expected_power = [
|
|
min([csv_powers[0], regulatory_power, board_power]) - 1.5,
|
|
min([csv_powers[1], regulatory_power, board_power]) - 1.5
|
|
]
|
|
power_str = "NVRAM Powers: {}, CSV Powers: {}, Reg Powers: {}, Board Power: {}, Expected Powers: {}, Reported Powers: {}".format(
|
|
nvram_powers, csv_powers, [regulatory_power] * 2,
|
|
[board_power] * 2, expected_power, result['tx_powers'])
|
|
max_error = max([
|
|
abs(expected_power[idx] - result['tx_powers'][idx])
|
|
for idx in [0, 1]
|
|
])
|
|
if max_error > 1:
|
|
asserts.fail(power_str)
|
|
else:
|
|
asserts.explicit_pass(power_str)
|
|
|
|
def process_testclass_results(self):
|
|
pass
|
|
|
|
def run_tx_power_test(self, testcase_params):
|
|
"""Main function to test tx power.
|
|
|
|
The function sets up the AP & DUT in the correct channel and mode
|
|
configuration, starts ping traffic and queries the current TX power.
|
|
|
|
Args:
|
|
testcase_params: dict containing all test parameters
|
|
Returns:
|
|
test_result: dict containing ping results and other meta data
|
|
"""
|
|
# Prepare results dict
|
|
llstats_obj = wputils.LinkLayerStats(
|
|
self.dut, self.testclass_params.get('llstats_enabled', True))
|
|
test_result = collections.OrderedDict()
|
|
test_result['testcase_params'] = testcase_params.copy()
|
|
test_result['test_name'] = self.current_test_name
|
|
test_result['ap_config'] = self.access_point.ap_settings.copy()
|
|
test_result['attenuation'] = testcase_params['atten_range']
|
|
test_result['fixed_attenuation'] = self.testbed_params[
|
|
'fixed_attenuation'][str(testcase_params['channel'])]
|
|
test_result['rssi_results'] = []
|
|
test_result['ping_results'] = []
|
|
test_result['llstats'] = []
|
|
# Setup sniffer
|
|
if self.testbed_params['sniffer_enable']:
|
|
self.sniffer.start_capture(
|
|
testcase_params['test_network'],
|
|
chan=testcase_params['channel'],
|
|
bw=testcase_params['bandwidth'],
|
|
duration=testcase_params['ping_duration'] *
|
|
len(testcase_params['atten_range']) + self.TEST_TIMEOUT)
|
|
# Set sar state
|
|
if testcase_params['sar_state'] == -1:
|
|
self.dut.adb.shell('halutil -sar disable')
|
|
else:
|
|
self.dut.adb.shell('halutil -sar enable {}'.format(
|
|
testcase_params['sar_state']))
|
|
# Run ping and sweep attenuation as needed
|
|
self.log.info('Starting ping.')
|
|
thread_future = wputils.get_ping_stats_nb(self.ping_server,
|
|
self.dut_ip, 10, 0.02, 64)
|
|
|
|
for atten in testcase_params['atten_range']:
|
|
for attenuator in self.attenuators:
|
|
attenuator.set_atten(atten, strict=False, retry=True)
|
|
# Set mcs
|
|
if isinstance(testcase_params['channel'],
|
|
int) and testcase_params['channel'] < 13:
|
|
self.dut.adb.shell('wl 2g_rate -e 0 -s 2 -b {}'.format(
|
|
testcase_params['bandwidth']))
|
|
elif isinstance(testcase_params['channel'],
|
|
int) and testcase_params['channel'] > 13:
|
|
self.dut.adb.shell('wl 5g_rate -e 0 -s 2 -b {}'.format(
|
|
testcase_params['bandwidth']))
|
|
else:
|
|
self.dut.adb.shell('wl 6g_rate -e 0 -s 2 -b {}'.format(
|
|
testcase_params['bandwidth']))
|
|
# Refresh link layer stats
|
|
llstats_obj.update_stats()
|
|
# Check sar state
|
|
self.log.info('Current Country: {}'.format(
|
|
self.dut.adb.shell('wl country')))
|
|
# Dump last est power multiple times
|
|
chain_0_power = []
|
|
chain_1_power = []
|
|
for idx in range(30):
|
|
last_est_out = self.dut.adb.shell(
|
|
"wl curpower | grep 'Last est. power'", ignore_status=True)
|
|
if "Last est. power" in last_est_out:
|
|
try:
|
|
per_chain_powers = last_est_out.split(
|
|
':')[1].strip().split(' ')
|
|
per_chain_powers = [
|
|
float(power) for power in per_chain_powers
|
|
]
|
|
except:
|
|
per_chain_powers = [0, 0]
|
|
self.log.warning(
|
|
'Could not parse output: {}'.format(last_est_out))
|
|
self.log.info(
|
|
'Current Tx Powers = {}'.format(per_chain_powers))
|
|
if per_chain_powers[0] > 0:
|
|
chain_0_power.append(per_chain_powers[0])
|
|
if per_chain_powers[1] > 0:
|
|
chain_1_power.append(per_chain_powers[1])
|
|
time.sleep(0.25)
|
|
# Check if empty
|
|
if len(chain_0_power) == 0 or len(chain_1_power) == 0:
|
|
test_result['tx_powers'] = [0, 0]
|
|
tx_power_frequency = [100, 100]
|
|
else:
|
|
test_result['tx_powers'] = [
|
|
scipy.stats.mode(chain_0_power).mode[0],
|
|
scipy.stats.mode(chain_1_power).mode[0]
|
|
]
|
|
tx_power_frequency = [
|
|
100 * scipy.stats.mode(chain_0_power).count[0] /
|
|
len(chain_0_power),
|
|
100 * scipy.stats.mode(chain_1_power).count[0] /
|
|
len(chain_0_power)
|
|
]
|
|
self.log.info(
|
|
'Filtered Tx Powers = {}. Frequency = [{:.0f}%, {:.0f}%]'.
|
|
format(test_result['tx_powers'], tx_power_frequency[0],
|
|
tx_power_frequency[1]))
|
|
llstats_obj.update_stats()
|
|
curr_llstats = llstats_obj.llstats_incremental.copy()
|
|
test_result['llstats'].append(curr_llstats)
|
|
# DUMP wl curpower one
|
|
try:
|
|
wl_curpower = self.dut.adb.shell('wl curpower')
|
|
except:
|
|
time.sleep(0.25)
|
|
wl_curpower = self.dut.adb.shell('wl curpower',
|
|
ignore_status=True)
|
|
current_context = context.get_current_context(
|
|
).get_full_output_path()
|
|
wl_curpower_path = os.path.join(current_context,
|
|
'wl_curpower_output')
|
|
with open(wl_curpower_path, 'w') as file:
|
|
file.write(wl_curpower)
|
|
wl_curpower_dict = self.process_wl_curpower(
|
|
wl_curpower_path, testcase_params)
|
|
wl_curpower_path = os.path.join(current_context,
|
|
'wl_curpower_dict')
|
|
with open(wl_curpower_path, 'w') as file:
|
|
json.dump(wputils.serialize_dict(wl_curpower_dict),
|
|
file,
|
|
indent=4)
|
|
test_result['wl_curpower'] = wl_curpower_dict
|
|
thread_future.result()
|
|
if self.testbed_params['sniffer_enable']:
|
|
self.sniffer.stop_capture()
|
|
return test_result
|
|
|
|
def setup_ap(self, testcase_params):
|
|
"""Sets up the access point in the configuration required by the test.
|
|
|
|
Args:
|
|
testcase_params: dict containing AP and other test params
|
|
"""
|
|
band = self.access_point.band_lookup_by_channel(
|
|
testcase_params['channel'])
|
|
if '6G' in band:
|
|
frequency = wutils.WifiEnums.channel_6G_to_freq[int(
|
|
testcase_params['channel'].strip('6g'))]
|
|
else:
|
|
if testcase_params['channel'] < 13:
|
|
frequency = wutils.WifiEnums.channel_2G_to_freq[
|
|
testcase_params['channel']]
|
|
else:
|
|
frequency = wutils.WifiEnums.channel_5G_to_freq[
|
|
testcase_params['channel']]
|
|
if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
|
|
self.access_point.set_region(self.testbed_params['DFS_region'])
|
|
else:
|
|
self.access_point.set_region(self.testbed_params['default_region'])
|
|
self.access_point.set_channel_and_bandwidth(band,
|
|
testcase_params['channel'],
|
|
testcase_params['mode'])
|
|
#self.access_point.set_channel(band, testcase_params['channel'])
|
|
#self.access_point.set_bandwidth(band, testcase_params['mode'])
|
|
if 'low' in testcase_params['ap_power']:
|
|
self.log.info('Setting low AP power.')
|
|
self.access_point.set_power(
|
|
band, self.testclass_params['low_ap_tx_power'])
|
|
self.log.info('Access Point Configuration: {}'.format(
|
|
self.access_point.ap_settings))
|
|
|
|
def setup_dut(self, testcase_params):
|
|
"""Sets up the DUT in the configuration required by the test.
|
|
|
|
Args:
|
|
testcase_params: dict containing AP and other test params
|
|
"""
|
|
# Turn screen off to preserve battery
|
|
if self.testbed_params.get('screen_on',
|
|
False) or self.testclass_params.get(
|
|
'screen_on', False):
|
|
self.dut.droid.wakeLockAcquireDim()
|
|
else:
|
|
self.dut.go_to_sleep()
|
|
if wputils.validate_network(self.dut,
|
|
testcase_params['test_network']['SSID']):
|
|
current_country = self.dut.adb.shell('wl country')
|
|
self.log.info('Current country code: {}'.format(current_country))
|
|
if testcase_params['country_code'] in current_country:
|
|
self.log.info('Already connected to desired network')
|
|
self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses(
|
|
'wlan0')[0]
|
|
return
|
|
testcase_params['test_network']['channel'] = testcase_params['channel']
|
|
wutils.wifi_toggle_state(self.dut, False)
|
|
wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
|
|
wutils.wifi_toggle_state(self.dut, True)
|
|
wutils.reset_wifi(self.dut)
|
|
if self.testbed_params.get('txbf_off', False):
|
|
wputils.disable_beamforming(self.dut)
|
|
wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
|
|
current_country = self.dut.adb.shell('wl country')
|
|
self.log.info('Current country code: {}'.format(current_country))
|
|
if testcase_params['country_code'] not in current_country:
|
|
asserts.fail('Country code not correct.')
|
|
chan_list = self.dut.adb.shell('wl chan_info_list')
|
|
if str(testcase_params['channel']) not in chan_list:
|
|
asserts.skip('Channel {} not supported in {}'.format(
|
|
testcase_params['channel'], testcase_params['country_code']))
|
|
wutils.wifi_connect(self.dut,
|
|
testcase_params['test_network'],
|
|
num_of_tries=5,
|
|
check_connectivity=True)
|
|
self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
|
|
|
|
def setup_tx_power_test(self, testcase_params):
|
|
"""Function that gets devices ready for the test.
|
|
|
|
Args:
|
|
testcase_params: dict containing test-specific parameters
|
|
"""
|
|
# Configure AP
|
|
self.setup_ap(testcase_params)
|
|
# Set attenuator to 0 dB
|
|
for attenuator in self.attenuators:
|
|
attenuator.set_atten(0, strict=False, retry=True)
|
|
# Reset, configure, and connect DUT
|
|
self.setup_dut(testcase_params)
|
|
|
|
def check_skip_conditions(self, testcase_params):
|
|
"""Checks if test should be skipped."""
|
|
# Check battery level before test
|
|
if not wputils.health_check(self.dut, 10):
|
|
asserts.skip('DUT battery level too low.')
|
|
if testcase_params[
|
|
'channel'] in wputils.CHANNELS_6GHz and not self.dut.droid.is6GhzBandSupported(
|
|
):
|
|
asserts.skip('DUT does not support 6 GHz band.')
|
|
if not self.access_point.band_lookup_by_channel(
|
|
testcase_params['channel']):
|
|
asserts.skip('AP does not support requested channel.')
|
|
|
|
def compile_test_params(self, testcase_params):
|
|
"""Function to compile all testcase parameters."""
|
|
|
|
self.check_skip_conditions(testcase_params)
|
|
|
|
band = self.access_point.band_lookup_by_channel(
|
|
testcase_params['channel'])
|
|
testcase_params['test_network'] = self.main_network[band]
|
|
testcase_params['attenuated_chain'] = -1
|
|
testcase_params.update(
|
|
ping_interval=self.testclass_params['ping_interval'],
|
|
ping_duration=self.testclass_params['ping_duration'],
|
|
ping_size=self.testclass_params['ping_size'],
|
|
)
|
|
|
|
testcase_params['atten_range'] = [0]
|
|
return testcase_params
|
|
|
|
def _test_ping(self, testcase_params):
|
|
""" Function that gets called for each range test case
|
|
|
|
The function gets called in each range test case. It customizes the
|
|
range test based on the test name of the test that called it
|
|
|
|
Args:
|
|
testcase_params: dict containing preliminary set of parameters
|
|
"""
|
|
# Compile test parameters from config and test name
|
|
testcase_params = self.compile_test_params(testcase_params)
|
|
# Run ping test
|
|
self.setup_tx_power_test(testcase_params)
|
|
result = self.run_tx_power_test(testcase_params)
|
|
self.pass_fail_check(result)
|
|
|
|
def generate_test_cases(self, ap_power, channels, modes, test_types,
|
|
country_codes, sar_states):
|
|
"""Function that auto-generates test cases for a test class."""
|
|
test_cases = []
|
|
allowed_configs = {
|
|
20: [
|
|
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 52, 64, 100,
|
|
116, 132, 140, 149, 153, 157, 161
|
|
],
|
|
40: [36, 44, 100, 149, 157],
|
|
80: [36, 100, 149],
|
|
160: [36, '6g37', '6g117', '6g213']
|
|
}
|
|
|
|
for channel, mode, test_type, country_code, sar_state in itertools.product(
|
|
channels, modes, test_types, country_codes, sar_states):
|
|
bandwidth = int(''.join([x for x in mode if x.isdigit()]))
|
|
if channel not in allowed_configs[bandwidth]:
|
|
continue
|
|
testcase_name = '{}_ch{}_{}_{}_sar_{}'.format(
|
|
test_type, channel, mode, country_code, sar_state)
|
|
testcase_params = collections.OrderedDict(
|
|
test_type=test_type,
|
|
ap_power=ap_power,
|
|
channel=channel,
|
|
mode=mode,
|
|
bandwidth=bandwidth,
|
|
country_code=country_code,
|
|
sar_state=sar_state)
|
|
setattr(self, testcase_name,
|
|
partial(self._test_ping, testcase_params))
|
|
test_cases.append(testcase_name)
|
|
return test_cases
|
|
|
|
|
|
class WifiTxPowerCheck_BasicSAR_Test(WifiTxPowerCheckTest):
|
|
|
|
def __init__(self, controllers):
|
|
base_test.BaseTestClass.__init__(self, controllers)
|
|
self.testcase_metric_logger = (
|
|
BlackboxMappedMetricLogger.for_test_case())
|
|
self.testclass_metric_logger = (
|
|
BlackboxMappedMetricLogger.for_test_class())
|
|
self.publish_testcase_metrics = True
|
|
self.tests = self.generate_test_cases(
|
|
ap_power='standard',
|
|
channels=[6, 36, 52, 100, 149, '6g37'],
|
|
modes=['bw20', 'bw160'],
|
|
test_types=[
|
|
'test_tx_power',
|
|
],
|
|
country_codes=['US', 'GB', 'JP', 'CA'],
|
|
sar_states=[-1, 0, 1, 2, 3, 4])
|