#!/usr/bin/env python3.4
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import csv
import itertools
import logging
import numpy
import os
from acts import asserts
from acts import context
from acts import base_test
from acts import utils
from acts.controllers import iperf_client
from acts.controllers.utils_lib import ssh
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
from acts_contrib.test_utils.wifi import ota_chamber
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
from acts_contrib.test_utils.wifi import ota_sniffer
from functools import partial
from WifiRvrTest import WifiRvrTest
from WifiPingTest import WifiPingTest


class WifiSensitivityTest(WifiRvrTest, WifiPingTest):
    """Class for WiFi sensitivity tests.

    This class implements per-rate WiFi sensitivity measurements. It heavily
    leverages the WifiRvrTest class and introduces minor differences to set
    specific rates on the access point, and implements a different pass/fail
    check. For an example config file to run this test class see
    example_connectivity_performance_ap_sta.json.
    """

    MAX_CONSECUTIVE_ZEROS = 5
    RSSI_POLL_INTERVAL = 0.2
    VALID_TEST_CONFIGS = {
        1: ['legacy', 'VHT20'],
        2: ['legacy', 'VHT20'],
        6: ['legacy', 'VHT20'],
        10: ['legacy', 'VHT20'],
        11: ['legacy', 'VHT20'],
        36: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
        40: ['legacy', 'VHT20'],
        44: ['legacy', 'VHT20'],
        48: ['legacy', 'VHT20'],
        149: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
        153: ['legacy', 'VHT20'],
        157: ['legacy', 'VHT20'],
        161: ['legacy', 'VHT20']
    }
    RateTuple = collections.namedtuple(('RateTuple'),
                                       ['mcs', 'streams', 'data_rate'])
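    # RateTuple fields: mcs is the rate index (or the legacy rate in Mbps),
    # streams is the number of spatial streams, and data_rate is the nominal
    # PHY rate in Mbps for that rate/stream combination.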
    #yapf:disable
    VALID_RATES = {
        'legacy_2GHz': [
            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
            RateTuple(11, 1, 11), RateTuple(9, 1, 9),
            RateTuple(6, 1, 6), RateTuple(5.5, 1, 5.5),
            RateTuple(2, 1, 2), RateTuple(1, 1, 1)],
        'legacy_5GHz': [
            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
            RateTuple(9, 1, 9), RateTuple(6, 1, 6)],
        'HT20': [
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 26), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(15, 2, 144.4), RateTuple(14, 2, 130),
            RateTuple(13, 2, 115.6), RateTuple(12, 2, 86.7),
            RateTuple(11, 2, 57.8), RateTuple(10, 2, 43.4),
            RateTuple(9, 2, 28.9), RateTuple(8, 2, 14.4)],
        'VHT20': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
        'VHT40': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
        'VHT80': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
    }
    #yapf:enable

    def __init__(self, controllers):
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = True

    def setup_class(self):
        """Initializes common test hardware and parameters.

        This function initializes hardware and compiles parameters that are
        common to all tests in this class.
        """
        self.dut = self.android_devices[-1]
        self.sta_dut = self.android_devices[-1]
        req_params = [
            'RetailAccessPoints', 'sensitivity_test_params', 'testbed_params',
            'RemoteServer'
        ]
        opt_params = ['main_network', 'OTASniffer']
        self.unpack_userparams(req_params, opt_params)
        self.testclass_params = self.sensitivity_test_params
        self.num_atten = self.attenuators[0].instrument.num_atten
        self.ping_server = ssh.connection.SshConnection(
            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
        if hasattr(self,
                   'OTASniffer') and self.testbed_params['sniffer_enable']:
            try:
                self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
            except Exception:
                self.log.warning('Could not start sniffer. Disabling sniffs.')
                self.testbed_params['sniffer_enable'] = 0
        self.remote_server = self.ping_server
        self.iperf_server = self.iperf_servers[0]
        self.iperf_client = self.iperf_clients[0]
        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
        self.log.info('Access Point Configuration: {}'.format(
            self.access_point.ap_settings))
        self.log_path = os.path.join(logging.log_path, 'results')
        os.makedirs(self.log_path, exist_ok=True)
        self.atten_dut_chain_map = {}
        self.testclass_results = []

        # Turn WiFi ON
        if self.testclass_params.get('airplane_mode', 1):
            self.log.info('Turning on airplane mode.')
            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
                                'Can not turn on airplane mode.')
        wutils.wifi_toggle_state(self.dut, True)

        # Configure test retries
        self.user_params['retry_tests'] = [self.__class__.__name__]

    def teardown_class(self):
        self.access_point.teardown()
        # Turn WiFi OFF
        for dev in self.android_devices:
            wutils.wifi_toggle_state(dev, False)
            dev.go_to_sleep()
        self.process_testclass_results()

    def setup_test(self):
        self.retry_flag = False

    def teardown_test(self):
        self.retry_flag = False

    def on_retry(self):
        """Function to control test logic on retried tests.

        This function is automatically executed on tests that are being
        retried. In this case the function resets wifi, toggles it off and on
        and sets a retry_flag to enable further tweaking the test logic on
        second attempts.
        """
        self.retry_flag = True
        for dev in self.android_devices:
            wutils.reset_wifi(dev)
            wutils.toggle_wifi_off_and_on(dev)

    def pass_fail_check(self, result):
        """Checks sensitivity results and decides on pass/fail.

        Args:
            result: dict containing attenuation, throughput and other meta
                data
        """
        result_string = ('Throughput = {}%, Sensitivity = {}.'.format(
            result['peak_throughput_pct'], result['sensitivity']))
        if result['peak_throughput_pct'] < 95:
            asserts.fail('Result unreliable. {}'.format(result_string))
        else:
            asserts.explicit_pass('Test Passed. {}'.format(result_string))

    def plot_per_curves(self):
        """Plots PER curves to help debug sensitivity."""

        plots = collections.OrderedDict()
        id_fields = ['channel', 'mode', 'num_streams']
        for result in self.testclass_results:
            testcase_params = result['testcase_params']
            plot_id = self.extract_test_id(testcase_params, id_fields)
            plot_id = tuple(plot_id.items())
            if plot_id not in plots:
                plots[plot_id] = BokehFigure(
                    title='Channel {} {} Nss{}'.format(
                        result['testcase_params']['channel'],
                        result['testcase_params']['mode'],
                        result['testcase_params']['num_streams']),
                    x_label='Attenuation (dB)',
                    primary_y_label='PER (%)')
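            # Collect PER at each attenuation step; if fewer llstats samples
            # were collected than attenuation points (e.g., the sweep ended
            # early), pad the curve with 100% PER.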
            per = [stat['summary']['rx_per'] for stat in result['llstats']]
            if len(per) < len(result['total_attenuation']):
                per.extend([100] *
                           (len(result['total_attenuation']) - len(per)))
            plots[plot_id].add_line(result['total_attenuation'], per,
                                    result['test_name'])
        figure_list = []
        for plot_id, plot in plots.items():
            plot.generate_figure()
            figure_list.append(plot)
        output_file_path = os.path.join(self.log_path, 'results.html')
        BokehFigure.save_figures(figure_list, output_file_path)

    def process_testclass_results(self):
        """Saves and plots test results from all executed test cases."""
        # write json output
        self.plot_per_curves()
        testclass_results_dict = collections.OrderedDict()
        id_fields = ['mode', 'rate', 'num_streams', 'chain_mask']
        channels_tested = []
        for result in self.testclass_results:
            testcase_params = result['testcase_params']
            test_id = self.extract_test_id(testcase_params, id_fields)
            test_id = tuple(test_id.items())
            if test_id not in testclass_results_dict:
                testclass_results_dict[test_id] = collections.OrderedDict()
            channel = testcase_params['channel']
            if channel not in channels_tested:
                channels_tested.append(channel)
            if result['peak_throughput_pct'] >= 95:
                testclass_results_dict[test_id][channel] = result[
                    'sensitivity']
            else:
                testclass_results_dict[test_id][channel] = ''

        # calculate average metrics
        metrics_dict = collections.OrderedDict()
        id_fields = ['channel', 'mode', 'num_streams', 'chain_mask']
        for test_id in testclass_results_dict.keys():
            for channel in testclass_results_dict[test_id].keys():
                metric_tag = collections.OrderedDict(test_id, channel=channel)
                metric_tag = self.extract_test_id(metric_tag, id_fields)
                metric_tag = tuple(metric_tag.items())
                metrics_dict.setdefault(metric_tag, [])
                sensitivity_result = testclass_results_dict[test_id][channel]
                if sensitivity_result != '':
                    metrics_dict[metric_tag].append(sensitivity_result)
        for metric_tag_tuple, metric_data in metrics_dict.items():
            metric_tag_dict = collections.OrderedDict(metric_tag_tuple)
            metric_tag = 'ch{}_{}_nss{}_chain{}'.format(
                metric_tag_dict['channel'], metric_tag_dict['mode'],
                metric_tag_dict['num_streams'], metric_tag_dict['chain_mask'])
            metric_key = '{}.avg_sensitivity'.format(metric_tag)
            metric_value = numpy.mean(metric_data)
            self.testclass_metric_logger.add_metric(metric_key, metric_value)
        # write csv
        csv_header = ['Mode', 'MCS', 'Streams', 'Chain', 'Rate (Mbps)']
        for channel in channels_tested:
            csv_header.append('Ch. ' + str(channel))
        results_file_path = os.path.join(self.log_path, 'results.csv')
        with open(results_file_path, mode='w') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=csv_header)
            writer.writeheader()
            for test_id, test_results in testclass_results_dict.items():
                test_id_dict = dict(test_id)
                if 'legacy' in test_id_dict['mode']:
                    rate_list = self.VALID_RATES['legacy_2GHz']
                else:
                    rate_list = self.VALID_RATES[test_id_dict['mode']]
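                # RateTuple[:-1] is (mcs, streams), which uniquely identifies
                # the nominal data rate within the selected rate list.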
                data_rate = next(rate.data_rate for rate in rate_list
                                 if rate[:-1] == (test_id_dict['rate'],
                                                  test_id_dict['num_streams']))
                row_value = {
                    'Mode': test_id_dict['mode'],
                    'MCS': test_id_dict['rate'],
                    'Streams': test_id_dict['num_streams'],
                    'Chain': test_id_dict['chain_mask'],
                    'Rate (Mbps)': data_rate,
                }
                for channel in channels_tested:
                    row_value['Ch. ' + str(channel)] = test_results.pop(
                        channel, ' ')
                writer.writerow(row_value)

        if not self.testclass_params['traffic_type'].lower() == 'ping':
            WifiRvrTest.process_testclass_results(self)

    def process_rvr_test_results(self, testcase_params, rvr_result):
        """Post processes RvR results to compute sensitivity.

        Takes in the results of the RvR tests and computes the sensitivity of
        the current rate by looking at the point at which throughput drops
        below the percentage specified in the config file. The function then
        calls on its parent class process_test_results to plot the result.

        Args:
            testcase_params: dict containing test-specific parameters
            rvr_result: dict containing attenuation, throughput and other meta
                data
        """
        rvr_result['peak_throughput'] = max(rvr_result['throughput_receive'])
        rvr_result['peak_throughput_pct'] = 100
        throughput_check = [
            throughput < rvr_result['peak_throughput'] *
            (self.testclass_params['throughput_pct_at_sensitivity'] / 100)
            for throughput in rvr_result['throughput_receive']
        ]
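        # Find the first attenuation index from which throughput stays below
        # the threshold for all remaining points; the usable range is the
        # last attenuation before that index.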
        consistency_check = [
            idx for idx in range(len(throughput_check))
            if all(throughput_check[idx:])
        ]
        rvr_result['atten_at_range'] = rvr_result['attenuation'][
            consistency_check[0] - 1]
        rvr_result['range'] = rvr_result['fixed_attenuation'] + (
            rvr_result['atten_at_range'])
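        # Sensitivity (dBm) = AP transmit power + per-channel calibration
        # offset - total path loss (range) at which throughput crossed the
        # threshold.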
        rvr_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
            self.testbed_params['ap_tx_power_offset'][str(
                testcase_params['channel'])] - rvr_result['range'])
        WifiRvrTest.process_test_results(self, rvr_result)

    def process_ping_test_results(self, testcase_params, ping_result):
        """Post processes ping results to compute sensitivity.

        Takes in the results of the ping tests and computes the sensitivity of
        the current rate from the range reported by the parent class
        process_ping_results, which is called to process and plot the result.

        Args:
            testcase_params: dict containing test-specific parameters
            ping_result: dict containing ping loss, range, and other meta data
        """
        WifiPingTest.process_ping_results(self, testcase_params, ping_result)
        ping_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
            self.testbed_params['ap_tx_power_offset'][str(
                testcase_params['channel'])] - ping_result['range'])

    def setup_ping_test(self, testcase_params):
        """Function that gets devices ready for the test.

        Args:
            testcase_params: dict containing test-specific parameters
        """
        # Configure AP
        self.setup_ap(testcase_params)
        # Set attenuator to starting attenuation
        for attenuator in self.attenuators:
            attenuator.set_atten(testcase_params['atten_start'],
                                 strict=False,
                                 retry=True)
        # Reset, configure, and connect DUT
        self.setup_dut(testcase_params)

    def setup_ap(self, testcase_params):
        """Sets up the AP and attenuator to compensate for AP chain imbalance.

        Args:
            testcase_params: dict containing AP and other test params
        """
        band = self.access_point.band_lookup_by_channel(
            testcase_params['channel'])
        if '2G' in band:
            frequency = wutils.WifiEnums.channel_2G_to_freq[
                testcase_params['channel']]
        else:
            frequency = wutils.WifiEnums.channel_5G_to_freq[
                testcase_params['channel']]
        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
            self.access_point.set_region(self.testbed_params['DFS_region'])
        else:
            self.access_point.set_region(self.testbed_params['default_region'])
        self.access_point.set_channel(band, testcase_params['channel'])
        self.access_point.set_bandwidth(band, testcase_params['mode'])
        self.access_point.set_power(band, testcase_params['ap_tx_power'])
        self.access_point.set_rate(band, testcase_params['mode'],
                                   testcase_params['num_streams'],
                                   testcase_params['rate'],
                                   testcase_params['short_gi'])
        # Set attenuator offsets and set attenuators to initial condition
        atten_offsets = self.testbed_params['chain_offset'][str(
            testcase_params['channel'])]
        for atten in self.attenuators:
            if 'AP-Chain-0' in atten.path:
                atten.offset = atten_offsets[0]
            elif 'AP-Chain-1' in atten.path:
                atten.offset = atten_offsets[1]
            else:
                atten.offset = 0
        self.log.info('Access Point Configuration: {}'.format(
            self.access_point.ap_settings))

    def setup_dut(self, testcase_params):
        """Sets up the DUT in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        # Turn the screen on if requested, otherwise sleep to preserve battery
        if self.testbed_params.get('screen_on',
                                   False) or self.testclass_params.get(
                                       'screen_on', False):
            self.dut.droid.wakeLockAcquireDim()
        else:
            self.dut.go_to_sleep()
        if wputils.validate_network(self.dut,
                                    testcase_params['test_network']['SSID']):
            self.log.info('Already connected to desired network')
        else:
            wutils.wifi_toggle_state(self.dut, False)
            wutils.set_wifi_country_code(self.dut,
                                         self.testclass_params['country_code'])
            wutils.wifi_toggle_state(self.dut, True)
            wutils.reset_wifi(self.dut)
            wutils.set_wifi_country_code(self.dut,
                                         self.testclass_params['country_code'])
            testcase_params['test_network']['channel'] = testcase_params[
                'channel']
            wutils.wifi_connect(self.dut,
                                testcase_params['test_network'],
                                num_of_tries=5,
                                check_connectivity=False)
        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
        # Activate/attenuate the correct chains
        if testcase_params['channel'] not in self.atten_dut_chain_map.keys():
            self.atten_dut_chain_map[testcase_params[
                'channel']] = wputils.get_current_atten_dut_chain_map(
                    self.attenuators, self.dut, self.ping_server)
        self.log.info('Current Attenuator-DUT Chain Map: {}'.format(
            self.atten_dut_chain_map[testcase_params['channel']]))
        for idx, atten in enumerate(self.attenuators):
            if self.atten_dut_chain_map[testcase_params['channel']][
                    idx] == testcase_params['attenuated_chain']:
                atten.offset = atten.instrument.max_atten

    def extract_test_id(self, testcase_params, id_fields):
        test_id = collections.OrderedDict(
            (param, testcase_params[param]) for param in id_fields)
        return test_id

    def get_start_atten(self, testcase_params):
        """Gets the starting attenuation for this sensitivity test.

        The function gets the starting attenuation by checking whether a test
        at the next higher MCS has been executed. If so, it sets the starting
        point a configurable number of dB below the next MCS's sensitivity.

        Returns:
            start_atten: starting attenuation for current test
        """
        # If the test is being retried, start from the beginning
        if self.retry_flag:
            self.log.info('Retry flag set. Setting attenuation to minimum.')
            return self.testclass_params['atten_start']
        # Get the current and reference test config. The reference test is the
        # one performed at the current MCS+1
        current_rate = testcase_params['rate']
        ref_test_params = self.extract_test_id(
            testcase_params,
            ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
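        # Legacy rate lists are ordered from highest to lowest rate, so the
        # previous list entry is the next faster rate; for MCS rates the
        # reference is simply MCS + 1.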
        if 'legacy' in testcase_params['mode']:
            if testcase_params['channel'] <= 13:
                rate_list = self.VALID_RATES['legacy_2GHz']
            else:
                rate_list = self.VALID_RATES['legacy_5GHz']
            ref_index = max(
                0,
                rate_list.index(self.RateTuple(current_rate, 1, current_rate))
                - 1)
            ref_test_params['rate'] = rate_list[ref_index].mcs
        else:
            ref_test_params['rate'] = current_rate + 1

        # Check if reference test has been run and set attenuation accordingly
        previous_params = [
            self.extract_test_id(
                result['testcase_params'],
                ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
            for result in self.testclass_results
        ]

        try:
            ref_index = previous_params.index(ref_test_params)
            start_atten = self.testclass_results[ref_index][
                'atten_at_range'] - (
                    self.testclass_params['adjacent_mcs_range_gap'])
        except ValueError:
            self.log.warning(
                'Reference test not found. Starting from {} dB'.format(
                    self.testclass_params['atten_start']))
            start_atten = self.testclass_params['atten_start']
        start_atten = max(start_atten, 0)
        return start_atten

    def compile_test_params(self, testcase_params):
        """Function that generates test params based on the test name."""
        # Check if test should be skipped.
        wputils.check_skip_conditions(testcase_params, self.dut,
                                      self.access_point,
                                      getattr(self, 'ota_chamber', None))

        band = self.access_point.band_lookup_by_channel(
            testcase_params['channel'])
        testcase_params['band'] = band
        testcase_params['test_network'] = self.main_network[band]
        if testcase_params['chain_mask'] in ['0', '1']:
            testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
                1 if testcase_params['chain_mask'] == '0' else 0)
        else:
            # Set attenuated chain to -1. Do not set to None as this will be
            # compared to RF chain map which may include None
            testcase_params['attenuated_chain'] = -1
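        # For ping-based tests, the allowed ping loss at sensitivity is the
        # complement of the configured throughput threshold.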
        self.testclass_params[
            'range_ping_loss_threshold'] = 100 - self.testclass_params[
                'throughput_pct_at_sensitivity']
        if self.testclass_params['traffic_type'] == 'UDP':
            testcase_params['iperf_args'] = '-i 1 -t {} -J -u -b {}'.format(
                self.testclass_params['iperf_duration'],
                self.testclass_params['UDP_rates'][testcase_params['mode']])
        elif self.testclass_params['traffic_type'] == 'TCP':
            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
                self.testclass_params['iperf_duration'])

        if self.testclass_params['traffic_type'] != 'ping' and isinstance(
                self.iperf_client, iperf_client.IPerfClientOverAdb):
            testcase_params['iperf_args'] += ' -R'
            testcase_params['use_client_output'] = True
        else:
            testcase_params['use_client_output'] = False

        return testcase_params

    def _test_sensitivity(self, testcase_params):
        """Function that gets called for each test case.

        The function customizes the RvR/ping test based on the parameters of
        the test case that called it.
        """
        # Compile test parameters from config and test name
        testcase_params = self.compile_test_params(testcase_params)
        testcase_params.update(self.testclass_params)
        testcase_params['atten_start'] = self.get_start_atten(testcase_params)
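        # Build the attenuation sweep from atten_start (possibly derived from
        # the previous rate's result) to atten_stop in atten_step increments.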
        num_atten_steps = int(
            (testcase_params['atten_stop'] - testcase_params['atten_start']) /
            testcase_params['atten_step'])
        testcase_params['atten_range'] = [
            testcase_params['atten_start'] + x * testcase_params['atten_step']
            for x in range(0, num_atten_steps)
        ]

        # Prepare devices and run test
        if testcase_params['traffic_type'].lower() == 'ping':
            self.setup_ping_test(testcase_params)
            result = self.run_ping_test(testcase_params)
            self.process_ping_test_results(testcase_params, result)
        else:
            self.setup_rvr_test(testcase_params)
            result = self.run_rvr_test(testcase_params)
            self.process_rvr_test_results(testcase_params, result)

        # Post-process results
        self.testclass_results.append(result)
        self.pass_fail_check(result)

    def generate_test_cases(self, channels, modes, chain_mask):
        """Function that auto-generates test cases for a test class."""
        test_cases = []
        for channel in channels:
            requested_modes = [
                mode for mode in modes
                if mode in self.VALID_TEST_CONFIGS[channel]
            ]
            for mode in requested_modes:
                bandwidth = int(''.join([x for x in mode if x.isdigit()]))
                if 'VHT' in mode:
                    rates = self.VALID_RATES[mode]
                elif 'HT' in mode:
                    rates = self.VALID_RATES[mode]
                elif 'legacy' in mode and channel < 14:
                    rates = self.VALID_RATES['legacy_2GHz']
                elif 'legacy' in mode and channel > 14:
                    rates = self.VALID_RATES['legacy_5GHz']
                else:
                    raise ValueError('Invalid test mode.')
                for chain, rate in itertools.product(chain_mask, rates):
                    testcase_params = collections.OrderedDict(
                        channel=channel,
                        mode=mode,
                        bandwidth=bandwidth,
                        rate=rate.mcs,
                        num_streams=rate.streams,
                        short_gi=1,
                        chain_mask=chain)
                    if chain in ['0', '1'] and rate[1] == 2:
                        # Do not test 2-stream rates in single chain mode
                        continue
                    if 'legacy' in mode:
                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
                                         '_ch{}'.format(
                                             channel, mode,
                                             str(rate.mcs).replace('.', 'p'),
                                             rate.streams, chain))
                    else:
                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
                                         '_ch{}'.format(
                                             channel, mode, rate.mcs,
                                             rate.streams, chain))
                    setattr(self, testcase_name,
                            partial(self._test_sensitivity, testcase_params))
                    test_cases.append(testcase_name)
        return test_cases


class WifiSensitivity_AllChannels_Test(WifiSensitivityTest):

    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases(
            [6, 36, 40, 44, 48, 149, 153, 157, 161],
            ['VHT20', 'VHT40', 'VHT80'], ['0', '1', '2x2'])


class WifiSensitivity_SampleChannels_Test(WifiSensitivityTest):

    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases([6, 36, 149],
                                              ['VHT20', 'VHT40', 'VHT80'],
                                              ['0', '1', '2x2'])


class WifiSensitivity_2GHz_Test(WifiSensitivityTest):

    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases([1, 2, 6, 10, 11], ['VHT20'],
                                              ['0', '1', '2x2'])


class WifiSensitivity_5GHz_Test(WifiSensitivityTest):

    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases(
            [36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT40', 'VHT80'],
            ['0', '1', '2x2'])


class WifiSensitivity_UNII1_Test(WifiSensitivityTest):

    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases([36, 40, 44, 48],
                                              ['VHT20', 'VHT40', 'VHT80'],
                                              ['0', '1', '2x2'])


class WifiSensitivity_UNII3_Test(WifiSensitivityTest):

    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases([149, 153, 157, 161],
                                              ['VHT20', 'VHT40', 'VHT80'],
                                              ['0', '1', '2x2'])


# Over-the-air version of sensitivity tests
class WifiOtaSensitivityTest(WifiSensitivityTest):
    """Class to test over-the-air sensitivity.

    This class implements WiFi sensitivity measurements in an OTA chamber.
    It allows setting orientation and other chamber parameters to study
    performance in varying channel conditions.
    """

    def __init__(self, controllers):
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = False

    def setup_class(self):
        WifiSensitivityTest.setup_class(self)
        self.current_chain_mask = '2x2'
        self.ota_chamber = ota_chamber.create(
            self.user_params['OTAChamber'])[0]

    def teardown_class(self):
        WifiSensitivityTest.teardown_class(self)
        self.ota_chamber.reset_chamber()

    def setup_sensitivity_test(self, testcase_params):
        # Setup turntable
        self.ota_chamber.set_orientation(testcase_params['orientation'])
        # Continue test setup
        WifiSensitivityTest.setup_sensitivity_test(self, testcase_params)

    def setup_dut(self, testcase_params):
        """Sets up the DUT in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        # Configure the right INI settings
        wputils.set_chain_mask(self.dut, testcase_params['chain_mask'])
        # Turn the screen on if requested, otherwise sleep to preserve battery
        if self.testbed_params.get('screen_on',
                                   False) or self.testclass_params.get(
                                       'screen_on', False):
            self.dut.droid.wakeLockAcquireDim()
        else:
            self.dut.go_to_sleep()
        self.validate_and_connect(testcase_params)
        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]

    def process_testclass_results(self):
        """Saves and plots test results from all executed test cases."""
        self.plot_per_curves()
        testclass_results_dict = collections.OrderedDict()
        id_fields = ['channel', 'mode', 'rate']
        plots = []
        for result in self.testclass_results:
            test_id = self.extract_test_id(result['testcase_params'],
                                           id_fields)
            test_id = tuple(test_id.items())
            chain_mask = result['testcase_params']['chain_mask']
            num_streams = result['testcase_params']['num_streams']
            line_id = (chain_mask, num_streams)
            if test_id not in testclass_results_dict:
                testclass_results_dict[test_id] = collections.OrderedDict()
            if line_id not in testclass_results_dict[test_id]:
                testclass_results_dict[test_id][line_id] = {
                    'orientation': [],
                    'sensitivity': []
                }
            orientation = result['testcase_params']['orientation']
            if result['peak_throughput_pct'] >= 95:
                sensitivity = result['sensitivity']
            else:
                sensitivity = float('nan')
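            # If this orientation was already recorded for this line (e.g., on
            # a retried test), overwrite the previous point instead of
            # appending a duplicate.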
            if orientation not in testclass_results_dict[test_id][line_id][
                    'orientation']:
                testclass_results_dict[test_id][line_id]['orientation'].append(
                    orientation)
                testclass_results_dict[test_id][line_id]['sensitivity'].append(
                    sensitivity)
            else:
                testclass_results_dict[test_id][line_id]['sensitivity'][
                    -1] = sensitivity

        for test_id, test_data in testclass_results_dict.items():
            test_id_dict = dict(test_id)
            if 'legacy' in test_id_dict['mode']:
                test_id_str = 'Channel {} - {} {}Mbps'.format(
                    test_id_dict['channel'], test_id_dict['mode'],
                    test_id_dict['rate'])
            else:
                test_id_str = 'Channel {} - {} MCS{}'.format(
                    test_id_dict['channel'], test_id_dict['mode'],
                    test_id_dict['rate'])
            curr_plot = BokehFigure(title=str(test_id_str),
                                    x_label='Orientation (deg)',
                                    primary_y_label='Sensitivity (dBm)')
            for line_id, line_results in test_data.items():
                curr_plot.add_line(line_results['orientation'],
                                   line_results['sensitivity'],
                                   legend='Nss{} - Chain Mask {}'.format(
                                       line_id[1], line_id[0]),
                                   marker='circle')
                if 'legacy' in test_id_dict['mode']:
                    metric_tag = 'ota_summary_ch{}_{}_{}_ch{}'.format(
                        test_id_dict['channel'], test_id_dict['mode'],
                        test_id_dict['rate'], line_id[0])
                else:
                    metric_tag = 'ota_summary_ch{}_{}_mcs{}_nss{}_ch{}'.format(
                        test_id_dict['channel'], test_id_dict['mode'],
                        test_id_dict['rate'], line_id[1], line_id[0])

                metric_name = metric_tag + '.avg_sensitivity'
                metric_value = numpy.nanmean(line_results['sensitivity'])
                self.testclass_metric_logger.add_metric(
                    metric_name, metric_value)
                self.log.info(('Average Sensitivity for {}: {:.1f}').format(
                    metric_tag, metric_value))
            current_context = (
                context.get_current_context().get_full_output_path())
            output_file_path = os.path.join(current_context,
                                            str(test_id_str) + '.html')
            curr_plot.generate_figure(output_file_path)
            plots.append(curr_plot)
        output_file_path = os.path.join(current_context, 'results.html')
        BokehFigure.save_figures(plots, output_file_path)

    def get_start_atten(self, testcase_params):
        """Gets the starting attenuation for this sensitivity test.

        The function gets the starting attenuation by checking whether a test
        at the same rate configuration has been executed. If so, it sets the
        starting point a configurable number of dB below the reference test.

        Returns:
            start_atten: starting attenuation for current test
        """
        # If the test is being retried, start from the beginning
        if self.retry_flag:
            self.log.info('Retry flag set. Setting attenuation to minimum.')
            return self.testclass_params['atten_start']
        # Get the current and reference test config. The reference test is the
        # most recent test at the same rate configuration.
        ref_test_params = self.extract_test_id(
            testcase_params,
            ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
        # Check if reference test has been run and set attenuation accordingly
        previous_params = [
            self.extract_test_id(
                result['testcase_params'],
                ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
            for result in self.testclass_results
        ]
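        # Search previous results from the end so the most recent matching
        # result (typically the previous orientation) is used as the
        # reference.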
        try:
            ref_index = previous_params[::-1].index(ref_test_params)
            ref_index = len(previous_params) - 1 - ref_index
            start_atten = self.testclass_results[ref_index][
                'atten_at_range'] - (
                    self.testclass_params['adjacent_mcs_range_gap'])
        except ValueError:
            self.log.warning(
                'Reference test not found. Starting from {} dB'.format(
                    self.testclass_params['atten_start']))
            start_atten = self.testclass_params['atten_start']
        start_atten = max(start_atten, 0)
        return start_atten

    def generate_test_cases(self, channels, modes, requested_rates, chain_mask,
                            angles):
        """Function that auto-generates test cases for a test class."""
        test_cases = []
        for channel in channels:
            requested_modes = [
                mode for mode in modes
                if mode in self.VALID_TEST_CONFIGS[channel]
            ]
            for chain, mode in itertools.product(chain_mask, requested_modes):
                bandwidth = int(''.join([x for x in mode if x.isdigit()]))
                if 'VHT' in mode:
                    valid_rates = self.VALID_RATES[mode]
                elif 'HT' in mode:
                    valid_rates = self.VALID_RATES[mode]
                elif 'legacy' in mode and channel < 14:
                    valid_rates = self.VALID_RATES['legacy_2GHz']
                elif 'legacy' in mode and channel > 14:
                    valid_rates = self.VALID_RATES['legacy_5GHz']
                else:
                    raise ValueError('Invalid test mode.')
                for rate, angle in itertools.product(valid_rates, angles):
                    testcase_params = collections.OrderedDict(
                        channel=channel,
                        mode=mode,
                        bandwidth=bandwidth,
                        rate=rate.mcs,
                        num_streams=rate.streams,
                        short_gi=1,
                        chain_mask=chain,
                        orientation=angle)
                    if rate not in requested_rates:
                        continue
                    if str(chain) in ['0', '1'] and rate[1] == 2:
                        # Do not test 2-stream rates in single chain mode
                        continue
                    if 'legacy' in mode:
                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
                                         '_ch{}_{}deg'.format(
                                             channel, mode,
                                             str(rate.mcs).replace('.', 'p'),
                                             rate.streams, chain, angle))
                    else:
                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
                                         '_ch{}_{}deg'.format(
                                             channel, mode, rate.mcs,
                                             rate.streams, chain, angle))
                    setattr(self, testcase_name,
                            partial(self._test_sensitivity, testcase_params))
                    test_cases.append(testcase_name)
        return test_cases


class WifiOtaSensitivity_TenDegree_Test(WifiOtaSensitivityTest):

    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        requested_channels = [6, 36, 149]
        requested_rates = [
            self.RateTuple(8, 1, 86.7),
            self.RateTuple(6, 1, 65),
            self.RateTuple(2, 1, 21.7),
            self.RateTuple(8, 2, 173.3),
            self.RateTuple(6, 2, 130.3),
            self.RateTuple(2, 2, 43.3)
        ]
        self.tests = self.generate_test_cases(requested_channels,
                                              ['VHT20', 'VHT80'],
                                              requested_rates, ['2x2'],
                                              list(range(0, 360, 10)))


class WifiOtaSensitivity_PerChain_TenDegree_Test(WifiOtaSensitivityTest):

    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        requested_channels = [6, 36, 149]
        requested_rates = [
            self.RateTuple(9, 1, 96),
            self.RateTuple(9, 2, 192),
            self.RateTuple(6, 1, 65),
            self.RateTuple(6, 2, 130.3),
            self.RateTuple(2, 1, 21.7),
            self.RateTuple(2, 2, 43.3)
        ]
        self.tests = self.generate_test_cases(requested_channels,
                                              ['VHT20', 'VHT80'],
                                              requested_rates, [0, 1, '2x2'],
                                              list(range(0, 360, 10)))


class WifiOtaSensitivity_ThirtyDegree_Test(WifiOtaSensitivityTest):

    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        requested_channels = [6, 36, 149]
        requested_rates = [
            self.RateTuple(9, 1, 96),
            self.RateTuple(8, 1, 86.7),
            self.RateTuple(7, 1, 72.2),
            self.RateTuple(4, 1, 43.3),
            self.RateTuple(2, 1, 21.7),
            self.RateTuple(0, 1, 7.2),
            self.RateTuple(9, 2, 192),
            self.RateTuple(8, 2, 173.3),
            self.RateTuple(7, 2, 144.4),
            self.RateTuple(4, 2, 86.7),
            self.RateTuple(2, 2, 43.3),
            self.RateTuple(0, 2, 14.4)
        ]
        self.tests = self.generate_test_cases(requested_channels,
                                              ['VHT20', 'VHT80'],
                                              requested_rates, ['2x2'],
                                              list(range(0, 360, 30)))


class WifiOtaSensitivity_45Degree_Test(WifiOtaSensitivityTest):

    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        requested_rates = [
            self.RateTuple(8, 1, 86.7),
            self.RateTuple(2, 1, 21.7),
            self.RateTuple(8, 2, 173.3),
            self.RateTuple(2, 2, 43.3)
        ]
        self.tests = self.generate_test_cases(
            [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT80'],
            requested_rates, ['2x2'], list(range(0, 360, 45)))