1143 lines
50 KiB
Python
1143 lines
50 KiB
Python
|
|
#!/usr/bin/env python3.4
|
||
|
|
#
|
||
|
|
# Copyright 2018 - The Android Open Source Project
|
||
|
|
#
|
||
|
|
# Licensed under the Apache License, Version 2.0 (the 'License');
|
||
|
|
# you may not use this file except in compliance with the License.
|
||
|
|
# You may obtain a copy of the License at
|
||
|
|
#
|
||
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
|
#
|
||
|
|
# Unless required by applicable law or agreed to in writing, software
|
||
|
|
# distributed under the License is distributed on an 'AS IS' BASIS,
|
||
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
|
# See the License for the specific language governing permissions and
|
||
|
|
# limitations under the License.
|
||
|
|
|
||
|
|
import collections
|
||
|
|
import itertools
|
||
|
|
import json
|
||
|
|
import logging
|
||
|
|
import math
|
||
|
|
import numpy
|
||
|
|
import os
|
||
|
|
import statistics
|
||
|
|
from acts import asserts
|
||
|
|
from acts import base_test
|
||
|
|
from acts import context
|
||
|
|
from acts import utils
|
||
|
|
from acts.controllers.utils_lib import ssh
|
||
|
|
from acts.controllers import iperf_server as ipf
|
||
|
|
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
|
||
|
|
from acts_contrib.test_utils.wifi import ota_chamber
|
||
|
|
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
|
||
|
|
from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
|
||
|
|
from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
|
||
|
|
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
|
||
|
|
from concurrent.futures import ThreadPoolExecutor
|
||
|
|
from functools import partial
|
||
|
|
|
||
|
|
SHORT_SLEEP = 1
|
||
|
|
MED_SLEEP = 6
|
||
|
|
CONST_3dB = 3.01029995664
|
||
|
|
RSSI_ERROR_VAL = float('nan')
|
||
|
|
|
||
|
|
|
||
|
|
class WifiRssiTest(base_test.BaseTestClass):
|
||
|
|
"""Class to test WiFi RSSI reporting.
|
||
|
|
|
||
|
|
This class tests RSSI reporting on android devices. The class tests RSSI
|
||
|
|
accuracy by checking RSSI over a large attenuation range, checks for RSSI
|
||
|
|
stability over time when attenuation is fixed, and checks that RSSI quickly
|
||
|
|
and reacts to changes attenuation by checking RSSI trajectories over
|
||
|
|
configurable attenuation waveforms.For an example config file to run this
|
||
|
|
test class see example_connectivity_performance_ap_sta.json.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(self, controllers):
|
||
|
|
base_test.BaseTestClass.__init__(self, controllers)
|
||
|
|
self.testcase_metric_logger = (
|
||
|
|
BlackboxMappedMetricLogger.for_test_case())
|
||
|
|
self.testclass_metric_logger = (
|
||
|
|
BlackboxMappedMetricLogger.for_test_class())
|
||
|
|
self.publish_test_metrics = True
|
||
|
|
|
||
|
|
def setup_class(self):
|
||
|
|
self.dut = self.android_devices[0]
|
||
|
|
req_params = [
|
||
|
|
'RemoteServer', 'RetailAccessPoints', 'rssi_test_params',
|
||
|
|
'main_network', 'testbed_params'
|
||
|
|
]
|
||
|
|
self.unpack_userparams(req_params)
|
||
|
|
self.testclass_params = self.rssi_test_params
|
||
|
|
self.num_atten = self.attenuators[0].instrument.num_atten
|
||
|
|
self.iperf_server = self.iperf_servers[0]
|
||
|
|
self.iperf_client = self.iperf_clients[0]
|
||
|
|
self.remote_server = ssh.connection.SshConnection(
|
||
|
|
ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
|
||
|
|
self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
|
||
|
|
self.log_path = os.path.join(logging.log_path, 'results')
|
||
|
|
os.makedirs(self.log_path, exist_ok=True)
|
||
|
|
self.log.info('Access Point Configuration: {}'.format(
|
||
|
|
self.access_point.ap_settings))
|
||
|
|
self.testclass_results = []
|
||
|
|
|
||
|
|
# Turn WiFi ON
|
||
|
|
if self.testclass_params.get('airplane_mode', 1):
|
||
|
|
self.log.info('Turning on airplane mode.')
|
||
|
|
asserts.assert_true(utils.force_airplane_mode(self.dut, True),
|
||
|
|
'Can not turn on airplane mode.')
|
||
|
|
wutils.wifi_toggle_state(self.dut, True)
|
||
|
|
|
||
|
|
def teardown_test(self):
|
||
|
|
self.iperf_server.stop()
|
||
|
|
|
||
|
|
def teardown_class(self):
|
||
|
|
# Turn WiFi OFF and reset AP
|
||
|
|
self.access_point.teardown()
|
||
|
|
for dev in self.android_devices:
|
||
|
|
wutils.wifi_toggle_state(dev, False)
|
||
|
|
dev.go_to_sleep()
|
||
|
|
|
||
|
|
def pass_fail_check_rssi_stability(self, testcase_params,
|
||
|
|
postprocessed_results):
|
||
|
|
"""Check the test result and decide if it passed or failed.
|
||
|
|
|
||
|
|
Checks the RSSI test result and fails the test if the standard
|
||
|
|
deviation of signal_poll_rssi is beyond the threshold defined in the
|
||
|
|
config file.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
testcase_params: dict containing test-specific parameters
|
||
|
|
postprocessed_results: compiled arrays of RSSI measurements
|
||
|
|
"""
|
||
|
|
# Set Blackbox metric values
|
||
|
|
if self.publish_test_metrics:
|
||
|
|
self.testcase_metric_logger.add_metric(
|
||
|
|
'signal_poll_rssi_stdev',
|
||
|
|
max(postprocessed_results['signal_poll_rssi']['stdev']))
|
||
|
|
self.testcase_metric_logger.add_metric(
|
||
|
|
'chain_0_rssi_stdev',
|
||
|
|
max(postprocessed_results['chain_0_rssi']['stdev']))
|
||
|
|
self.testcase_metric_logger.add_metric(
|
||
|
|
'chain_1_rssi_stdev',
|
||
|
|
max(postprocessed_results['chain_1_rssi']['stdev']))
|
||
|
|
|
||
|
|
# Evaluate test pass/fail
|
||
|
|
test_failed = any([
|
||
|
|
stdev > self.testclass_params['stdev_tolerance']
|
||
|
|
for stdev in postprocessed_results['signal_poll_rssi']['stdev']
|
||
|
|
])
|
||
|
|
test_message = (
|
||
|
|
'RSSI stability {0}. Standard deviation was {1} dB '
|
||
|
|
'(limit {2}), per chain standard deviation [{3}, {4}] dB'.format(
|
||
|
|
'failed' * test_failed + 'passed' * (not test_failed), [
|
||
|
|
float('{:.2f}'.format(x))
|
||
|
|
for x in postprocessed_results['signal_poll_rssi']['stdev']
|
||
|
|
], self.testclass_params['stdev_tolerance'], [
|
||
|
|
float('{:.2f}'.format(x))
|
||
|
|
for x in postprocessed_results['chain_0_rssi']['stdev']
|
||
|
|
], [
|
||
|
|
float('{:.2f}'.format(x))
|
||
|
|
for x in postprocessed_results['chain_1_rssi']['stdev']
|
||
|
|
]))
|
||
|
|
if test_failed:
|
||
|
|
asserts.fail(test_message)
|
||
|
|
asserts.explicit_pass(test_message)
|
||
|
|
|
||
|
|
def pass_fail_check_rssi_accuracy(self, testcase_params,
|
||
|
|
postprocessed_results):
|
||
|
|
"""Check the test result and decide if it passed or failed.
|
||
|
|
|
||
|
|
Checks the RSSI test result and compares and compute its deviation from
|
||
|
|
the predicted RSSI. This computation is done for all reported RSSI
|
||
|
|
values. The test fails if any of the RSSI values specified in
|
||
|
|
rssi_under_test have an average error beyond what is specified in the
|
||
|
|
configuration file.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
postprocessed_results: compiled arrays of RSSI measurements
|
||
|
|
testcase_params: dict containing params such as list of RSSIs under
|
||
|
|
test, i.e., can cause test to fail and boolean indicating whether
|
||
|
|
to look at absolute RSSI accuracy, or centered RSSI accuracy.
|
||
|
|
Centered accuracy is computed after systematic RSSI shifts are
|
||
|
|
removed.
|
||
|
|
"""
|
||
|
|
test_failed = False
|
||
|
|
test_message = ''
|
||
|
|
if testcase_params['absolute_accuracy']:
|
||
|
|
error_type = 'absolute'
|
||
|
|
else:
|
||
|
|
error_type = 'centered'
|
||
|
|
|
||
|
|
for key, val in postprocessed_results.items():
|
||
|
|
# Compute the error metrics ignoring invalid RSSI readings
|
||
|
|
# If all readings invalid, set error to RSSI_ERROR_VAL
|
||
|
|
if 'rssi' in key and 'predicted' not in key:
|
||
|
|
filtered_error = [x for x in val['error'] if not math.isnan(x)]
|
||
|
|
if filtered_error:
|
||
|
|
avg_shift = statistics.mean(filtered_error)
|
||
|
|
if testcase_params['absolute_accuracy']:
|
||
|
|
avg_error = statistics.mean(
|
||
|
|
[abs(x) for x in filtered_error])
|
||
|
|
else:
|
||
|
|
avg_error = statistics.mean(
|
||
|
|
[abs(x - avg_shift) for x in filtered_error])
|
||
|
|
else:
|
||
|
|
avg_error = RSSI_ERROR_VAL
|
||
|
|
avg_shift = RSSI_ERROR_VAL
|
||
|
|
# Set Blackbox metric values
|
||
|
|
if self.publish_test_metrics:
|
||
|
|
self.testcase_metric_logger.add_metric(
|
||
|
|
'{}_error'.format(key), avg_error)
|
||
|
|
self.testcase_metric_logger.add_metric(
|
||
|
|
'{}_shift'.format(key), avg_shift)
|
||
|
|
# Evaluate test pass/fail
|
||
|
|
rssi_failure = (avg_error >
|
||
|
|
self.testclass_params['abs_tolerance']
|
||
|
|
) or math.isnan(avg_error)
|
||
|
|
if rssi_failure and key in testcase_params['rssi_under_test']:
|
||
|
|
test_message = test_message + (
|
||
|
|
'{} failed ({} error = {:.2f} dB, '
|
||
|
|
'shift = {:.2f} dB)\n').format(key, error_type,
|
||
|
|
avg_error, avg_shift)
|
||
|
|
test_failed = True
|
||
|
|
elif rssi_failure:
|
||
|
|
test_message = test_message + (
|
||
|
|
'{} failed (ignored) ({} error = {:.2f} dB, '
|
||
|
|
'shift = {:.2f} dB)\n').format(key, error_type,
|
||
|
|
avg_error, avg_shift)
|
||
|
|
else:
|
||
|
|
test_message = test_message + (
|
||
|
|
'{} passed ({} error = {:.2f} dB, '
|
||
|
|
'shift = {:.2f} dB)\n').format(key, error_type,
|
||
|
|
avg_error, avg_shift)
|
||
|
|
if test_failed:
|
||
|
|
asserts.fail(test_message)
|
||
|
|
asserts.explicit_pass(test_message)
|
||
|
|
|
||
|
|
def post_process_rssi_sweep(self, rssi_result):
|
||
|
|
"""Postprocesses and saves JSON formatted results.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
rssi_result: dict containing attenuation, rssi and other meta
|
||
|
|
data
|
||
|
|
Returns:
|
||
|
|
postprocessed_results: compiled arrays of RSSI data used in
|
||
|
|
pass/fail check
|
||
|
|
"""
|
||
|
|
# Save output as text file
|
||
|
|
results_file_path = os.path.join(self.log_path, self.current_test_name)
|
||
|
|
with open(results_file_path, 'w') as results_file:
|
||
|
|
json.dump(wputils.serialize_dict(rssi_result),
|
||
|
|
results_file,
|
||
|
|
indent=4)
|
||
|
|
# Compile results into arrays of RSSIs suitable for plotting
|
||
|
|
# yapf: disable
|
||
|
|
postprocessed_results = collections.OrderedDict(
|
||
|
|
[('signal_poll_rssi', {}),
|
||
|
|
('signal_poll_avg_rssi', {}),
|
||
|
|
('scan_rssi', {}),
|
||
|
|
('chain_0_rssi', {}),
|
||
|
|
('chain_1_rssi', {}),
|
||
|
|
('total_attenuation', []),
|
||
|
|
('predicted_rssi', [])])
|
||
|
|
# yapf: enable
|
||
|
|
for key, val in postprocessed_results.items():
|
||
|
|
if 'scan_rssi' in key:
|
||
|
|
postprocessed_results[key]['data'] = [
|
||
|
|
x for data_point in rssi_result['rssi_result'] for x in
|
||
|
|
data_point[key][rssi_result['connected_bssid']]['data']
|
||
|
|
]
|
||
|
|
postprocessed_results[key]['mean'] = [
|
||
|
|
x[key][rssi_result['connected_bssid']]['mean']
|
||
|
|
for x in rssi_result['rssi_result']
|
||
|
|
]
|
||
|
|
postprocessed_results[key]['stdev'] = [
|
||
|
|
x[key][rssi_result['connected_bssid']]['stdev']
|
||
|
|
for x in rssi_result['rssi_result']
|
||
|
|
]
|
||
|
|
elif 'predicted_rssi' in key:
|
||
|
|
postprocessed_results['total_attenuation'] = [
|
||
|
|
att + rssi_result['fixed_attenuation'] +
|
||
|
|
rssi_result['dut_front_end_loss']
|
||
|
|
for att in rssi_result['attenuation']
|
||
|
|
]
|
||
|
|
postprocessed_results['predicted_rssi'] = [
|
||
|
|
rssi_result['ap_tx_power'] - att
|
||
|
|
for att in postprocessed_results['total_attenuation']
|
||
|
|
]
|
||
|
|
elif 'rssi' in key:
|
||
|
|
postprocessed_results[key]['data'] = [
|
||
|
|
x for data_point in rssi_result['rssi_result']
|
||
|
|
for x in data_point[key]['data']
|
||
|
|
]
|
||
|
|
postprocessed_results[key]['mean'] = [
|
||
|
|
x[key]['mean'] for x in rssi_result['rssi_result']
|
||
|
|
]
|
||
|
|
postprocessed_results[key]['stdev'] = [
|
||
|
|
x[key]['stdev'] for x in rssi_result['rssi_result']
|
||
|
|
]
|
||
|
|
# Compute RSSI errors
|
||
|
|
for key, val in postprocessed_results.items():
|
||
|
|
if 'chain' in key:
|
||
|
|
postprocessed_results[key]['error'] = [
|
||
|
|
postprocessed_results[key]['mean'][idx] + CONST_3dB -
|
||
|
|
postprocessed_results['predicted_rssi'][idx]
|
||
|
|
for idx in range(
|
||
|
|
len(postprocessed_results['predicted_rssi']))
|
||
|
|
]
|
||
|
|
elif 'rssi' in key and 'predicted' not in key:
|
||
|
|
postprocessed_results[key]['error'] = [
|
||
|
|
postprocessed_results[key]['mean'][idx] -
|
||
|
|
postprocessed_results['predicted_rssi'][idx]
|
||
|
|
for idx in range(
|
||
|
|
len(postprocessed_results['predicted_rssi']))
|
||
|
|
]
|
||
|
|
return postprocessed_results
|
||
|
|
|
||
|
|
def plot_rssi_vs_attenuation(self, postprocessed_results):
|
||
|
|
"""Function to plot RSSI vs attenuation sweeps
|
||
|
|
|
||
|
|
Args:
|
||
|
|
postprocessed_results: compiled arrays of RSSI data.
|
||
|
|
"""
|
||
|
|
figure = BokehFigure(self.current_test_name,
|
||
|
|
x_label='Attenuation (dB)',
|
||
|
|
primary_y_label='RSSI (dBm)')
|
||
|
|
figure.add_line(postprocessed_results['total_attenuation'],
|
||
|
|
postprocessed_results['signal_poll_rssi']['mean'],
|
||
|
|
'Signal Poll RSSI',
|
||
|
|
marker='circle')
|
||
|
|
figure.add_line(postprocessed_results['total_attenuation'],
|
||
|
|
postprocessed_results['scan_rssi']['mean'],
|
||
|
|
'Scan RSSI',
|
||
|
|
marker='circle')
|
||
|
|
figure.add_line(postprocessed_results['total_attenuation'],
|
||
|
|
postprocessed_results['chain_0_rssi']['mean'],
|
||
|
|
'Chain 0 RSSI',
|
||
|
|
marker='circle')
|
||
|
|
figure.add_line(postprocessed_results['total_attenuation'],
|
||
|
|
postprocessed_results['chain_1_rssi']['mean'],
|
||
|
|
'Chain 1 RSSI',
|
||
|
|
marker='circle')
|
||
|
|
figure.add_line(postprocessed_results['total_attenuation'],
|
||
|
|
postprocessed_results['predicted_rssi'],
|
||
|
|
'Predicted RSSI',
|
||
|
|
marker='circle')
|
||
|
|
|
||
|
|
output_file_path = os.path.join(self.log_path,
|
||
|
|
self.current_test_name + '.html')
|
||
|
|
figure.generate_figure(output_file_path)
|
||
|
|
|
||
|
|
def plot_rssi_vs_time(self, rssi_result, postprocessed_results,
|
||
|
|
center_curves):
|
||
|
|
"""Function to plot RSSI vs time.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
rssi_result: dict containing raw RSSI data
|
||
|
|
postprocessed_results: compiled arrays of RSSI data
|
||
|
|
center_curvers: boolean indicating whether to shift curves to align
|
||
|
|
them with predicted RSSIs
|
||
|
|
"""
|
||
|
|
figure = BokehFigure(
|
||
|
|
self.current_test_name,
|
||
|
|
x_label='Time (s)',
|
||
|
|
primary_y_label=center_curves * 'Centered' + 'RSSI (dBm)',
|
||
|
|
)
|
||
|
|
|
||
|
|
# yapf: disable
|
||
|
|
rssi_time_series = collections.OrderedDict(
|
||
|
|
[('signal_poll_rssi', []),
|
||
|
|
('signal_poll_avg_rssi', []),
|
||
|
|
('scan_rssi', []),
|
||
|
|
('chain_0_rssi', []),
|
||
|
|
('chain_1_rssi', []),
|
||
|
|
('predicted_rssi', [])])
|
||
|
|
# yapf: enable
|
||
|
|
for key, val in rssi_time_series.items():
|
||
|
|
if 'predicted_rssi' in key:
|
||
|
|
rssi_time_series[key] = [
|
||
|
|
x for x in postprocessed_results[key] for copies in range(
|
||
|
|
len(rssi_result['rssi_result'][0]['signal_poll_rssi']
|
||
|
|
['data']))
|
||
|
|
]
|
||
|
|
elif 'rssi' in key:
|
||
|
|
if center_curves:
|
||
|
|
filtered_error = [
|
||
|
|
x for x in postprocessed_results[key]['error']
|
||
|
|
if not math.isnan(x)
|
||
|
|
]
|
||
|
|
if filtered_error:
|
||
|
|
avg_shift = statistics.mean(filtered_error)
|
||
|
|
else:
|
||
|
|
avg_shift = 0
|
||
|
|
rssi_time_series[key] = [
|
||
|
|
x - avg_shift
|
||
|
|
for x in postprocessed_results[key]['data']
|
||
|
|
]
|
||
|
|
else:
|
||
|
|
rssi_time_series[key] = postprocessed_results[key]['data']
|
||
|
|
time_vec = [
|
||
|
|
self.testclass_params['polling_frequency'] * x
|
||
|
|
for x in range(len(rssi_time_series[key]))
|
||
|
|
]
|
||
|
|
if len(rssi_time_series[key]) > 0:
|
||
|
|
figure.add_line(time_vec, rssi_time_series[key], key)
|
||
|
|
|
||
|
|
output_file_path = os.path.join(self.log_path,
|
||
|
|
self.current_test_name + '.html')
|
||
|
|
figure.generate_figure(output_file_path)
|
||
|
|
|
||
|
|
def plot_rssi_distribution(self, postprocessed_results):
|
||
|
|
"""Function to plot RSSI distributions.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
postprocessed_results: compiled arrays of RSSI data
|
||
|
|
"""
|
||
|
|
monitored_rssis = ['signal_poll_rssi', 'chain_0_rssi', 'chain_1_rssi']
|
||
|
|
|
||
|
|
rssi_dist = collections.OrderedDict()
|
||
|
|
for rssi_key in monitored_rssis:
|
||
|
|
rssi_data = postprocessed_results[rssi_key]
|
||
|
|
rssi_dist[rssi_key] = collections.OrderedDict()
|
||
|
|
unique_rssi = sorted(set(rssi_data['data']))
|
||
|
|
rssi_counts = []
|
||
|
|
for value in unique_rssi:
|
||
|
|
rssi_counts.append(rssi_data['data'].count(value))
|
||
|
|
total_count = sum(rssi_counts)
|
||
|
|
rssi_dist[rssi_key]['rssi_values'] = unique_rssi
|
||
|
|
rssi_dist[rssi_key]['rssi_pdf'] = [
|
||
|
|
x / total_count for x in rssi_counts
|
||
|
|
]
|
||
|
|
rssi_dist[rssi_key]['rssi_cdf'] = []
|
||
|
|
cum_prob = 0
|
||
|
|
for prob in rssi_dist[rssi_key]['rssi_pdf']:
|
||
|
|
cum_prob += prob
|
||
|
|
rssi_dist[rssi_key]['rssi_cdf'].append(cum_prob)
|
||
|
|
|
||
|
|
figure = BokehFigure(self.current_test_name,
|
||
|
|
x_label='RSSI (dBm)',
|
||
|
|
primary_y_label='p(RSSI = x)',
|
||
|
|
secondary_y_label='p(RSSI <= x)')
|
||
|
|
for rssi_key, rssi_data in rssi_dist.items():
|
||
|
|
figure.add_line(x_data=rssi_data['rssi_values'],
|
||
|
|
y_data=rssi_data['rssi_pdf'],
|
||
|
|
legend='{} PDF'.format(rssi_key),
|
||
|
|
y_axis='default')
|
||
|
|
figure.add_line(x_data=rssi_data['rssi_values'],
|
||
|
|
y_data=rssi_data['rssi_cdf'],
|
||
|
|
legend='{} CDF'.format(rssi_key),
|
||
|
|
y_axis='secondary')
|
||
|
|
output_file_path = os.path.join(self.log_path,
|
||
|
|
self.current_test_name + '_dist.html')
|
||
|
|
figure.generate_figure(output_file_path)
|
||
|
|
|
||
|
|
def run_rssi_test(self, testcase_params):
|
||
|
|
"""Test function to run RSSI tests.
|
||
|
|
|
||
|
|
The function runs an RSSI test in the current device/AP configuration.
|
||
|
|
Function is called from another wrapper function that sets up the
|
||
|
|
testbed for the RvR test
|
||
|
|
|
||
|
|
Args:
|
||
|
|
testcase_params: dict containing test-specific parameters
|
||
|
|
Returns:
|
||
|
|
rssi_result: dict containing rssi_result and meta data
|
||
|
|
"""
|
||
|
|
# Run test and log result
|
||
|
|
rssi_result = collections.OrderedDict()
|
||
|
|
rssi_result['test_name'] = self.current_test_name
|
||
|
|
rssi_result['testcase_params'] = testcase_params
|
||
|
|
rssi_result['ap_settings'] = self.access_point.ap_settings.copy()
|
||
|
|
rssi_result['attenuation'] = list(testcase_params['rssi_atten_range'])
|
||
|
|
rssi_result['connected_bssid'] = self.main_network[
|
||
|
|
testcase_params['band']].get('BSSID', '00:00:00:00')
|
||
|
|
channel_mode_combo = '{}_{}'.format(str(testcase_params['channel']),
|
||
|
|
testcase_params['mode'])
|
||
|
|
channel_str = str(testcase_params['channel'])
|
||
|
|
if channel_mode_combo in self.testbed_params['ap_tx_power']:
|
||
|
|
rssi_result['ap_tx_power'] = self.testbed_params['ap_tx_power'][
|
||
|
|
channel_mode_combo]
|
||
|
|
else:
|
||
|
|
rssi_result['ap_tx_power'] = self.testbed_params['ap_tx_power'][
|
||
|
|
str(testcase_params['channel'])]
|
||
|
|
rssi_result['fixed_attenuation'] = self.testbed_params[
|
||
|
|
'fixed_attenuation'][channel_str]
|
||
|
|
rssi_result['dut_front_end_loss'] = self.testbed_params[
|
||
|
|
'dut_front_end_loss'][channel_str]
|
||
|
|
|
||
|
|
self.log.info('Start running RSSI test.')
|
||
|
|
rssi_result['rssi_result'] = []
|
||
|
|
rssi_result['llstats'] = []
|
||
|
|
llstats_obj = wputils.LinkLayerStats(self.dut)
|
||
|
|
# Start iperf traffic if required by test
|
||
|
|
if testcase_params['active_traffic'] and testcase_params[
|
||
|
|
'traffic_type'] == 'iperf':
|
||
|
|
self.iperf_server.start(tag=0)
|
||
|
|
if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
|
||
|
|
iperf_server_address = self.dut_ip
|
||
|
|
else:
|
||
|
|
iperf_server_address = wputils.get_server_address(
|
||
|
|
self.remote_server, self.dut_ip, '255.255.255.0')
|
||
|
|
executor = ThreadPoolExecutor(max_workers=1)
|
||
|
|
thread_future = executor.submit(
|
||
|
|
self.iperf_client.start, iperf_server_address,
|
||
|
|
testcase_params['iperf_args'], 0,
|
||
|
|
testcase_params['traffic_timeout'] + SHORT_SLEEP)
|
||
|
|
executor.shutdown(wait=False)
|
||
|
|
elif testcase_params['active_traffic'] and testcase_params[
|
||
|
|
'traffic_type'] == 'ping':
|
||
|
|
thread_future = wputils.get_ping_stats_nb(
|
||
|
|
self.remote_server, self.dut_ip,
|
||
|
|
testcase_params['traffic_timeout'], 0.02, 64)
|
||
|
|
else:
|
||
|
|
thread_future = wputils.get_ping_stats_nb(
|
||
|
|
self.remote_server, self.dut_ip,
|
||
|
|
testcase_params['traffic_timeout'], 0.5, 64)
|
||
|
|
llstats_obj.update_stats()
|
||
|
|
for atten in testcase_params['rssi_atten_range']:
|
||
|
|
# Set Attenuation
|
||
|
|
self.log.info('Setting attenuation to {} dB'.format(atten))
|
||
|
|
for attenuator in self.attenuators:
|
||
|
|
attenuator.set_atten(atten)
|
||
|
|
current_rssi = collections.OrderedDict()
|
||
|
|
current_rssi = wputils.get_connected_rssi(
|
||
|
|
self.dut, testcase_params['connected_measurements'],
|
||
|
|
self.testclass_params['polling_frequency'],
|
||
|
|
testcase_params['first_measurement_delay'])
|
||
|
|
current_rssi['scan_rssi'] = wputils.get_scan_rssi(
|
||
|
|
self.dut, testcase_params['tracked_bssid'],
|
||
|
|
testcase_params['scan_measurements'])
|
||
|
|
rssi_result['rssi_result'].append(current_rssi)
|
||
|
|
llstats_obj.update_stats()
|
||
|
|
curr_llstats = llstats_obj.llstats_incremental.copy()
|
||
|
|
rssi_result['llstats'].append(curr_llstats)
|
||
|
|
self.log.info(
|
||
|
|
'Connected RSSI at {0:.2f} dB is {1:.2f} [{2:.2f}, {3:.2f}] dB'
|
||
|
|
.format(atten, current_rssi['signal_poll_rssi']['mean'],
|
||
|
|
current_rssi['chain_0_rssi']['mean'],
|
||
|
|
current_rssi['chain_1_rssi']['mean']))
|
||
|
|
# Stop iperf traffic if needed
|
||
|
|
for attenuator in self.attenuators:
|
||
|
|
attenuator.set_atten(0)
|
||
|
|
thread_future.result()
|
||
|
|
if testcase_params['active_traffic'] and testcase_params[
|
||
|
|
'traffic_type'] == 'iperf':
|
||
|
|
self.iperf_server.stop()
|
||
|
|
return rssi_result
|
||
|
|
|
||
|
|
def setup_ap(self, testcase_params):
|
||
|
|
"""Function that gets devices ready for the test.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
testcase_params: dict containing test-specific parameters
|
||
|
|
"""
|
||
|
|
band = self.access_point.band_lookup_by_channel(
|
||
|
|
testcase_params['channel'])
|
||
|
|
if '6G' in band:
|
||
|
|
frequency = wutils.WifiEnums.channel_6G_to_freq[int(
|
||
|
|
testcase_params['channel'].strip('6g'))]
|
||
|
|
else:
|
||
|
|
if testcase_params['channel'] < 13:
|
||
|
|
frequency = wutils.WifiEnums.channel_2G_to_freq[
|
||
|
|
testcase_params['channel']]
|
||
|
|
else:
|
||
|
|
frequency = wutils.WifiEnums.channel_5G_to_freq[
|
||
|
|
testcase_params['channel']]
|
||
|
|
if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
|
||
|
|
self.access_point.set_region(self.testbed_params['DFS_region'])
|
||
|
|
else:
|
||
|
|
self.access_point.set_region(self.testbed_params['default_region'])
|
||
|
|
self.access_point.set_channel(testcase_params['band'],
|
||
|
|
testcase_params['channel'])
|
||
|
|
self.access_point.set_bandwidth(testcase_params['band'],
|
||
|
|
testcase_params['mode'])
|
||
|
|
self.log.info('Access Point Configuration: {}'.format(
|
||
|
|
self.access_point.ap_settings))
|
||
|
|
|
||
|
|
def setup_dut(self, testcase_params):
|
||
|
|
"""Sets up the DUT in the configuration required by the test."""
|
||
|
|
# Turn screen off to preserve battery
|
||
|
|
if self.testbed_params.get('screen_on',
|
||
|
|
False) or self.testclass_params.get(
|
||
|
|
'screen_on', False):
|
||
|
|
self.dut.droid.wakeLockAcquireDim()
|
||
|
|
else:
|
||
|
|
self.dut.go_to_sleep()
|
||
|
|
if wputils.validate_network(self.dut,
|
||
|
|
testcase_params['test_network']['SSID']):
|
||
|
|
self.log.info('Already connected to desired network')
|
||
|
|
else:
|
||
|
|
wutils.wifi_toggle_state(self.dut, True)
|
||
|
|
wutils.reset_wifi(self.dut)
|
||
|
|
self.main_network[testcase_params['band']][
|
||
|
|
'channel'] = testcase_params['channel']
|
||
|
|
wutils.set_wifi_country_code(self.dut,
|
||
|
|
self.testclass_params['country_code'])
|
||
|
|
if self.testbed_params.get('txbf_off', False):
|
||
|
|
wputils.disable_beamforming(self.dut)
|
||
|
|
wutils.wifi_connect(self.dut,
|
||
|
|
self.main_network[testcase_params['band']],
|
||
|
|
num_of_tries=5)
|
||
|
|
self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
|
||
|
|
|
||
|
|
def setup_rssi_test(self, testcase_params):
|
||
|
|
"""Main function to test RSSI.
|
||
|
|
|
||
|
|
The function sets up the AP in the correct channel and mode
|
||
|
|
configuration and called rssi_test to sweep attenuation and measure
|
||
|
|
RSSI
|
||
|
|
|
||
|
|
Args:
|
||
|
|
testcase_params: dict containing test-specific parameters
|
||
|
|
Returns:
|
||
|
|
rssi_result: dict containing rssi_results and meta data
|
||
|
|
"""
|
||
|
|
# Configure AP
|
||
|
|
self.setup_ap(testcase_params)
|
||
|
|
# Initialize attenuators
|
||
|
|
for attenuator in self.attenuators:
|
||
|
|
attenuator.set_atten(testcase_params['rssi_atten_range'][0])
|
||
|
|
# Connect DUT to Network
|
||
|
|
self.setup_dut(testcase_params)
|
||
|
|
|
||
|
|
def get_traffic_timeout(self, testcase_params):
|
||
|
|
"""Function to comput iperf session length required in RSSI test.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
testcase_params: dict containing test-specific parameters
|
||
|
|
Returns:
|
||
|
|
traffic_timeout: length of iperf session required in rssi test
|
||
|
|
"""
|
||
|
|
atten_step_duration = testcase_params['first_measurement_delay'] + (
|
||
|
|
testcase_params['connected_measurements'] *
|
||
|
|
self.testclass_params['polling_frequency']
|
||
|
|
) + testcase_params['scan_measurements'] * MED_SLEEP
|
||
|
|
timeout = len(testcase_params['rssi_atten_range']
|
||
|
|
) * atten_step_duration + MED_SLEEP
|
||
|
|
return timeout
|
||
|
|
|
||
|
|
def compile_rssi_vs_atten_test_params(self, testcase_params):
|
||
|
|
"""Function to complete compiling test-specific parameters
|
||
|
|
|
||
|
|
Args:
|
||
|
|
testcase_params: dict containing test-specific parameters
|
||
|
|
"""
|
||
|
|
# Check if test should be skipped.
|
||
|
|
wputils.check_skip_conditions(testcase_params, self.dut,
|
||
|
|
self.access_point,
|
||
|
|
getattr(self, 'ota_chamber', None))
|
||
|
|
|
||
|
|
testcase_params.update(
|
||
|
|
connected_measurements=self.
|
||
|
|
testclass_params['rssi_vs_atten_connected_measurements'],
|
||
|
|
scan_measurements=self.
|
||
|
|
testclass_params['rssi_vs_atten_scan_measurements'],
|
||
|
|
first_measurement_delay=SHORT_SLEEP,
|
||
|
|
absolute_accuracy=1)
|
||
|
|
rssi_under_test = self.testclass_params['rssi_vs_atten_metrics']
|
||
|
|
if self.testclass_params[
|
||
|
|
'rssi_vs_atten_scan_measurements'] == 0 and 'scan_rssi' in rssi_under_test:
|
||
|
|
rssi_under_test.remove('scan_rssi')
|
||
|
|
testcase_params['rssi_under_test'] = rssi_under_test
|
||
|
|
|
||
|
|
testcase_params['band'] = self.access_point.band_lookup_by_channel(
|
||
|
|
testcase_params['channel'])
|
||
|
|
testcase_params['test_network'] = self.main_network[
|
||
|
|
testcase_params['band']]
|
||
|
|
testcase_params['tracked_bssid'] = [
|
||
|
|
self.main_network[testcase_params['band']].get(
|
||
|
|
'BSSID', '00:00:00:00')
|
||
|
|
]
|
||
|
|
|
||
|
|
testcase_params['rssi_atten_range'] = numpy.arange(
|
||
|
|
self.testclass_params['rssi_vs_atten_start'],
|
||
|
|
self.testclass_params['rssi_vs_atten_stop'],
|
||
|
|
self.testclass_params['rssi_vs_atten_step']).tolist()
|
||
|
|
|
||
|
|
testcase_params['traffic_timeout'] = self.get_traffic_timeout(
|
||
|
|
testcase_params)
|
||
|
|
|
||
|
|
if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
|
||
|
|
testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
|
||
|
|
testcase_params['traffic_timeout'])
|
||
|
|
else:
|
||
|
|
testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format(
|
||
|
|
testcase_params['traffic_timeout'])
|
||
|
|
return testcase_params
|
||
|
|
|
||
|
|
def compile_rssi_stability_test_params(self, testcase_params):
|
||
|
|
"""Function to complete compiling test-specific parameters
|
||
|
|
|
||
|
|
Args:
|
||
|
|
testcase_params: dict containing test-specific parameters
|
||
|
|
"""
|
||
|
|
# Check if test should be skipped.
|
||
|
|
wputils.check_skip_conditions(testcase_params, self.dut,
|
||
|
|
self.access_point,
|
||
|
|
getattr(self, 'ota_chamber', None))
|
||
|
|
testcase_params.update(
|
||
|
|
connected_measurements=int(
|
||
|
|
self.testclass_params['rssi_stability_duration'] /
|
||
|
|
self.testclass_params['polling_frequency']),
|
||
|
|
scan_measurements=0,
|
||
|
|
first_measurement_delay=SHORT_SLEEP,
|
||
|
|
rssi_atten_range=self.testclass_params['rssi_stability_atten'])
|
||
|
|
testcase_params['band'] = self.access_point.band_lookup_by_channel(
|
||
|
|
testcase_params['channel'])
|
||
|
|
testcase_params['test_network'] = self.main_network[
|
||
|
|
testcase_params['band']]
|
||
|
|
testcase_params['tracked_bssid'] = [
|
||
|
|
self.main_network[testcase_params['band']].get(
|
||
|
|
'BSSID', '00:00:00:00')
|
||
|
|
]
|
||
|
|
|
||
|
|
testcase_params['traffic_timeout'] = self.get_traffic_timeout(
|
||
|
|
testcase_params)
|
||
|
|
if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
|
||
|
|
testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
|
||
|
|
testcase_params['traffic_timeout'])
|
||
|
|
else:
|
||
|
|
testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format(
|
||
|
|
testcase_params['traffic_timeout'])
|
||
|
|
return testcase_params
|
||
|
|
|
||
|
|
def compile_rssi_tracking_test_params(self, testcase_params):
|
||
|
|
"""Function to complete compiling test-specific parameters
|
||
|
|
|
||
|
|
Args:
|
||
|
|
testcase_params: dict containing test-specific parameters
|
||
|
|
"""
|
||
|
|
# Check if test should be skipped.
|
||
|
|
wputils.check_skip_conditions(testcase_params, self.dut,
|
||
|
|
self.access_point,
|
||
|
|
getattr(self, 'ota_chamber', None))
|
||
|
|
|
||
|
|
testcase_params.update(connected_measurements=int(
|
||
|
|
1 / self.testclass_params['polling_frequency']),
|
||
|
|
scan_measurements=0,
|
||
|
|
first_measurement_delay=0,
|
||
|
|
rssi_under_test=['signal_poll_rssi'],
|
||
|
|
absolute_accuracy=0)
|
||
|
|
testcase_params['band'] = self.access_point.band_lookup_by_channel(
|
||
|
|
testcase_params['channel'])
|
||
|
|
testcase_params['test_network'] = self.main_network[
|
||
|
|
testcase_params['band']]
|
||
|
|
testcase_params['tracked_bssid'] = [
|
||
|
|
self.main_network[testcase_params['band']].get(
|
||
|
|
'BSSID', '00:00:00:00')
|
||
|
|
]
|
||
|
|
|
||
|
|
rssi_atten_range = []
|
||
|
|
for waveform in self.testclass_params['rssi_tracking_waveforms']:
|
||
|
|
waveform_vector = []
|
||
|
|
for section in range(len(waveform['atten_levels']) - 1):
|
||
|
|
section_limits = waveform['atten_levels'][section:section + 2]
|
||
|
|
up_down = (1 - 2 * (section_limits[1] < section_limits[0]))
|
||
|
|
temp_section = list(
|
||
|
|
range(section_limits[0], section_limits[1] + up_down,
|
||
|
|
up_down * waveform['step_size']))
|
||
|
|
temp_section = [
|
||
|
|
temp_section[idx] for idx in range(len(temp_section))
|
||
|
|
for n in range(waveform['step_duration'])
|
||
|
|
]
|
||
|
|
waveform_vector += temp_section
|
||
|
|
waveform_vector = waveform_vector * waveform['repetitions']
|
||
|
|
rssi_atten_range = rssi_atten_range + waveform_vector
|
||
|
|
testcase_params['rssi_atten_range'] = rssi_atten_range
|
||
|
|
testcase_params['traffic_timeout'] = self.get_traffic_timeout(
|
||
|
|
testcase_params)
|
||
|
|
|
||
|
|
if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
|
||
|
|
testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
|
||
|
|
testcase_params['traffic_timeout'])
|
||
|
|
else:
|
||
|
|
testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format(
|
||
|
|
testcase_params['traffic_timeout'])
|
||
|
|
return testcase_params
|
||
|
|
|
||
|
|
def _test_rssi_vs_atten(self, testcase_params):
|
||
|
|
"""Function that gets called for each test case of rssi_vs_atten
|
||
|
|
|
||
|
|
The function gets called in each rssi test case. The function
|
||
|
|
customizes the test based on the test name of the test that called it
|
||
|
|
|
||
|
|
Args:
|
||
|
|
testcase_params: dict containing test-specific parameters
|
||
|
|
"""
|
||
|
|
testcase_params = self.compile_rssi_vs_atten_test_params(
|
||
|
|
testcase_params)
|
||
|
|
|
||
|
|
self.setup_rssi_test(testcase_params)
|
||
|
|
rssi_result = self.run_rssi_test(testcase_params)
|
||
|
|
rssi_result['postprocessed_results'] = self.post_process_rssi_sweep(
|
||
|
|
rssi_result)
|
||
|
|
self.testclass_results.append(rssi_result)
|
||
|
|
self.plot_rssi_vs_attenuation(rssi_result['postprocessed_results'])
|
||
|
|
self.pass_fail_check_rssi_accuracy(
|
||
|
|
testcase_params, rssi_result['postprocessed_results'])
|
||
|
|
|
||
|
|
def _test_rssi_stability(self, testcase_params):
|
||
|
|
""" Function that gets called for each test case of rssi_stability
|
||
|
|
|
||
|
|
The function gets called in each stability test case. The function
|
||
|
|
customizes test based on the test name of the test that called it
|
||
|
|
"""
|
||
|
|
testcase_params = self.compile_rssi_stability_test_params(
|
||
|
|
testcase_params)
|
||
|
|
|
||
|
|
self.setup_rssi_test(testcase_params)
|
||
|
|
rssi_result = self.run_rssi_test(testcase_params)
|
||
|
|
rssi_result['postprocessed_results'] = self.post_process_rssi_sweep(
|
||
|
|
rssi_result)
|
||
|
|
self.testclass_results.append(rssi_result)
|
||
|
|
self.plot_rssi_vs_time(rssi_result,
|
||
|
|
rssi_result['postprocessed_results'], 1)
|
||
|
|
self.plot_rssi_distribution(rssi_result['postprocessed_results'])
|
||
|
|
self.pass_fail_check_rssi_stability(
|
||
|
|
testcase_params, rssi_result['postprocessed_results'])
|
||
|
|
|
||
|
|
def _test_rssi_tracking(self, testcase_params):
|
||
|
|
""" Function that gets called for each test case of rssi_tracking
|
||
|
|
|
||
|
|
The function gets called in each rssi test case. The function
|
||
|
|
customizes the test based on the test name of the test that called it
|
||
|
|
"""
|
||
|
|
testcase_params = self.compile_rssi_tracking_test_params(
|
||
|
|
testcase_params)
|
||
|
|
|
||
|
|
self.setup_rssi_test(testcase_params)
|
||
|
|
rssi_result = self.run_rssi_test(testcase_params)
|
||
|
|
rssi_result['postprocessed_results'] = self.post_process_rssi_sweep(
|
||
|
|
rssi_result)
|
||
|
|
self.testclass_results.append(rssi_result)
|
||
|
|
self.plot_rssi_vs_time(rssi_result,
|
||
|
|
rssi_result['postprocessed_results'], 1)
|
||
|
|
self.pass_fail_check_rssi_accuracy(
|
||
|
|
testcase_params, rssi_result['postprocessed_results'])
|
||
|
|
|
||
|
|
def generate_test_cases(self, test_types, channels, modes, traffic_modes):
|
||
|
|
"""Function that auto-generates test cases for a test class."""
|
||
|
|
test_cases = []
|
||
|
|
allowed_configs = {
|
||
|
|
20: [
|
||
|
|
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
|
||
|
|
116, 132, 140, 149, 153, 157, 161, '6g37', '6g117', '6g213'
|
||
|
|
],
|
||
|
|
40: [36, 44, 100, 149, 157, '6g37', '6g117', '6g213'],
|
||
|
|
80: [36, 100, 149, '6g37', '6g117', '6g213'],
|
||
|
|
160: [36, '6g37', '6g117', '6g213']
|
||
|
|
}
|
||
|
|
|
||
|
|
for channel, mode, traffic_mode, test_type in itertools.product(
|
||
|
|
channels, modes, traffic_modes, test_types):
|
||
|
|
bandwidth = int(''.join([x for x in mode if x.isdigit()]))
|
||
|
|
if channel not in allowed_configs[bandwidth]:
|
||
|
|
continue
|
||
|
|
test_name = test_type + '_ch{}_{}_{}'.format(
|
||
|
|
channel, mode, traffic_mode)
|
||
|
|
testcase_params = collections.OrderedDict(
|
||
|
|
channel=channel,
|
||
|
|
mode=mode,
|
||
|
|
active_traffic=(traffic_mode == 'ActiveTraffic'),
|
||
|
|
traffic_type=self.user_params['rssi_test_params']
|
||
|
|
['traffic_type'],
|
||
|
|
)
|
||
|
|
test_function = getattr(self, '_{}'.format(test_type))
|
||
|
|
setattr(self, test_name, partial(test_function, testcase_params))
|
||
|
|
test_cases.append(test_name)
|
||
|
|
return test_cases
|
||
|
|
|
||
|
|
|
||
|
|
class WifiRssi_2GHz_ActiveTraffic_Test(WifiRssiTest):
|
||
|
|
|
||
|
|
def __init__(self, controllers):
|
||
|
|
super().__init__(controllers)
|
||
|
|
self.tests = self.generate_test_cases(
|
||
|
|
['test_rssi_stability', 'test_rssi_vs_atten'], [1, 2, 6, 10, 11],
|
||
|
|
['bw20'], ['ActiveTraffic'])
|
||
|
|
|
||
|
|
|
||
|
|
class WifiRssi_5GHz_ActiveTraffic_Test(WifiRssiTest):
|
||
|
|
|
||
|
|
def __init__(self, controllers):
|
||
|
|
super().__init__(controllers)
|
||
|
|
self.tests = self.generate_test_cases(
|
||
|
|
['test_rssi_stability', 'test_rssi_vs_atten'],
|
||
|
|
[36, 40, 44, 48, 149, 153, 157, 161], ['bw20', 'bw40', 'bw80'],
|
||
|
|
['ActiveTraffic'])
|
||
|
|
|
||
|
|
|
||
|
|
class WifiRssi_AllChannels_ActiveTraffic_Test(WifiRssiTest):
|
||
|
|
|
||
|
|
def __init__(self, controllers):
|
||
|
|
super().__init__(controllers)
|
||
|
|
self.tests = self.generate_test_cases(
|
||
|
|
['test_rssi_stability', 'test_rssi_vs_atten'], [
|
||
|
|
1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161, '6g37', '6g117',
|
||
|
|
'6g213'
|
||
|
|
], ['bw20', 'bw40', 'bw80', 'bw160'], ['ActiveTraffic'])
|
||
|
|
|
||
|
|
|
||
|
|
class WifiRssi_SampleChannels_NoTraffic_Test(WifiRssiTest):
|
||
|
|
|
||
|
|
def __init__(self, controllers):
|
||
|
|
super().__init__(controllers)
|
||
|
|
self.tests = self.generate_test_cases(
|
||
|
|
['test_rssi_stability', 'test_rssi_vs_atten'], [6, 36, 149],
|
||
|
|
['bw20', 'bw40', 'bw80'], ['NoTraffic'])
|
||
|
|
|
||
|
|
|
||
|
|
class WifiRssiTrackingTest(WifiRssiTest):
|
||
|
|
|
||
|
|
def __init__(self, controllers):
|
||
|
|
super().__init__(controllers)
|
||
|
|
self.tests = self.generate_test_cases(['test_rssi_tracking'],
|
||
|
|
[6, 36, 149],
|
||
|
|
['bw20', 'bw40', 'bw80'],
|
||
|
|
['ActiveTraffic', 'NoTraffic'])
|
||
|
|
|
||
|
|
|
||
|
|
# Over-the air version of RSSI tests
|
||
|
|
class WifiOtaRssiTest(WifiRssiTest):
|
||
|
|
"""Class to test over-the-air rssi tests.
|
||
|
|
|
||
|
|
This class implements measures WiFi RSSI tests in an OTA chamber.
|
||
|
|
It allows setting orientation and other chamber parameters to study
|
||
|
|
performance in varying channel conditions
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(self, controllers):
|
||
|
|
base_test.BaseTestClass.__init__(self, controllers)
|
||
|
|
self.testcase_metric_logger = (
|
||
|
|
BlackboxMappedMetricLogger.for_test_case())
|
||
|
|
self.testclass_metric_logger = (
|
||
|
|
BlackboxMappedMetricLogger.for_test_class())
|
||
|
|
self.publish_test_metrics = False
|
||
|
|
|
||
|
|
def setup_class(self):
|
||
|
|
WifiRssiTest.setup_class(self)
|
||
|
|
self.ota_chamber = ota_chamber.create(
|
||
|
|
self.user_params['OTAChamber'])[0]
|
||
|
|
|
||
|
|
def teardown_class(self):
|
||
|
|
WifiRssiTest.teardown_class(self)
|
||
|
|
self.ota_chamber.reset_chamber()
|
||
|
|
self.process_testclass_results()
|
||
|
|
|
||
|
|
def teardown_test(self):
|
||
|
|
if self.ota_chamber.current_mode == 'continuous':
|
||
|
|
self.ota_chamber.reset_chamber()
|
||
|
|
|
||
|
|
def extract_test_id(self, testcase_params, id_fields):
|
||
|
|
test_id = collections.OrderedDict(
|
||
|
|
(param, testcase_params[param]) for param in id_fields)
|
||
|
|
return test_id
|
||
|
|
|
||
|
|
def process_testclass_results(self):
|
||
|
|
"""Saves all test results to enable comparison."""
|
||
|
|
testclass_data = collections.OrderedDict()
|
||
|
|
for test_result in self.testclass_results:
|
||
|
|
current_params = test_result['testcase_params']
|
||
|
|
|
||
|
|
channel = current_params['channel']
|
||
|
|
channel_data = testclass_data.setdefault(
|
||
|
|
channel,
|
||
|
|
collections.OrderedDict(orientation=[],
|
||
|
|
rssi=collections.OrderedDict(
|
||
|
|
signal_poll_rssi=[],
|
||
|
|
chain_0_rssi=[],
|
||
|
|
chain_1_rssi=[])))
|
||
|
|
|
||
|
|
channel_data['orientation'].append(current_params['orientation'])
|
||
|
|
channel_data['rssi']['signal_poll_rssi'].append(
|
||
|
|
test_result['postprocessed_results']['signal_poll_rssi']
|
||
|
|
['mean'][0])
|
||
|
|
channel_data['rssi']['chain_0_rssi'].append(
|
||
|
|
test_result['postprocessed_results']['chain_0_rssi']['mean']
|
||
|
|
[0])
|
||
|
|
channel_data['rssi']['chain_1_rssi'].append(
|
||
|
|
test_result['postprocessed_results']['chain_1_rssi']['mean']
|
||
|
|
[0])
|
||
|
|
|
||
|
|
# Publish test class metrics
|
||
|
|
for channel, channel_data in testclass_data.items():
|
||
|
|
for rssi_metric, rssi_metric_value in channel_data['rssi'].items():
|
||
|
|
metric_name = 'ota_summary_ch{}.avg_{}'.format(
|
||
|
|
channel, rssi_metric)
|
||
|
|
metric_value = numpy.mean(rssi_metric_value)
|
||
|
|
self.testclass_metric_logger.add_metric(
|
||
|
|
metric_name, metric_value)
|
||
|
|
|
||
|
|
# Plot test class results
|
||
|
|
chamber_mode = self.testclass_results[0]['testcase_params'][
|
||
|
|
'chamber_mode']
|
||
|
|
if chamber_mode == 'orientation':
|
||
|
|
x_label = 'Angle (deg)'
|
||
|
|
elif chamber_mode == 'stepped stirrers':
|
||
|
|
x_label = 'Position Index'
|
||
|
|
elif chamber_mode == 'StirrersOn':
|
||
|
|
return
|
||
|
|
plots = []
|
||
|
|
for channel, channel_data in testclass_data.items():
|
||
|
|
current_plot = BokehFigure(
|
||
|
|
title='Channel {} - Rssi vs. Position'.format(channel),
|
||
|
|
x_label=x_label,
|
||
|
|
primary_y_label='RSSI (dBm)',
|
||
|
|
)
|
||
|
|
for rssi_metric, rssi_metric_value in channel_data['rssi'].items():
|
||
|
|
legend = rssi_metric
|
||
|
|
current_plot.add_line(channel_data['orientation'],
|
||
|
|
rssi_metric_value, legend)
|
||
|
|
current_plot.generate_figure()
|
||
|
|
plots.append(current_plot)
|
||
|
|
current_context = context.get_current_context().get_full_output_path()
|
||
|
|
plot_file_path = os.path.join(current_context, 'results.html')
|
||
|
|
BokehFigure.save_figures(plots, plot_file_path)
|
||
|
|
|
||
|
|
def setup_rssi_test(self, testcase_params):
|
||
|
|
# Test setup
|
||
|
|
WifiRssiTest.setup_rssi_test(self, testcase_params)
|
||
|
|
if testcase_params['chamber_mode'] == 'StirrersOn':
|
||
|
|
self.ota_chamber.start_continuous_stirrers()
|
||
|
|
else:
|
||
|
|
self.ota_chamber.set_orientation(testcase_params['orientation'])
|
||
|
|
|
||
|
|
def compile_ota_rssi_test_params(self, testcase_params):
|
||
|
|
"""Function to complete compiling test-specific parameters
|
||
|
|
|
||
|
|
Args:
|
||
|
|
testcase_params: dict containing test-specific parameters
|
||
|
|
"""
|
||
|
|
# Check if test should be skipped.
|
||
|
|
wputils.check_skip_conditions(testcase_params, self.dut,
|
||
|
|
self.access_point,
|
||
|
|
getattr(self, 'ota_chamber', None))
|
||
|
|
|
||
|
|
if 'rssi_over_orientation' in self.test_name:
|
||
|
|
rssi_test_duration = self.testclass_params[
|
||
|
|
'rssi_over_orientation_duration']
|
||
|
|
rssi_ota_test_attenuation = [
|
||
|
|
self.testclass_params['rssi_ota_test_attenuation']
|
||
|
|
]
|
||
|
|
elif 'rssi_variation' in self.test_name:
|
||
|
|
rssi_test_duration = self.testclass_params[
|
||
|
|
'rssi_variation_duration']
|
||
|
|
rssi_ota_test_attenuation = [
|
||
|
|
self.testclass_params['rssi_ota_test_attenuation']
|
||
|
|
]
|
||
|
|
elif 'rssi_vs_atten' in self.test_name:
|
||
|
|
rssi_test_duration = self.testclass_params[
|
||
|
|
'rssi_over_orientation_duration']
|
||
|
|
rssi_ota_test_attenuation = numpy.arange(
|
||
|
|
self.testclass_params['rssi_vs_atten_start'],
|
||
|
|
self.testclass_params['rssi_vs_atten_stop'],
|
||
|
|
self.testclass_params['rssi_vs_atten_step']).tolist()
|
||
|
|
|
||
|
|
testcase_params.update(connected_measurements=int(
|
||
|
|
rssi_test_duration / self.testclass_params['polling_frequency']),
|
||
|
|
scan_measurements=0,
|
||
|
|
first_measurement_delay=SHORT_SLEEP,
|
||
|
|
rssi_atten_range=rssi_ota_test_attenuation)
|
||
|
|
testcase_params['band'] = self.access_point.band_lookup_by_channel(
|
||
|
|
testcase_params['channel'])
|
||
|
|
testcase_params['test_network'] = self.main_network[
|
||
|
|
testcase_params['band']]
|
||
|
|
testcase_params['tracked_bssid'] = [
|
||
|
|
self.main_network[testcase_params['band']].get(
|
||
|
|
'BSSID', '00:00:00:00')
|
||
|
|
]
|
||
|
|
|
||
|
|
testcase_params['traffic_timeout'] = self.get_traffic_timeout(
|
||
|
|
testcase_params)
|
||
|
|
if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
|
||
|
|
testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
|
||
|
|
testcase_params['traffic_timeout'])
|
||
|
|
else:
|
||
|
|
testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format(
|
||
|
|
testcase_params['traffic_timeout'])
|
||
|
|
return testcase_params
|
||
|
|
|
||
|
|
def _test_ota_rssi(self, testcase_params):
|
||
|
|
testcase_params = self.compile_ota_rssi_test_params(testcase_params)
|
||
|
|
|
||
|
|
self.setup_rssi_test(testcase_params)
|
||
|
|
rssi_result = self.run_rssi_test(testcase_params)
|
||
|
|
rssi_result['postprocessed_results'] = self.post_process_rssi_sweep(
|
||
|
|
rssi_result)
|
||
|
|
self.testclass_results.append(rssi_result)
|
||
|
|
self.plot_rssi_vs_time(rssi_result,
|
||
|
|
rssi_result['postprocessed_results'], 1)
|
||
|
|
if 'rssi_vs_atten' in self.test_name:
|
||
|
|
self.plot_rssi_vs_attenuation(rssi_result['postprocessed_results'])
|
||
|
|
elif 'rssi_variation' in self.test_name:
|
||
|
|
self.plot_rssi_distribution(rssi_result['postprocessed_results'])
|
||
|
|
|
||
|
|
def generate_test_cases(self, test_types, channels, modes, traffic_modes,
|
||
|
|
chamber_modes, orientations):
|
||
|
|
test_cases = []
|
||
|
|
allowed_configs = {
|
||
|
|
20: [
|
||
|
|
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
|
||
|
|
116, 132, 140, 149, 153, 157, 161, '6g37', '6g117', '6g213'
|
||
|
|
],
|
||
|
|
40: [36, 44, 100, 149, 157, '6g37', '6g117', '6g213'],
|
||
|
|
80: [36, 100, 149, '6g37', '6g117', '6g213'],
|
||
|
|
160: [36, '6g37', '6g117', '6g213']
|
||
|
|
}
|
||
|
|
|
||
|
|
for (channel, mode, traffic, chamber_mode, orientation,
|
||
|
|
test_type) in itertools.product(channels, modes, traffic_modes,
|
||
|
|
chamber_modes, orientations,
|
||
|
|
test_types):
|
||
|
|
bandwidth = int(''.join([x for x in mode if x.isdigit()]))
|
||
|
|
if channel not in allowed_configs[bandwidth]:
|
||
|
|
continue
|
||
|
|
test_name = test_type + '_ch{}_{}_{}_{}deg'.format(
|
||
|
|
channel, mode, traffic, orientation)
|
||
|
|
testcase_params = collections.OrderedDict(
|
||
|
|
channel=channel,
|
||
|
|
mode=mode,
|
||
|
|
active_traffic=(traffic == 'ActiveTraffic'),
|
||
|
|
traffic_type=self.user_params['rssi_test_params']
|
||
|
|
['traffic_type'],
|
||
|
|
chamber_mode=chamber_mode,
|
||
|
|
orientation=orientation)
|
||
|
|
test_function = self._test_ota_rssi
|
||
|
|
setattr(self, test_name, partial(test_function, testcase_params))
|
||
|
|
test_cases.append(test_name)
|
||
|
|
return test_cases
|
||
|
|
|
||
|
|
|
||
|
|
class WifiOtaRssi_Accuracy_Test(WifiOtaRssiTest):
|
||
|
|
|
||
|
|
def __init__(self, controllers):
|
||
|
|
super().__init__(controllers)
|
||
|
|
self.tests = self.generate_test_cases(['test_rssi_vs_atten'],
|
||
|
|
[6, 36, 149, '6g37'], ['bw20'],
|
||
|
|
['ActiveTraffic'],
|
||
|
|
['orientation'],
|
||
|
|
list(range(0, 360, 45)))
|
||
|
|
|
||
|
|
|
||
|
|
class WifiOtaRssi_StirrerVariation_Test(WifiOtaRssiTest):
|
||
|
|
|
||
|
|
def __init__(self, controllers):
|
||
|
|
WifiRssiTest.__init__(self, controllers)
|
||
|
|
self.tests = self.generate_test_cases(['test_rssi_variation'],
|
||
|
|
[6, 36, 149, '6g37'], ['bw20'],
|
||
|
|
['ActiveTraffic'],
|
||
|
|
['StirrersOn'], [0])
|
||
|
|
|
||
|
|
|
||
|
|
class WifiOtaRssi_TenDegree_Test(WifiOtaRssiTest):
|
||
|
|
|
||
|
|
def __init__(self, controllers):
|
||
|
|
WifiRssiTest.__init__(self, controllers)
|
||
|
|
self.tests = self.generate_test_cases(['test_rssi_over_orientation'],
|
||
|
|
[6, 36, 149, '6g37'], ['bw20'],
|
||
|
|
['ActiveTraffic'],
|
||
|
|
['orientation'],
|
||
|
|
list(range(0, 360, 10)))
|