#!/usr/bin/env python3 # Copyright 2022 The Pigweed Authors # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of # the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. """Unit test for proxy.py""" import abc import asyncio from struct import pack import time from typing import List import unittest from pigweed.pw_rpc.internal import packet_pb2 from pigweed.pw_transfer import transfer_pb2 from pw_hdlc import encode from pw_transfer import ProtocolVersion from pw_transfer.chunk import Chunk import proxy class MockRng(abc.ABC): def __init__(self, results: List[float]): self._results = results def uniform(self, from_val: float, to_val: float) -> float: val_range = to_val - from_val val = self._results.pop() val *= val_range val += from_val return val class ProxyTest(unittest.IsolatedAsyncioTestCase): async def test_transposer_simple(self): sent_packets: List[bytes] = [] # Async helper so DataTransposer can await on it. async def append(list: List[bytes], data: bytes): list.append(data) transposer = proxy.DataTransposer( lambda data: append(sent_packets, data), name="test", rate=0.5, timeout=100, seed=1234567890, ) transposer._rng = MockRng([0.6, 0.4]) await transposer.process(b'aaaaaaaaaa') await transposer.process(b'bbbbbbbbbb') # Give the transposer task time to process the data. await asyncio.sleep(0.05) self.assertEqual(sent_packets, [b'bbbbbbbbbb', b'aaaaaaaaaa']) async def test_transposer_timeout(self): sent_packets: List[bytes] = [] # Async helper so DataTransposer can await on it. async def append(list: List[bytes], data: bytes): list.append(data) transposer = proxy.DataTransposer( lambda data: append(sent_packets, data), name="test", rate=0.5, timeout=0.100, seed=1234567890, ) transposer._rng = MockRng([0.4, 0.6]) await transposer.process(b'aaaaaaaaaa') # Even though this should be transposed, there is no following data so # the transposer should timout and send this in-order. await transposer.process(b'bbbbbbbbbb') # Give the transposer time to timeout. await asyncio.sleep(0.5) self.assertEqual(sent_packets, [b'aaaaaaaaaa', b'bbbbbbbbbb']) async def test_server_failure(self): sent_packets: List[bytes] = [] # Async helper so DataTransposer can await on it. async def append(list: List[bytes], data: bytes): list.append(data) packets_before_failure = [1, 2, 3] server_failure = proxy.ServerFailure( lambda data: append(sent_packets, data), name="test", packets_before_failure_list=packets_before_failure.copy(), ) # After passing the list to ServerFailure, add a test for no # packets dropped packets_before_failure.append(5) packets = [ b'1', b'2', b'3', b'4', b'5', ] for num_packets in packets_before_failure: sent_packets.clear() for packet in packets: await server_failure.process(packet) self.assertEqual(len(sent_packets), num_packets) server_failure.handle_event(proxy.Event.TRANSFER_START) async def test_keep_drop_queue_loop(self): sent_packets: List[bytes] = [] # Async helper so DataTransposer can await on it. async def append(list: List[bytes], data: bytes): list.append(data) keep_drop_queue = proxy.KeepDropQueue( lambda data: append(sent_packets, data), name="test", keep_drop_queue=[2, 1, 3], ) expected_sequence = [ b'1', b'2', b'4', b'5', b'6', b'9', ] input_packets = [ b'1', b'2', b'3', b'4', b'5', b'6', b'7', b'8', b'9', ] for packet in input_packets: await keep_drop_queue.process(packet) self.assertEqual(sent_packets, expected_sequence) async def test_keep_drop_queue(self): sent_packets: List[bytes] = [] # Async helper so DataTransposer can await on it. async def append(list: List[bytes], data: bytes): list.append(data) keep_drop_queue = proxy.KeepDropQueue( lambda data: append(sent_packets, data), name="test", keep_drop_queue=[2, 1, 1, -1], ) expected_sequence = [ b'1', b'2', b'4', ] input_packets = [ b'1', b'2', b'3', b'4', b'5', b'6', b'7', b'8', b'9', ] for packet in input_packets: await keep_drop_queue.process(packet) self.assertEqual(sent_packets, expected_sequence) async def test_window_packet_dropper(self): sent_packets: List[bytes] = [] # Async helper so DataTransposer can await on it. async def append(list: List[bytes], data: bytes): list.append(data) window_packet_dropper = proxy.WindowPacketDropper( lambda data: append(sent_packets, data), name="test", window_packet_to_drop=0, ) packets = [ _encode_rpc_frame( Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'1') ), _encode_rpc_frame( Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'2') ), _encode_rpc_frame( Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'3') ), _encode_rpc_frame( Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'4') ), _encode_rpc_frame( Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'5') ), ] expected_packets = packets[1:] # Test each even twice to assure the filter does not have issues # on new window bondaries. events = [ proxy.Event.PARAMETERS_RETRANSMIT, proxy.Event.PARAMETERS_CONTINUE, proxy.Event.PARAMETERS_RETRANSMIT, proxy.Event.PARAMETERS_CONTINUE, ] for event in events: sent_packets.clear() for packet in packets: await window_packet_dropper.process(packet) self.assertEqual(sent_packets, expected_packets) window_packet_dropper.handle_event(event) def _encode_rpc_frame(chunk: Chunk) -> bytes: packet = packet_pb2.RpcPacket( type=packet_pb2.PacketType.SERVER_STREAM, channel_id=101, service_id=1001, method_id=100001, payload=chunk.to_message().SerializeToString(), ).SerializeToString() return encode.ui_frame(73, packet) if __name__ == '__main__': unittest.main()