789 lines
30 KiB
Python
789 lines
30 KiB
Python
# Copyright 2021 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
import common
|
|
import numpy as np
|
|
import time
|
|
|
|
from autotest_lib.server.cros.cellular import cellular_simulator
|
|
from enum import Enum
|
|
|
|
|
|
class BaseSimulation(object):
|
|
""" Base class for cellular connectivity simulations.
|
|
|
|
Classes that inherit from this base class implement different simulation
|
|
setups. The base class contains methods that are common to all simulation
|
|
configurations.
|
|
|
|
"""
|
|
|
|
NUM_UL_CAL_READS = 3
|
|
NUM_DL_CAL_READS = 5
|
|
MAX_BTS_INPUT_POWER = 30
|
|
MAX_PHONE_OUTPUT_POWER = 23
|
|
UL_MIN_POWER = -60.0
|
|
|
|
# Keys to obtain settings from the test_config dictionary.
|
|
KEY_CALIBRATION = "calibration"
|
|
KEY_ATTACH_RETRIES = "attach_retries"
|
|
KEY_ATTACH_TIMEOUT = "attach_timeout"
|
|
|
|
# Filepath to the config files stored in the Anritsu callbox. Needs to be
|
|
# formatted to replace {} with either A or B depending on the model.
|
|
CALLBOX_PATH_FORMAT_STR = 'C:\\Users\\MD8475{}\\Documents\\DAN_configs\\'
|
|
|
|
# Time in seconds to wait for the phone to settle
|
|
# after attaching to the base station.
|
|
SETTLING_TIME = 10
|
|
|
|
# Default time in seconds to wait for the phone to attach to the basestation
|
|
# after toggling airplane mode. This setting can be changed with the
|
|
# KEY_ATTACH_TIMEOUT keyword in the test configuration file.
|
|
DEFAULT_ATTACH_TIMEOUT = 120
|
|
|
|
# The default number of attach retries. This setting can be changed with
|
|
# the KEY_ATTACH_RETRIES keyword in the test configuration file.
|
|
DEFAULT_ATTACH_RETRIES = 3
|
|
|
|
# These two dictionaries allow to map from a string to a signal level and
|
|
# have to be overridden by the simulations inheriting from this class.
|
|
UPLINK_SIGNAL_LEVEL_DICTIONARY = {}
|
|
DOWNLINK_SIGNAL_LEVEL_DICTIONARY = {}
|
|
|
|
# Units for downlink signal level. This variable has to be overridden by
|
|
# the simulations inheriting from this class.
|
|
DOWNLINK_SIGNAL_LEVEL_UNITS = None
|
|
|
|
class BtsConfig(object):
|
|
""" Base station configuration class. This class is only a container for
|
|
base station parameters and should not interact with the instrument
|
|
controller.
|
|
|
|
Attributes:
|
|
output_power: a float indicating the required signal level at the
|
|
instrument's output.
|
|
input_level: a float indicating the required signal level at the
|
|
instrument's input.
|
|
"""
|
|
|
|
def __init__(self):
|
|
""" Initialize the base station config by setting all its
|
|
parameters to None. """
|
|
self.output_power = None
|
|
self.input_power = None
|
|
self.band = None
|
|
|
|
def incorporate(self, new_config):
|
|
""" Incorporates a different configuration by replacing the current
|
|
values with the new ones for all the parameters different to None.
|
|
"""
|
|
for attr, value in vars(new_config).items():
|
|
if value:
|
|
setattr(self, attr, value)
|
|
|
|
def __init__(self, simulator, log, dut, test_config, calibration_table):
|
|
""" Initializes the Simulation object.
|
|
|
|
Keeps a reference to the callbox, log and dut handlers and
|
|
initializes the class attributes.
|
|
|
|
Args:
|
|
simulator: a cellular simulator controller
|
|
log: a logger handle
|
|
dut: a device handler implementing BaseCellularDut
|
|
test_config: test configuration obtained from the config file
|
|
calibration_table: a dictionary containing path losses for
|
|
different bands.
|
|
"""
|
|
|
|
self.simulator = simulator
|
|
self.log = log
|
|
self.dut = dut
|
|
self.calibration_table = calibration_table
|
|
|
|
# Turn calibration on or off depending on the test config value. If the
|
|
# key is not present, set to False by default
|
|
if self.KEY_CALIBRATION not in test_config:
|
|
self.log.warning('The {} key is not set in the testbed '
|
|
'parameters. Setting to off by default. To '
|
|
'turn calibration on, include the key with '
|
|
'a true/false value.'.format(
|
|
self.KEY_CALIBRATION))
|
|
|
|
self.calibration_required = test_config.get(self.KEY_CALIBRATION,
|
|
False)
|
|
|
|
# Obtain the allowed number of retries from the test configs
|
|
if self.KEY_ATTACH_RETRIES not in test_config:
|
|
self.log.warning('The {} key is not set in the testbed '
|
|
'parameters. Setting to {} by default.'.format(
|
|
self.KEY_ATTACH_RETRIES,
|
|
self.DEFAULT_ATTACH_RETRIES))
|
|
|
|
self.attach_retries = test_config.get(self.KEY_ATTACH_RETRIES,
|
|
self.DEFAULT_ATTACH_RETRIES)
|
|
|
|
# Obtain the attach timeout from the test configs
|
|
if self.KEY_ATTACH_TIMEOUT not in test_config:
|
|
self.log.warning('The {} key is not set in the testbed '
|
|
'parameters. Setting to {} by default.'.format(
|
|
self.KEY_ATTACH_TIMEOUT,
|
|
self.DEFAULT_ATTACH_TIMEOUT))
|
|
|
|
self.attach_timeout = test_config.get(self.KEY_ATTACH_TIMEOUT,
|
|
self.DEFAULT_ATTACH_TIMEOUT)
|
|
|
|
# Configuration object for the primary base station
|
|
self.primary_config = self.BtsConfig()
|
|
|
|
# Store the current calibrated band
|
|
self.current_calibrated_band = None
|
|
|
|
# Path loss measured during calibration
|
|
self.dl_path_loss = None
|
|
self.ul_path_loss = None
|
|
|
|
# Target signal levels obtained during configuration
|
|
self.sim_dl_power = None
|
|
self.sim_ul_power = None
|
|
|
|
# Stores RRC status change timer
|
|
self.rrc_sc_timer = None
|
|
|
|
# Set to default APN
|
|
log.info("Configuring APN.")
|
|
self.dut.set_apn('test', 'test')
|
|
|
|
# Enable roaming on the phone
|
|
self.dut.toggle_data_roaming(True)
|
|
|
|
# Make sure airplane mode is on so the phone won't attach right away
|
|
self.dut.toggle_airplane_mode(True)
|
|
|
|
# Wait for airplane mode setting to propagate
|
|
# TODO @latware b/186880504 change this to a poll_for_condition (Q3 21)
|
|
time.sleep(2)
|
|
|
|
# Prepare the simulator for this simulation setup
|
|
self.setup_simulator()
|
|
|
|
def setup_simulator(self):
|
|
""" Do initial configuration in the simulator. """
|
|
raise NotImplementedError()
|
|
|
|
def attach(self):
|
|
""" Attach the phone to the basestation.
|
|
|
|
Sets a good signal level, toggles airplane mode
|
|
and waits for the phone to attach.
|
|
|
|
Returns:
|
|
True if the phone was able to attach, False if not.
|
|
"""
|
|
|
|
# Turn on airplane mode
|
|
self.dut.toggle_airplane_mode(True)
|
|
|
|
# Wait for airplane mode setting to propagate
|
|
# TODO @latware b/186880504 change this to a poll_for_condition (Q3 21)
|
|
time.sleep(2)
|
|
|
|
# Provide a good signal power for the phone to attach easily
|
|
new_config = self.BtsConfig()
|
|
new_config.input_power = -10
|
|
new_config.output_power = -30
|
|
self.simulator.configure_bts(new_config)
|
|
self.primary_config.incorporate(new_config)
|
|
|
|
# Try to attach the phone.
|
|
for i in range(self.attach_retries):
|
|
|
|
try:
|
|
|
|
# Turn off airplane mode
|
|
self.dut.toggle_airplane_mode(False)
|
|
|
|
# Wait for the phone to attach.
|
|
self.simulator.wait_until_attached(timeout=self.attach_timeout)
|
|
|
|
except cellular_simulator.CellularSimulatorError:
|
|
|
|
# The phone failed to attach
|
|
self.log.info(
|
|
"UE failed to attach on attempt number {}.".format(i +
|
|
1))
|
|
|
|
# Turn airplane mode on to prepare the phone for a retry.
|
|
self.dut.toggle_airplane_mode(True)
|
|
|
|
# Wait for APM to propagate
|
|
# TODO @latware b/186880504 change this to a poll_for_condition (Q3 21)
|
|
time.sleep(3)
|
|
|
|
# Retry
|
|
if i < self.attach_retries - 1:
|
|
continue
|
|
else:
|
|
return False
|
|
|
|
else:
|
|
# The phone attached successfully.
|
|
# TODO @latware b/186880504 change this to a poll_for_condition (Q3 21)
|
|
time.sleep(self.SETTLING_TIME)
|
|
self.log.info("UE attached to the callbox.")
|
|
break
|
|
|
|
return True
|
|
|
|
def detach(self):
|
|
""" Detach the phone from the basestation.
|
|
|
|
Turns airplane mode and resets basestation.
|
|
"""
|
|
|
|
# Set the DUT to airplane mode so it doesn't see the
|
|
# cellular network going off
|
|
self.dut.toggle_airplane_mode(True)
|
|
|
|
# Wait for APM to propagate
|
|
# TODO @latware b/186880504 change this to a poll_for_condition (Q3 21)
|
|
time.sleep(2)
|
|
|
|
# Power off basestation
|
|
self.simulator.detach()
|
|
|
|
def stop(self):
|
|
""" Detach phone from the basestation by stopping the simulation.
|
|
|
|
Stop the simulation and turn airplane mode on. """
|
|
|
|
# Set the DUT to airplane mode so it doesn't see the
|
|
# cellular network going off
|
|
self.dut.toggle_airplane_mode(True)
|
|
|
|
# Wait for APM to propagate
|
|
# TODO @latware b/186880504 change this to a poll_for_condition (Q3 21)
|
|
time.sleep(2)
|
|
|
|
# Stop the simulation
|
|
self.simulator.stop()
|
|
|
|
def start(self):
|
|
""" Start the simulation by attaching the phone and setting the
|
|
required DL and UL power.
|
|
|
|
Note that this refers to starting the simulated testing environment
|
|
and not to starting the signaling on the cellular instruments,
|
|
which might have been done earlier depending on the cellular
|
|
instrument controller implementation. """
|
|
|
|
if not self.attach():
|
|
raise RuntimeError('Could not attach to base station.')
|
|
|
|
# Starts IP traffic while changing this setting to force the UE to be
|
|
# in Communication state, as UL power cannot be set in Idle state
|
|
self.start_traffic_for_calibration()
|
|
|
|
self.simulator.wait_until_communication_state()
|
|
|
|
# Set uplink power to a minimum before going to the actual desired
|
|
# value. This avoid inconsistencies produced by the hysteresis in the
|
|
# PA switching points.
|
|
self.log.info('Setting UL power to -30 dBm before going to the '
|
|
'requested value to avoid incosistencies caused by '
|
|
'hysteresis.')
|
|
self.set_uplink_tx_power(-30)
|
|
|
|
# Set signal levels obtained from the test parameters
|
|
self.set_downlink_rx_power(self.sim_dl_power)
|
|
self.set_uplink_tx_power(self.sim_ul_power)
|
|
|
|
# Verify signal level
|
|
try:
|
|
rx_power, tx_power = self.dut.get_rx_tx_power_levels()
|
|
|
|
if not tx_power or not rx_power[0]:
|
|
raise RuntimeError('The method return invalid Tx/Rx values.')
|
|
|
|
self.log.info('Signal level reported by the DUT in dBm: Tx = {}, '
|
|
'Rx = {}.'.format(tx_power, rx_power))
|
|
|
|
if abs(self.sim_ul_power - tx_power) > 1:
|
|
self.log.warning('Tx power at the UE is off by more than 1 dB')
|
|
|
|
except RuntimeError as e:
|
|
self.log.error('Could not verify Rx / Tx levels: %s.' % e)
|
|
|
|
# Stop IP traffic after setting the UL power level
|
|
self.stop_traffic_for_calibration()
|
|
|
|
def parse_parameters(self, parameters):
|
|
""" Configures simulation using a list of parameters.
|
|
|
|
Consumes parameters from a list.
|
|
Children classes need to call this method first.
|
|
|
|
Args:
|
|
parameters: list of parameters
|
|
"""
|
|
|
|
raise NotImplementedError()
|
|
|
|
def consume_parameter(self, parameters, parameter_name, num_values=0):
|
|
""" Parses a parameter from a list.
|
|
|
|
Allows to parse the parameter list. Will delete parameters from the
|
|
list after consuming them to ensure that they are not used twice.
|
|
|
|
Args:
|
|
parameters: list of parameters
|
|
parameter_name: keyword to look up in the list
|
|
num_values: number of arguments following the
|
|
parameter name in the list
|
|
Returns:
|
|
A list containing the parameter name and the following num_values
|
|
arguments
|
|
"""
|
|
|
|
try:
|
|
i = parameters.index(parameter_name)
|
|
except ValueError:
|
|
# parameter_name is not set
|
|
return []
|
|
|
|
return_list = []
|
|
|
|
try:
|
|
for j in range(num_values + 1):
|
|
return_list.append(parameters.pop(i))
|
|
except IndexError:
|
|
raise ValueError(
|
|
"Parameter {} has to be followed by {} values.".format(
|
|
parameter_name, num_values))
|
|
|
|
return return_list
|
|
|
|
def set_uplink_tx_power(self, signal_level):
|
|
""" Configure the uplink tx power level
|
|
|
|
Args:
|
|
signal_level: calibrated tx power in dBm
|
|
"""
|
|
new_config = self.BtsConfig()
|
|
new_config.input_power = self.calibrated_uplink_tx_power(
|
|
self.primary_config, signal_level)
|
|
self.simulator.configure_bts(new_config)
|
|
self.primary_config.incorporate(new_config)
|
|
|
|
def set_downlink_rx_power(self, signal_level):
|
|
""" Configure the downlink rx power level
|
|
|
|
Args:
|
|
signal_level: calibrated rx power in dBm
|
|
"""
|
|
new_config = self.BtsConfig()
|
|
new_config.output_power = self.calibrated_downlink_rx_power(
|
|
self.primary_config, signal_level)
|
|
self.simulator.configure_bts(new_config)
|
|
self.primary_config.incorporate(new_config)
|
|
|
|
def get_uplink_power_from_parameters(self, parameters):
|
|
""" Reads uplink power from a list of parameters. """
|
|
|
|
values = self.consume_parameter(parameters, self.PARAM_UL_PW, 1)
|
|
|
|
if values:
|
|
if values[1] in self.UPLINK_SIGNAL_LEVEL_DICTIONARY:
|
|
return self.UPLINK_SIGNAL_LEVEL_DICTIONARY[values[1]]
|
|
else:
|
|
try:
|
|
if values[1][0] == 'n':
|
|
# Treat the 'n' character as a negative sign
|
|
return -int(values[1][1:])
|
|
else:
|
|
return int(values[1])
|
|
except ValueError:
|
|
pass
|
|
|
|
# If the method got to this point it is because PARAM_UL_PW was not
|
|
# included in the test parameters or the provided value was invalid.
|
|
raise ValueError(
|
|
"The test name needs to include parameter {} followed by the "
|
|
"desired uplink power expressed by an integer number in dBm "
|
|
"or by one the following values: {}. To indicate negative "
|
|
"values, use the letter n instead of - sign.".format(
|
|
self.PARAM_UL_PW,
|
|
list(self.UPLINK_SIGNAL_LEVEL_DICTIONARY.keys())))
|
|
|
|
def get_downlink_power_from_parameters(self, parameters):
|
|
""" Reads downlink power from a list of parameters. """
|
|
|
|
values = self.consume_parameter(parameters, self.PARAM_DL_PW, 1)
|
|
|
|
if values:
|
|
if values[1] not in self.DOWNLINK_SIGNAL_LEVEL_DICTIONARY:
|
|
raise ValueError("Invalid signal level value {}.".format(
|
|
values[1]))
|
|
else:
|
|
return self.DOWNLINK_SIGNAL_LEVEL_DICTIONARY[values[1]]
|
|
else:
|
|
# Use default value
|
|
power = self.DOWNLINK_SIGNAL_LEVEL_DICTIONARY['excellent']
|
|
self.log.info("No DL signal level value was indicated in the test "
|
|
"parameters. Using default value of {} {}.".format(
|
|
power, self.DOWNLINK_SIGNAL_LEVEL_UNITS))
|
|
return power
|
|
|
|
def calibrated_downlink_rx_power(self, bts_config, signal_level):
|
|
""" Calculates the power level at the instrument's output in order to
|
|
obtain the required rx power level at the DUT's input.
|
|
|
|
If calibration values are not available, returns the uncalibrated signal
|
|
level.
|
|
|
|
Args:
|
|
bts_config: the current configuration at the base station. derived
|
|
classes implementations can use this object to indicate power as
|
|
spectral power density or in other units.
|
|
signal_level: desired downlink received power, can be either a
|
|
key value pair, an int or a float
|
|
"""
|
|
|
|
# Obtain power value if the provided signal_level is a key value pair
|
|
if isinstance(signal_level, Enum):
|
|
power = signal_level.value
|
|
else:
|
|
power = signal_level
|
|
|
|
# Try to use measured path loss value. If this was not set, it will
|
|
# throw an TypeError exception
|
|
try:
|
|
calibrated_power = round(power + self.dl_path_loss)
|
|
if calibrated_power > self.simulator.MAX_DL_POWER:
|
|
self.log.warning(
|
|
"Cannot achieve phone DL Rx power of {} dBm. Requested TX "
|
|
"power of {} dBm exceeds callbox limit!".format(
|
|
power, calibrated_power))
|
|
calibrated_power = self.simulator.MAX_DL_POWER
|
|
self.log.warning(
|
|
"Setting callbox Tx power to max possible ({} dBm)".
|
|
format(calibrated_power))
|
|
|
|
self.log.info(
|
|
"Requested phone DL Rx power of {} dBm, setting callbox Tx "
|
|
"power at {} dBm".format(power, calibrated_power))
|
|
# Power has to be a natural number so calibration wont be exact.
|
|
# Inform the actual received power after rounding.
|
|
self.log.info(
|
|
"Phone downlink received power is {0:.2f} dBm".format(
|
|
calibrated_power - self.dl_path_loss))
|
|
return calibrated_power
|
|
except TypeError:
|
|
self.log.info("Phone downlink received power set to {} (link is "
|
|
"uncalibrated).".format(round(power)))
|
|
return round(power)
|
|
|
|
def calibrated_uplink_tx_power(self, bts_config, signal_level):
|
|
""" Calculates the power level at the instrument's input in order to
|
|
obtain the required tx power level at the DUT's output.
|
|
|
|
If calibration values are not available, returns the uncalibrated signal
|
|
level.
|
|
|
|
Args:
|
|
bts_config: the current configuration at the base station. derived
|
|
classes implementations can use this object to indicate power as
|
|
spectral power density or in other units.
|
|
signal_level: desired uplink transmitted power, can be either a
|
|
key value pair, an int or a float
|
|
"""
|
|
|
|
# Obtain power value if the provided signal_level is a key value pair
|
|
if isinstance(signal_level, Enum):
|
|
power = signal_level.value
|
|
else:
|
|
power = signal_level
|
|
|
|
# Try to use measured path loss value. If this was not set, it will
|
|
# throw an TypeError exception
|
|
try:
|
|
calibrated_power = round(power - self.ul_path_loss)
|
|
if calibrated_power < self.UL_MIN_POWER:
|
|
self.log.warning(
|
|
"Cannot achieve phone UL Tx power of {} dBm. Requested UL "
|
|
"power of {} dBm exceeds callbox limit!".format(
|
|
power, calibrated_power))
|
|
calibrated_power = self.UL_MIN_POWER
|
|
self.log.warning(
|
|
"Setting UL Tx power to min possible ({} dBm)".format(
|
|
calibrated_power))
|
|
|
|
self.log.info(
|
|
"Requested phone UL Tx power of {} dBm, setting callbox Rx "
|
|
"power at {} dBm".format(power, calibrated_power))
|
|
# Power has to be a natural number so calibration wont be exact.
|
|
# Inform the actual transmitted power after rounding.
|
|
self.log.info(
|
|
"Phone uplink transmitted power is {0:.2f} dBm".format(
|
|
calibrated_power + self.ul_path_loss))
|
|
return calibrated_power
|
|
except TypeError:
|
|
self.log.info("Phone uplink transmitted power set to {} (link is "
|
|
"uncalibrated).".format(round(power)))
|
|
return round(power)
|
|
|
|
def calibrate(self, band):
|
|
""" Calculates UL and DL path loss if it wasn't done before.
|
|
|
|
The should be already set to the required band before calling this
|
|
method.
|
|
|
|
Args:
|
|
band: the band that is currently being calibrated.
|
|
"""
|
|
|
|
if self.dl_path_loss and self.ul_path_loss:
|
|
self.log.info("Measurements are already calibrated.")
|
|
|
|
# Attach the phone to the base station
|
|
if not self.attach():
|
|
self.log.info(
|
|
"Skipping calibration because the phone failed to attach.")
|
|
return
|
|
|
|
# If downlink or uplink were not yet calibrated, do it now
|
|
if not self.dl_path_loss:
|
|
self.dl_path_loss = self.downlink_calibration()
|
|
if not self.ul_path_loss:
|
|
self.ul_path_loss = self.uplink_calibration()
|
|
|
|
# Detach after calibrating
|
|
self.detach()
|
|
# TODO @latware b/186880504 change this to a poll_for_condition (Q3 21)
|
|
time.sleep(2)
|
|
|
|
def start_traffic_for_calibration(self):
|
|
"""
|
|
Starts UDP IP traffic before running calibration. Uses APN_1
|
|
configured in the phone.
|
|
"""
|
|
self.simulator.start_data_traffic()
|
|
|
|
def stop_traffic_for_calibration(self):
|
|
"""
|
|
Stops IP traffic after calibration.
|
|
"""
|
|
self.simulator.stop_data_traffic()
|
|
|
|
def downlink_calibration(self, rat=None, power_units_conversion_func=None):
|
|
""" Computes downlink path loss and returns the calibration value
|
|
|
|
The DUT needs to be attached to the base station before calling this
|
|
method.
|
|
|
|
Args:
|
|
rat: desired RAT to calibrate (matching the label reported by
|
|
the phone)
|
|
power_units_conversion_func: a function to convert the units
|
|
reported by the phone to dBm. needs to take two arguments: the
|
|
reported signal level and bts. use None if no conversion is
|
|
needed.
|
|
Returns:
|
|
Downlink calibration value and measured DL power.
|
|
"""
|
|
|
|
# Check if this parameter was set. Child classes may need to override
|
|
# this class passing the necessary parameters.
|
|
if not rat:
|
|
raise ValueError(
|
|
"The parameter 'rat' has to indicate the RAT being used as "
|
|
"reported by the phone.")
|
|
|
|
# Save initial output level to restore it after calibration
|
|
restoration_config = self.BtsConfig()
|
|
restoration_config.output_power = self.primary_config.output_power
|
|
|
|
# Set BTS to a good output level to minimize measurement error
|
|
new_config = self.BtsConfig()
|
|
new_config.output_power = self.simulator.MAX_DL_POWER - 5
|
|
self.simulator.configure_bts(new_config)
|
|
|
|
# Starting IP traffic
|
|
self.start_traffic_for_calibration()
|
|
|
|
down_power_measured = []
|
|
for i in range(0, self.NUM_DL_CAL_READS):
|
|
# For some reason, the RSRP gets updated on Screen ON event
|
|
signal_strength = self.dut.get_telephony_signal_strength()
|
|
down_power_measured.append(signal_strength[rat])
|
|
# TODO @latware b/186880504 change this to a poll_for_condition (Q3 21)
|
|
time.sleep(5)
|
|
|
|
# Stop IP traffic
|
|
self.stop_traffic_for_calibration()
|
|
|
|
# Reset bts to original settings
|
|
self.simulator.configure_bts(restoration_config)
|
|
# TODO @latware b/186880504 change this to a poll_for_condition (Q3 21)
|
|
time.sleep(2)
|
|
|
|
# Calculate the mean of the measurements
|
|
reported_asu_power = np.nanmean(down_power_measured)
|
|
|
|
# Convert from RSRP to signal power
|
|
if power_units_conversion_func:
|
|
avg_down_power = power_units_conversion_func(
|
|
reported_asu_power, self.primary_config)
|
|
else:
|
|
avg_down_power = reported_asu_power
|
|
|
|
# Calculate Path Loss
|
|
dl_target_power = self.simulator.MAX_DL_POWER - 5
|
|
down_call_path_loss = dl_target_power - avg_down_power
|
|
|
|
# Validate the result
|
|
if not 0 < down_call_path_loss < 100:
|
|
raise RuntimeError(
|
|
"Downlink calibration failed. The calculated path loss value "
|
|
"was {} dBm.".format(down_call_path_loss))
|
|
|
|
self.log.info("Measured downlink path loss: {} dB".format(
|
|
down_call_path_loss))
|
|
|
|
return down_call_path_loss
|
|
|
|
def uplink_calibration(self):
|
|
""" Computes uplink path loss and returns the calibration value
|
|
|
|
The DUT needs to be attached to the base station before calling this
|
|
method.
|
|
|
|
Returns:
|
|
Uplink calibration value and measured UL power
|
|
"""
|
|
|
|
# Save initial input level to restore it after calibration
|
|
restoration_config = self.BtsConfig()
|
|
restoration_config.input_power = self.primary_config.input_power
|
|
|
|
# Set BTS1 to maximum input allowed in order to perform
|
|
# uplink calibration
|
|
target_power = self.MAX_PHONE_OUTPUT_POWER
|
|
new_config = self.BtsConfig()
|
|
new_config.input_power = self.MAX_BTS_INPUT_POWER
|
|
self.simulator.configure_bts(new_config)
|
|
|
|
# Start IP traffic
|
|
self.start_traffic_for_calibration()
|
|
|
|
up_power_per_chain = []
|
|
# Get the number of chains
|
|
cmd = 'MONITOR? UL_PUSCH'
|
|
uplink_meas_power = self.anritsu.send_query(cmd)
|
|
str_power_chain = uplink_meas_power.split(',')
|
|
num_chains = len(str_power_chain)
|
|
for ichain in range(0, num_chains):
|
|
up_power_per_chain.append([])
|
|
|
|
for i in range(0, self.NUM_UL_CAL_READS):
|
|
uplink_meas_power = self.anritsu.send_query(cmd)
|
|
str_power_chain = uplink_meas_power.split(',')
|
|
|
|
for ichain in range(0, num_chains):
|
|
if (str_power_chain[ichain] == 'DEACTIVE'):
|
|
up_power_per_chain[ichain].append(float('nan'))
|
|
else:
|
|
up_power_per_chain[ichain].append(
|
|
float(str_power_chain[ichain]))
|
|
|
|
# TODO @latware b/186880504 change this to a poll_for_condition (Q3 21)
|
|
time.sleep(3)
|
|
|
|
# Stop IP traffic
|
|
self.stop_traffic_for_calibration()
|
|
|
|
# Reset bts to original settings
|
|
self.simulator.configure_bts(restoration_config)
|
|
# TODO @latware b/186880504 change this to a poll_for_condition (Q3 21)
|
|
time.sleep(2)
|
|
|
|
# Phone only supports 1x1 Uplink so always chain 0
|
|
avg_up_power = np.nanmean(up_power_per_chain[0])
|
|
if np.isnan(avg_up_power):
|
|
raise RuntimeError(
|
|
"Calibration failed because the callbox reported the chain to "
|
|
"be deactive.")
|
|
|
|
up_call_path_loss = target_power - avg_up_power
|
|
|
|
# Validate the result
|
|
if not 0 < up_call_path_loss < 100:
|
|
raise RuntimeError(
|
|
"Uplink calibration failed. The calculated path loss value "
|
|
"was {} dBm.".format(up_call_path_loss))
|
|
|
|
self.log.info(
|
|
"Measured uplink path loss: {} dB".format(up_call_path_loss))
|
|
|
|
return up_call_path_loss
|
|
|
|
def load_pathloss_if_required(self):
|
|
""" If calibration is required, try to obtain the pathloss values from
|
|
the calibration table and measure them if they are not available. """
|
|
# Invalidate the previous values
|
|
self.dl_path_loss = None
|
|
self.ul_path_loss = None
|
|
|
|
# Load the new ones
|
|
if self.calibration_required:
|
|
band = self.primary_config.band
|
|
|
|
# Try loading the path loss values from the calibration table. If
|
|
# they are not available, use the automated calibration procedure.
|
|
try:
|
|
self.dl_path_loss = self.calibration_table[band]["dl"]
|
|
self.ul_path_loss = self.calibration_table[band]["ul"]
|
|
except KeyError:
|
|
self.calibrate(band)
|
|
|
|
# Complete the calibration table with the new values to be used in
|
|
# the next tests.
|
|
if band not in self.calibration_table:
|
|
self.calibration_table[band] = {}
|
|
|
|
if "dl" not in self.calibration_table[band] and self.dl_path_loss:
|
|
self.calibration_table[band]["dl"] = self.dl_path_loss
|
|
|
|
if "ul" not in self.calibration_table[band] and self.ul_path_loss:
|
|
self.calibration_table[band]["ul"] = self.ul_path_loss
|
|
|
|
def maximum_downlink_throughput(self):
|
|
""" Calculates maximum achievable downlink throughput in the current
|
|
simulation state.
|
|
|
|
Because thoughput is dependent on the RAT, this method needs to be
|
|
implemented by children classes.
|
|
|
|
Returns:
|
|
Maximum throughput in mbps
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def maximum_uplink_throughput(self):
|
|
""" Calculates maximum achievable downlink throughput in the current
|
|
simulation state.
|
|
|
|
Because thoughput is dependent on the RAT, this method needs to be
|
|
implemented by children classes.
|
|
|
|
Returns:
|
|
Maximum throughput in mbps
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def send_sms(self, sms_message):
|
|
""" Sends the set SMS message. """
|
|
raise NotImplementedError()
|