2354 lines
82 KiB
Python
2354 lines
82 KiB
Python
# Copyright 2022 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# File generated from uci_packets.json, with the command:
|
|
# ./scripts/generate_python_backend.py --input uci_packets.json
|
|
# /!\ Do not edit by hand.
|
|
from dataclasses import dataclass, field, fields
|
|
from typing import Optional, List, Tuple
|
|
import enum
|
|
import inspect
|
|
import math
|
|
|
|
|
|
@dataclass
|
|
class Packet:
|
|
payload: Optional[bytes] = field(repr=False)
|
|
|
|
@classmethod
|
|
def parse_all(cls, span: bytes) -> 'Packet':
|
|
packet, remain = getattr(cls, 'parse')(span)
|
|
if len(remain) > 0:
|
|
raise Exception('Unexpected parsing remainder')
|
|
return packet
|
|
|
|
def show(self, prefix: str = ''):
|
|
print(f'{self.__class__.__name__}')
|
|
|
|
def print_val(p: str, pp: str, name: str, align: int, typ, val):
|
|
if name == 'payload':
|
|
pass
|
|
|
|
# Scalar fields.
|
|
elif typ is int:
|
|
print(f'{p}{name:{align}} = {val} (0x{val:x})')
|
|
|
|
# Byte fields.
|
|
elif typ is bytes:
|
|
print(f'{p}{name:{align}} = [', end='')
|
|
line = ''
|
|
n_pp = ''
|
|
for (idx, b) in enumerate(val):
|
|
if idx > 0 and idx % 8 == 0:
|
|
print(f'{n_pp}{line}')
|
|
line = ''
|
|
n_pp = pp + (' ' * (align + 4))
|
|
line += f' {b:02x}'
|
|
print(f'{n_pp}{line} ]')
|
|
|
|
# Enum fields.
|
|
elif inspect.isclass(typ) and issubclass(typ, enum.IntEnum):
|
|
print(f'{p}{name:{align}} = {typ.__name__}::{val.name} (0x{val:x})')
|
|
|
|
# Struct fields.
|
|
elif inspect.isclass(typ) and issubclass(typ, globals().get('Packet')):
|
|
print(f'{p}{name:{align}} = ', end='')
|
|
val.show(prefix=pp)
|
|
|
|
# Array fields.
|
|
elif typ.__origin__ == list:
|
|
print(f'{p}{name:{align}}')
|
|
last = len(val) - 1
|
|
align = 5
|
|
for (idx, elt) in enumerate(val):
|
|
n_p = pp + ('├── ' if idx != last else '└── ')
|
|
n_pp = pp + ('│ ' if idx != last else ' ')
|
|
print_val(n_p, n_pp, f'[{idx}]',
|
|
align, typ.__args__[0], val[idx])
|
|
|
|
else:
|
|
print(f'{p}{name:{align}} = ##{typ}##')
|
|
|
|
last = len(fields(self)) - 1
|
|
align = max(len(f.name) for f in fields(self) if f.name != 'payload')
|
|
|
|
for (idx, f) in enumerate(fields(self)):
|
|
p = prefix + ('├── ' if idx != last else '└── ')
|
|
pp = prefix + ('│ ' if idx != last else ' ')
|
|
val = getattr(self, f.name)
|
|
|
|
print_val(p, pp, f.name, align, f.type, val)
|
|
|
|
|
|
class PacketBoundaryFlag(enum.IntEnum):
|
|
COMPLETE = 0x0
|
|
NOT_COMPLETE = 0x1
|
|
|
|
|
|
class GroupId(enum.IntEnum):
|
|
CORE = 0x0
|
|
SESSION_CONFIG = 0x1
|
|
RANGING_SESSION_CONTROL = 0x2
|
|
DATA_CONTROL = 0x3
|
|
TEST = 0xd
|
|
VENDOR_PICA = 0x9
|
|
VENDOR_RESERVED_A = 0xa
|
|
VENDOR_RESERVED_B = 0xb
|
|
VENDOR_ANDROID = 0xe
|
|
VENDOR_RESERVED_E = 0xc
|
|
VENDOR_RESERVED_F = 0xf
|
|
|
|
|
|
class CoreOpCode(enum.IntEnum):
|
|
CORE_DEVICE_RESET = 0x0
|
|
CORE_DEVICE_STATUS_NTF = 0x1
|
|
CORE_GET_DEVICE_INFO = 0x2
|
|
CORE_GET_CAPS_INFO = 0x3
|
|
CORE_SET_CONFIG = 0x4
|
|
CORE_GET_CONFIG = 0x5
|
|
CORE_DEVICE_SUSPEND = 0x6
|
|
CORE_GENERIC_ERROR_NTF = 0x7
|
|
|
|
|
|
class SessionOpCode(enum.IntEnum):
|
|
SESSION_INIT = 0x0
|
|
SESSION_DEINIT = 0x1
|
|
SESSION_STATUS_NTF = 0x2
|
|
SESSION_SET_APP_CONFIG = 0x3
|
|
SESSION_GET_APP_CONFIG = 0x4
|
|
SESSION_GET_COUNT = 0x5
|
|
SESSION_GET_STATE = 0x6
|
|
SESSION_UPDATE_CONTROLLER_MULTICAST_LIST = 0x7
|
|
|
|
|
|
class RangeOpCode(enum.IntEnum):
|
|
RANGE_START = 0x0
|
|
RANGE_STOP = 0x1
|
|
RANGE_INTERVAL_UPDATE_REQ = 0x2
|
|
RANGE_GET_RANGING_COUNT = 0x3
|
|
|
|
|
|
class AppDataOpCode(enum.IntEnum):
|
|
APP_DATA_TX = 0x0
|
|
APP_DATA_RX = 0x1
|
|
|
|
|
|
class PicaOpCode(enum.IntEnum):
|
|
PICA_INIT_DEVICE = 0x0
|
|
PICA_SET_DEVICE_POSITION = 0x1
|
|
PICA_CREATE_BEACON = 0x2
|
|
PICA_SET_BEACON_POSITION = 0x3
|
|
PICA_DESTROY_BEACON = 0x4
|
|
|
|
|
|
class AndroidOpCode(enum.IntEnum):
|
|
ANDROID_GET_POWER_STATS = 0x0
|
|
ANDROID_SET_COUNTRY_CODE = 0x1
|
|
|
|
|
|
class StatusCode(enum.IntEnum):
|
|
UCI_STATUS_OK = 0x0
|
|
UCI_STATUS_REJECTED = 0x1
|
|
UCI_STATUS_FAILED = 0x2
|
|
UCI_STATUS_SYNTAX_ERROR = 0x3
|
|
UCI_STATUS_INVALID_PARAM = 0x4
|
|
UCI_STATUS_INVALID_RANGE = 0x5
|
|
UCI_STATUS_INVALID_MSG_SIZE = 0x6
|
|
UCI_STATUS_UNKNOWN_GID = 0x7
|
|
UCI_STATUS_UNKNOWN_OID = 0x8
|
|
UCI_STATUS_READ_ONLY = 0x9
|
|
UCI_STATUS_COMMAND_RETRY = 0xa
|
|
UCI_STATUS_SESSION_NOT_EXIST = 0x11
|
|
UCI_STATUS_SESSION_DUPLICATE = 0x12
|
|
UCI_STATUS_SESSION_ACTIVE = 0x13
|
|
UCI_STATUS_MAX_SESSIONS_EXCEEDED = 0x14
|
|
UCI_STATUS_SESSION_NOT_CONFIGURED = 0x15
|
|
UCI_STATUS_ACTIVE_SESSION_ONGOING = 0x16
|
|
UCI_STATUS_MULTICAST_LIST_FULL = 0x17
|
|
UCI_STATUS_ADDRESS_NOT_FOUND = 0x18
|
|
UCI_STATUS_ADDRESS_ALREADY_PRESENT = 0x19
|
|
UCI_STATUS_RANGING_TX_FAILED = 0x20
|
|
UCI_STATUS_RANGING_RX_TIMEOUT = 0x21
|
|
UCI_STATUS_RANGING_RX_PHY_DEC_FAILED = 0x22
|
|
UCI_STATUS_RANGING_RX_PHY_TOA_FAILED = 0x23
|
|
UCI_STATUS_RANGING_RX_PHY_STS_FAILED = 0x24
|
|
UCI_STATUS_RANGING_RX_MAC_DEC_FAILED = 0x25
|
|
UCI_STATUS_RANGING_RX_MAC_IE_DEC_FAILED = 0x26
|
|
UCI_STATUS_RANGING_RX_MAC_IE_MISSING = 0x27
|
|
UCI_STATUS_DATA_MAX_TX_PSDU_SIZE_EXCEEDED = 0x30
|
|
UCI_STATUS_DATA_RX_CRC_ERROR = 0x31
|
|
UCI_STATUS_ERROR_CCC_SE_BUSY = 0x50
|
|
UCI_STATUS_ERROR_CCC_LIFECYCLE = 0x51
|
|
|
|
|
|
class ResetConfig(enum.IntEnum):
|
|
UWBS_RESET = 0x0
|
|
|
|
|
|
class DeviceConfigId(enum.IntEnum):
|
|
DEVICE_STATE = 0x0
|
|
LOW_POWER_MODE = 0x1
|
|
|
|
|
|
class AppConfigTlvType(enum.IntEnum):
|
|
DEVICE_TYPE = 0x0
|
|
RANGING_ROUND_USAGE = 0x1
|
|
STS_CONFIG = 0x2
|
|
MULTI_NODE_MODE = 0x3
|
|
CHANNEL_NUMBER = 0x4
|
|
NO_OF_CONTROLEE = 0x5
|
|
DEVICE_MAC_ADDRESS = 0x6
|
|
DST_MAC_ADDRESS = 0x7
|
|
SLOT_DURATION = 0x8
|
|
RANGING_INTERVAL = 0x9
|
|
STS_INDEX = 0xa
|
|
MAC_FCS_TYPE = 0xb
|
|
RANGING_ROUND_CONTROL = 0xc
|
|
AOA_RESULT_REQ = 0xd
|
|
RNG_DATA_NTF = 0xe
|
|
RNG_DATA_NTF_PROXIMITY_NEAR = 0xf
|
|
RNG_DATA_NTF_PROXIMITY_FAR = 0x10
|
|
DEVICE_ROLE = 0x11
|
|
RFRAME_CONFIG = 0x12
|
|
PREAMBLE_CODE_INDEX = 0x14
|
|
SFD_ID = 0x15
|
|
PSDU_DATA_RATE = 0x16
|
|
PREAMBLE_DURATION = 0x17
|
|
RANGING_TIME_STRUCT = 0x1a
|
|
SLOTS_PER_RR = 0x1b
|
|
TX_ADAPTIVE_PAYLOAD_POWER = 0x1c
|
|
RESPONDER_SLOT_INDEX = 0x1e
|
|
PRF_MODE = 0x1f
|
|
SCHEDULED_MODE = 0x22
|
|
KEY_ROTATION = 0x23
|
|
KEY_ROTATION_RATE = 0x24
|
|
SESSION_PRIORITY = 0x25
|
|
MAC_ADDRESS_MODE = 0x26
|
|
VENDOR_ID = 0x27
|
|
STATIC_STS_IV = 0x28
|
|
NUMBER_OF_STS_SEGMENTS = 0x29
|
|
MAX_RR_RETRY = 0x2a
|
|
UWB_INITIATION_TIME = 0x2b
|
|
HOPPING_MODE = 0x2c
|
|
BLOCK_STRIDE_LENGTH = 0x2d
|
|
RESULT_REPORT_CONFIG = 0x2e
|
|
IN_BAND_TERMINATION_ATTEMPT_COUNT = 0x2f
|
|
SUB_SESSION_ID = 0x30
|
|
BPRF_PHR_DATA_RATE = 0x31
|
|
MAX_NUMBER_OF_MEASUREMENTS = 0x32
|
|
STS_LENGTH = 0x35
|
|
CCC_HOP_MODE_KEY = 0xa0
|
|
CCC_UWB_TIME0 = 0xa1
|
|
CCC_RANGING_PROTOCOL_VER = 0xa3
|
|
CCC_UWB_CONFIG_ID = 0xa4
|
|
CCC_PULSESHAPE_COMBO = 0xa5
|
|
CCC_URSK_TTL = 0xa6
|
|
NB_OF_RANGE_MEASUREMENTS = 0xe3
|
|
NB_OF_AZIMUTH_MEASUREMENTS = 0xe4
|
|
NB_OF_ELEVATION_MEASUREMENTS = 0xe5
|
|
|
|
|
|
class CapTlvType(enum.IntEnum):
|
|
SUPPORTED_FIRA_PHY_VERSION_RANGE = 0x0
|
|
SUPPORTED_FIRA_MAC_VERSION_RANGE = 0x1
|
|
SUPPORTED_DEVICE_ROLES = 0x2
|
|
SUPPORTED_RANGING_METHOD = 0x3
|
|
SUPPORTED_STS_CONFIG = 0x4
|
|
SUPPORTED_MULTI_NODE_MODES = 0x5
|
|
SUPPORTED_RANGING_TIME_STRUCT = 0x6
|
|
SUPPORTED_SCHEDULED_MODE = 0x7
|
|
SUPPORTED_HOPPING_MODE = 0x8
|
|
SUPPORTED_BLOCK_STRIDING = 0x9
|
|
SUPPORTED_UWB_INITIATION_TIME = 0xa
|
|
SUPPORTED_CHANNELS = 0xb
|
|
SUPPORTED_RFRAME_CONFIG = 0xc
|
|
SUPPORTED_CC_CONSTRAINT_LENGTH = 0xd
|
|
SUPPORTED_BPRF_PARAMETER_SETS = 0xe
|
|
SUPPORTED_HPRF_PARAMETER_SETS = 0xf
|
|
SUPPORTED_AOA = 0x10
|
|
SUPPORTED_EXTENDED_MAC_ADDRESS = 0x11
|
|
SUPPORTED_AOA_RESULT_REQ_ANTENNA_INTERLEAVING = 0xe3
|
|
SUPPORTED_POWER_STATS = 0xc0
|
|
CCC_SUPPORTED_CHAPS_PER_SLOT = 0xa0
|
|
CCC_SUPPORTED_SYNC_CODES = 0xa1
|
|
CCC_SUPPORTED_HOPPING_CONFIG_MODES_AND_SEQUENCES = 0xa2
|
|
CCC_SUPPORTED_CHANNELS = 0xa3
|
|
CCC_SUPPORTED_VERSIONS = 0xa4
|
|
CCC_SUPPORTED_UWB_CONFIGS = 0xa5
|
|
CCC_SUPPORTED_PULSE_SHAPE_COMBOS = 0xa6
|
|
CCC_SUPPORTED_RAN_MULTIPLIER = 0xa7
|
|
|
|
|
|
class AoaResultReqType(enum.IntEnum):
|
|
AOA_DISABLE = 0x0
|
|
AOA_ENABLE = 0x1
|
|
AOA_ENABLE_AZIMUTH = 0x2
|
|
AOA_ENABLE_ELEVATION = 0x3
|
|
AOA_ENABLE_INTERLEAVED = 0xf0
|
|
|
|
|
|
class DeviceState(enum.IntEnum):
|
|
DEVICE_STATE_READY = 0x1
|
|
DEVICE_STATE_ACTIVE = 0x2
|
|
DEVICE_STATE_ERROR = 0xff
|
|
|
|
|
|
class SessionState(enum.IntEnum):
|
|
SESSION_STATE_INIT = 0x0
|
|
SESSION_STATE_DEINIT = 0x1
|
|
SESSION_STATE_ACTIVE = 0x2
|
|
SESSION_STATE_IDLE = 0x3
|
|
|
|
|
|
class ReasonCode(enum.IntEnum):
|
|
STATE_CHANGE_WITH_SESSION_MANAGEMENT_COMMANDS = 0x0
|
|
MAX_RANGING_ROUND_RETRY_COUNT_REACHED = 0x1
|
|
MAX_NUMBER_OF_MEASUREMENTS_REACHED = 0x2
|
|
ERROR_SLOT_LENGTH_NOT_SUPPORTED = 0x20
|
|
ERROR_INSUFFICIENT_SLOTS_PER_RR = 0x21
|
|
ERROR_MAC_ADDRESS_MODE_NOT_SUPPORTED = 0x22
|
|
ERROR_INVALID_RANGING_INTERVAL = 0x23
|
|
ERROR_INVALID_STS_CONFIG = 0x24
|
|
ERROR_INVALID_RFRAME_CONFIG = 0x25
|
|
|
|
|
|
class MulticastUpdateStatusCode(enum.IntEnum):
|
|
STATUS_OK_MULTICAST_LIST_UPDATE = 0x0
|
|
STATUS_ERROR_MULTICAST_LIST_FULL = 0x1
|
|
STATUS_ERROR_KEY_FETCH_FAIL = 0x2
|
|
STATUS_ERROR_SUB_SESSION_ID_NOT_FOUND = 0x3
|
|
|
|
|
|
class MacAddressIndicator(enum.IntEnum):
|
|
SHORT_ADDRESS = 0x0
|
|
EXTENDED_ADDRESS = 0x1
|
|
|
|
|
|
class SessionType(enum.IntEnum):
|
|
FIRA_RANGING_SESSION = 0x0
|
|
FIRA_DATA_TRANSFER = 0x1
|
|
CCC = 0xa0
|
|
|
|
|
|
class MessageType(enum.IntEnum):
|
|
COMMAND = 0x1
|
|
RESPONSE = 0x2
|
|
NOTIFICATION = 0x3
|
|
|
|
|
|
@dataclass
|
|
class UciPacket(Packet):
|
|
group_id: GroupId
|
|
packet_boundary_flag: PacketBoundaryFlag
|
|
message_type: MessageType
|
|
opcode: int
|
|
|
|
@staticmethod
|
|
def parse(span: bytes) -> Tuple['UciPacket', bytes]:
|
|
fields = {'payload': None}
|
|
if len(span) < 4:
|
|
raise Exception('Invalid packet size')
|
|
fields['group_id'] = GroupId((span[0] >> 0) & 0xf)
|
|
fields['packet_boundary_flag'] = PacketBoundaryFlag(
|
|
(span[0] >> 4) & 0x1)
|
|
fields['message_type'] = MessageType((span[0] >> 5) & 0x7)
|
|
fields['opcode'] = (span[1] >> 0) & 0x3f
|
|
_payload__size = span[3]
|
|
span = span[4:]
|
|
if len(span) < _payload__size:
|
|
raise Exception('Invalid packet size')
|
|
payload = span[:_payload__size]
|
|
fields['payload'] = payload
|
|
if fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x0:
|
|
return DeviceResetCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x2:
|
|
return GetDeviceInfoCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x3:
|
|
return GetCapsInfoCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x4:
|
|
return SetConfigCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x5:
|
|
return GetConfigCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x0:
|
|
return SessionInitCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x1:
|
|
return SessionDeinitCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x3:
|
|
return SessionSetAppConfigCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x4:
|
|
return SessionGetAppConfigCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x5:
|
|
return SessionGetCountCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x6:
|
|
return SessionGetStateCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x7:
|
|
return SessionUpdateControllerMulticastListCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.RANGING_SESSION_CONTROL:
|
|
return RangingCommand.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x0:
|
|
return PicaInitDeviceCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x1:
|
|
return PicaSetDevicePositionCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x2:
|
|
return PicaCreateBeaconCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x3:
|
|
return PicaSetBeaconPositionCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x4:
|
|
return PicaDestroyBeaconCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x0:
|
|
return AndroidGetPowerStatsCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x1:
|
|
return AndroidSetCountryCodeCmd.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x0:
|
|
return DeviceResetRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x2:
|
|
return GetDeviceInfoRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x3:
|
|
return GetCapsInfoRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x4:
|
|
return SetConfigRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x5:
|
|
return GetConfigRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x0:
|
|
return SessionInitRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x1:
|
|
return SessionDeinitRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x3:
|
|
return SessionSetAppConfigRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x4:
|
|
return SessionGetAppConfigRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x5:
|
|
return SessionGetCountRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x6:
|
|
return SessionGetStateRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x7:
|
|
return SessionUpdateControllerMulticastListRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x0:
|
|
return RangeStartRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x1:
|
|
return RangeStopRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x3:
|
|
return RangeGetRangingCountRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x0:
|
|
return PicaInitDeviceRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x1:
|
|
return PicaSetDevicePositionRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x2:
|
|
return PicaCreateBeaconRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x3:
|
|
return PicaSetBeaconPositionRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x4:
|
|
return PicaDestroyBeaconRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x0:
|
|
return AndroidGetPowerStatsRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x1:
|
|
return AndroidSetCountryCodeRsp.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.NOTIFICATION and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x1:
|
|
return DeviceStatusNtf.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.NOTIFICATION and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x7:
|
|
return GenericError.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.NOTIFICATION and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x2:
|
|
return SessionStatusNtf.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.NOTIFICATION and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x7:
|
|
return SessionUpdateControllerMulticastListNtf.parse(fields, payload)
|
|
elif fields['message_type'] == MessageType.NOTIFICATION and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x0:
|
|
return RangeDataNtf.parse(fields, payload)
|
|
else:
|
|
return UciPacket(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciCommand(UciPacket):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciCommand', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x0:
|
|
return DeviceResetCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x2:
|
|
return GetDeviceInfoCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x3:
|
|
return GetCapsInfoCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x4:
|
|
return SetConfigCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x5:
|
|
return GetConfigCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x0:
|
|
return SessionInitCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x1:
|
|
return SessionDeinitCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x3:
|
|
return SessionSetAppConfigCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x4:
|
|
return SessionGetAppConfigCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x5:
|
|
return SessionGetCountCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x6:
|
|
return SessionGetStateCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x7:
|
|
return SessionUpdateControllerMulticastListCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.RANGING_SESSION_CONTROL:
|
|
return RangingCommand.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x0:
|
|
return PicaInitDeviceCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x1:
|
|
return PicaSetDevicePositionCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x2:
|
|
return PicaCreateBeaconCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x3:
|
|
return PicaSetBeaconPositionCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x4:
|
|
return PicaDestroyBeaconCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x0:
|
|
return AndroidGetPowerStatsCmd.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x1:
|
|
return AndroidSetCountryCodeCmd.parse(fields, payload)
|
|
else:
|
|
return UciCommand(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciResponse(UciPacket):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciResponse', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x0:
|
|
return DeviceResetRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x2:
|
|
return GetDeviceInfoRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x3:
|
|
return GetCapsInfoRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x4:
|
|
return SetConfigRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x5:
|
|
return GetConfigRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x0:
|
|
return SessionInitRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x1:
|
|
return SessionDeinitRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x3:
|
|
return SessionSetAppConfigRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x4:
|
|
return SessionGetAppConfigRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x5:
|
|
return SessionGetCountRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x6:
|
|
return SessionGetStateRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x7:
|
|
return SessionUpdateControllerMulticastListRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x0:
|
|
return RangeStartRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x1:
|
|
return RangeStopRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x3:
|
|
return RangeGetRangingCountRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x0:
|
|
return PicaInitDeviceRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x1:
|
|
return PicaSetDevicePositionRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x2:
|
|
return PicaCreateBeaconRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x3:
|
|
return PicaSetBeaconPositionRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x4:
|
|
return PicaDestroyBeaconRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x0:
|
|
return AndroidGetPowerStatsRsp.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x1:
|
|
return AndroidSetCountryCodeRsp.parse(fields, payload)
|
|
else:
|
|
return UciResponse(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciNotification(UciPacket):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciNotification', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x1:
|
|
return DeviceStatusNtf.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x7:
|
|
return GenericError.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x2:
|
|
return SessionStatusNtf.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x7:
|
|
return SessionUpdateControllerMulticastListNtf.parse(fields, payload)
|
|
elif fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x0:
|
|
return RangeDataNtf.parse(fields, payload)
|
|
else:
|
|
return UciNotification(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class CoreCommand(UciCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['CoreCommand', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['opcode'] == 0x0:
|
|
return DeviceResetCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x2:
|
|
return GetDeviceInfoCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x3:
|
|
return GetCapsInfoCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x4:
|
|
return SetConfigCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x5:
|
|
return GetConfigCmd.parse(fields, payload)
|
|
else:
|
|
return CoreCommand(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class CoreResponse(UciResponse):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['CoreResponse', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['opcode'] == 0x0:
|
|
return DeviceResetRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x2:
|
|
return GetDeviceInfoRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x3:
|
|
return GetCapsInfoRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x4:
|
|
return SetConfigRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x5:
|
|
return GetConfigRsp.parse(fields, payload)
|
|
else:
|
|
return CoreResponse(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class CoreNotification(UciNotification):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['CoreNotification', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['opcode'] == 0x1:
|
|
return DeviceStatusNtf.parse(fields, payload)
|
|
elif fields['opcode'] == 0x7:
|
|
return GenericError.parse(fields, payload)
|
|
else:
|
|
return CoreNotification(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionCommand(UciCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionCommand', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['opcode'] == 0x0:
|
|
return SessionInitCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x1:
|
|
return SessionDeinitCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x3:
|
|
return SessionSetAppConfigCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x4:
|
|
return SessionGetAppConfigCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x5:
|
|
return SessionGetCountCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x6:
|
|
return SessionGetStateCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x7:
|
|
return SessionUpdateControllerMulticastListCmd.parse(fields, payload)
|
|
else:
|
|
return SessionCommand(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionResponse(UciResponse):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionResponse', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['opcode'] == 0x0:
|
|
return SessionInitRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x1:
|
|
return SessionDeinitRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x3:
|
|
return SessionSetAppConfigRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x4:
|
|
return SessionGetAppConfigRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x5:
|
|
return SessionGetCountRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x6:
|
|
return SessionGetStateRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x7:
|
|
return SessionUpdateControllerMulticastListRsp.parse(fields, payload)
|
|
else:
|
|
return SessionResponse(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionNotification(UciNotification):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionNotification', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['opcode'] == 0x2:
|
|
return SessionStatusNtf.parse(fields, payload)
|
|
elif fields['opcode'] == 0x7:
|
|
return SessionUpdateControllerMulticastListNtf.parse(fields, payload)
|
|
else:
|
|
return SessionNotification(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class RangingCommand(UciCommand):
|
|
session_id: int
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['RangingCommand', bytes]:
|
|
if len(span) < 4:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:4], byteorder='little')
|
|
fields['session_id'] = value_
|
|
span = span[4:]
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['opcode'] == 0x0:
|
|
return RangeStartCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x1:
|
|
return RangeStopCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x3:
|
|
return RangeGetRangingCountCmd.parse(fields, payload)
|
|
else:
|
|
return RangingCommand(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class RangingResponse(UciResponse):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['RangingResponse', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['opcode'] == 0x0:
|
|
return RangeStartRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x1:
|
|
return RangeStopRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x3:
|
|
return RangeGetRangingCountRsp.parse(fields, payload)
|
|
else:
|
|
return RangingResponse(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class RangingNotification(UciNotification):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['RangingNotification', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['opcode'] == 0x0:
|
|
return RangeDataNtf.parse(fields, payload)
|
|
else:
|
|
return RangingNotification(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PicaCommand(UciCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['PicaCommand', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['opcode'] == 0x0:
|
|
return PicaInitDeviceCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x1:
|
|
return PicaSetDevicePositionCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x2:
|
|
return PicaCreateBeaconCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x3:
|
|
return PicaSetBeaconPositionCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x4:
|
|
return PicaDestroyBeaconCmd.parse(fields, payload)
|
|
else:
|
|
return PicaCommand(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PicaResponse(UciResponse):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['PicaResponse', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['opcode'] == 0x0:
|
|
return PicaInitDeviceRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x1:
|
|
return PicaSetDevicePositionRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x2:
|
|
return PicaCreateBeaconRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x3:
|
|
return PicaSetBeaconPositionRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x4:
|
|
return PicaDestroyBeaconRsp.parse(fields, payload)
|
|
else:
|
|
return PicaResponse(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class AndroidCommand(UciCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['AndroidCommand', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['opcode'] == 0x0:
|
|
return AndroidGetPowerStatsCmd.parse(fields, payload)
|
|
elif fields['opcode'] == 0x1:
|
|
return AndroidSetCountryCodeCmd.parse(fields, payload)
|
|
else:
|
|
return AndroidCommand(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class AndroidResponse(UciResponse):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['AndroidResponse', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['opcode'] == 0x0:
|
|
return AndroidGetPowerStatsRsp.parse(fields, payload)
|
|
elif fields['opcode'] == 0x1:
|
|
return AndroidSetCountryCodeRsp.parse(fields, payload)
|
|
else:
|
|
return AndroidResponse(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class AndroidNotification(UciNotification):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['AndroidNotification', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
return AndroidNotification(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class DeviceResetCmd(CoreCommand):
|
|
reset_config: ResetConfig
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['DeviceResetCmd', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['reset_config'] = ResetConfig(span[0])
|
|
span = span[1:]
|
|
return DeviceResetCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class DeviceResetRsp(CoreResponse):
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['DeviceResetRsp', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
span = span[1:]
|
|
return DeviceResetRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class DeviceStatusNtf(CoreNotification):
|
|
device_state: DeviceState
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['DeviceStatusNtf', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['device_state'] = DeviceState(span[0])
|
|
span = span[1:]
|
|
return DeviceStatusNtf(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class GetDeviceInfoCmd(CoreCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['GetDeviceInfoCmd', bytes]:
|
|
return GetDeviceInfoCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class GetDeviceInfoRsp(CoreResponse):
|
|
status: StatusCode
|
|
uci_version: int
|
|
mac_version: int
|
|
phy_version: int
|
|
uci_test_version: int
|
|
vendor_spec_info: bytes
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['GetDeviceInfoRsp', bytes]:
|
|
if len(span) < 10:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
value_ = int.from_bytes(span[1:3], byteorder='little')
|
|
fields['uci_version'] = value_
|
|
value_ = int.from_bytes(span[3:5], byteorder='little')
|
|
fields['mac_version'] = value_
|
|
value_ = int.from_bytes(span[5:7], byteorder='little')
|
|
fields['phy_version'] = value_
|
|
value_ = int.from_bytes(span[7:9], byteorder='little')
|
|
fields['uci_test_version'] = value_
|
|
vendor_spec_info_count = span[9]
|
|
span = span[10:]
|
|
if len(span) < vendor_spec_info_count:
|
|
raise Exception('Invalid packet size')
|
|
vendor_spec_info = []
|
|
for n in range(vendor_spec_info_count):
|
|
vendor_spec_info.append(int.from_bytes(
|
|
span[n:n + 1], byteorder='little'))
|
|
fields['vendor_spec_info'] = vendor_spec_info
|
|
span = span[vendor_spec_info_count:]
|
|
return GetDeviceInfoRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class GetCapsInfoCmd(CoreCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['GetCapsInfoCmd', bytes]:
|
|
return GetCapsInfoCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class CapTlv(Packet):
|
|
t: CapTlvType
|
|
v: bytes
|
|
|
|
@staticmethod
|
|
def parse(span: bytes) -> Tuple['CapTlv', bytes]:
|
|
fields = {'payload': None}
|
|
if len(span) < 2:
|
|
raise Exception('Invalid packet size')
|
|
fields['t'] = CapTlvType(span[0])
|
|
v_count = span[1]
|
|
span = span[2:]
|
|
if len(span) < v_count:
|
|
raise Exception('Invalid packet size')
|
|
v = []
|
|
for n in range(v_count):
|
|
v.append(int.from_bytes(span[n:n + 1], byteorder='little'))
|
|
fields['v'] = v
|
|
span = span[v_count:]
|
|
return CapTlv(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class GetCapsInfoRsp(CoreResponse):
|
|
status: StatusCode
|
|
tlvs: List[CapTlv]
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['GetCapsInfoRsp', bytes]:
|
|
if len(span) < 2:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
tlvs_count = span[1]
|
|
span = span[2:]
|
|
tlvs = []
|
|
for n in range(tlvs_count):
|
|
element, span = CapTlv.parse(span)
|
|
tlvs.append(element)
|
|
fields['tlvs'] = tlvs
|
|
return GetCapsInfoRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class DeviceParameter(Packet):
|
|
id: int
|
|
value: bytes
|
|
|
|
@staticmethod
|
|
def parse(span: bytes) -> Tuple['DeviceParameter', bytes]:
|
|
fields = {'payload': None}
|
|
if len(span) < 2:
|
|
raise Exception('Invalid packet size')
|
|
fields['id'] = span[0]
|
|
value_count = span[1]
|
|
span = span[2:]
|
|
if len(span) < value_count:
|
|
raise Exception('Invalid packet size')
|
|
value = []
|
|
for n in range(value_count):
|
|
value.append(int.from_bytes(span[n:n + 1], byteorder='little'))
|
|
fields['value'] = value
|
|
span = span[value_count:]
|
|
return DeviceParameter(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SetConfigCmd(CoreCommand):
|
|
parameters: List[DeviceParameter]
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SetConfigCmd', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
parameters_count = span[0]
|
|
span = span[1:]
|
|
parameters = []
|
|
for n in range(parameters_count):
|
|
element, span = DeviceParameter.parse(span)
|
|
parameters.append(element)
|
|
fields['parameters'] = parameters
|
|
return SetConfigCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class DeviceConfigStatus(Packet):
|
|
parameter_id: int
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(span: bytes) -> Tuple['DeviceConfigStatus', bytes]:
|
|
fields = {'payload': None}
|
|
if len(span) < 2:
|
|
raise Exception('Invalid packet size')
|
|
fields['parameter_id'] = span[0]
|
|
fields['status'] = StatusCode(span[1])
|
|
span = span[2:]
|
|
return DeviceConfigStatus(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SetConfigRsp(CoreResponse):
|
|
status: StatusCode
|
|
parameters: List[DeviceConfigStatus]
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SetConfigRsp', bytes]:
|
|
if len(span) < 2:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
parameters_count = span[1]
|
|
span = span[2:]
|
|
if len(span) < parameters_count * 2:
|
|
raise Exception('Invalid packet size')
|
|
parameters = []
|
|
for n in range(parameters_count):
|
|
parameters.append(DeviceConfigStatus.parse_all(
|
|
span[n * 2:(n + 1) * 2]))
|
|
fields['parameters'] = parameters
|
|
span = span[parameters_count * 2:]
|
|
return SetConfigRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class GetConfigCmd(CoreCommand):
|
|
parameter_ids: bytes
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['GetConfigCmd', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
parameter_ids_count = span[0]
|
|
span = span[1:]
|
|
if len(span) < parameter_ids_count:
|
|
raise Exception('Invalid packet size')
|
|
parameter_ids = []
|
|
for n in range(parameter_ids_count):
|
|
parameter_ids.append(int.from_bytes(
|
|
span[n:n + 1], byteorder='little'))
|
|
fields['parameter_ids'] = parameter_ids
|
|
span = span[parameter_ids_count:]
|
|
return GetConfigCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class GetConfigRsp(CoreResponse):
|
|
status: StatusCode
|
|
parameters: List[DeviceParameter]
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['GetConfigRsp', bytes]:
|
|
if len(span) < 2:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
parameters_count = span[1]
|
|
span = span[2:]
|
|
parameters = []
|
|
for n in range(parameters_count):
|
|
element, span = DeviceParameter.parse(span)
|
|
parameters.append(element)
|
|
fields['parameters'] = parameters
|
|
return GetConfigRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class GenericError(CoreNotification):
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['GenericError', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
span = span[1:]
|
|
return GenericError(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionInitCmd(SessionCommand):
|
|
session_id: int
|
|
session_type: SessionType
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionInitCmd', bytes]:
|
|
if len(span) < 5:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:4], byteorder='little')
|
|
fields['session_id'] = value_
|
|
fields['session_type'] = SessionType(span[4])
|
|
span = span[5:]
|
|
return SessionInitCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionInitRsp(SessionResponse):
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionInitRsp', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
span = span[1:]
|
|
return SessionInitRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionDeinitCmd(SessionCommand):
|
|
session_id: int
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionDeinitCmd', bytes]:
|
|
if len(span) < 4:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:4], byteorder='little')
|
|
fields['session_id'] = value_
|
|
span = span[4:]
|
|
return SessionDeinitCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionDeinitRsp(SessionResponse):
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionDeinitRsp', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
span = span[1:]
|
|
return SessionDeinitRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionStatusNtf(SessionNotification):
|
|
session_id: int
|
|
session_state: SessionState
|
|
reason_code: ReasonCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionStatusNtf', bytes]:
|
|
if len(span) < 6:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:4], byteorder='little')
|
|
fields['session_id'] = value_
|
|
fields['session_state'] = SessionState(span[4])
|
|
fields['reason_code'] = ReasonCode(span[5])
|
|
span = span[6:]
|
|
return SessionStatusNtf(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class AppConfigParameter(Packet):
|
|
id: int
|
|
value: bytes
|
|
|
|
@staticmethod
|
|
def parse(span: bytes) -> Tuple['AppConfigParameter', bytes]:
|
|
fields = {'payload': None}
|
|
if len(span) < 2:
|
|
raise Exception('Invalid packet size')
|
|
fields['id'] = span[0]
|
|
value_count = span[1]
|
|
span = span[2:]
|
|
if len(span) < value_count:
|
|
raise Exception('Invalid packet size')
|
|
value = []
|
|
for n in range(value_count):
|
|
value.append(int.from_bytes(span[n:n + 1], byteorder='little'))
|
|
fields['value'] = value
|
|
span = span[value_count:]
|
|
return AppConfigParameter(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionSetAppConfigCmd(SessionCommand):
|
|
session_id: int
|
|
parameters: List[AppConfigParameter]
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionSetAppConfigCmd', bytes]:
|
|
if len(span) < 5:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:4], byteorder='little')
|
|
fields['session_id'] = value_
|
|
parameters_count = span[4]
|
|
span = span[5:]
|
|
parameters = []
|
|
for n in range(parameters_count):
|
|
element, span = AppConfigParameter.parse(span)
|
|
parameters.append(element)
|
|
fields['parameters'] = parameters
|
|
return SessionSetAppConfigCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class AppConfigStatus(Packet):
|
|
config_id: int
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(span: bytes) -> Tuple['AppConfigStatus', bytes]:
|
|
fields = {'payload': None}
|
|
if len(span) < 2:
|
|
raise Exception('Invalid packet size')
|
|
fields['config_id'] = span[0]
|
|
fields['status'] = StatusCode(span[1])
|
|
span = span[2:]
|
|
return AppConfigStatus(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionSetAppConfigRsp(SessionResponse):
|
|
status: StatusCode
|
|
parameters: List[AppConfigStatus]
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionSetAppConfigRsp', bytes]:
|
|
if len(span) < 2:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
parameters_count = span[1]
|
|
span = span[2:]
|
|
if len(span) < parameters_count * 2:
|
|
raise Exception('Invalid packet size')
|
|
parameters = []
|
|
for n in range(parameters_count):
|
|
parameters.append(AppConfigStatus.parse_all(
|
|
span[n * 2:(n + 1) * 2]))
|
|
fields['parameters'] = parameters
|
|
span = span[parameters_count * 2:]
|
|
return SessionSetAppConfigRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionGetAppConfigCmd(SessionCommand):
|
|
session_id: int
|
|
parameters: bytes
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionGetAppConfigCmd', bytes]:
|
|
if len(span) < 5:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:4], byteorder='little')
|
|
fields['session_id'] = value_
|
|
parameters_count = span[4]
|
|
span = span[5:]
|
|
if len(span) < parameters_count:
|
|
raise Exception('Invalid packet size')
|
|
parameters = []
|
|
for n in range(parameters_count):
|
|
parameters.append(int.from_bytes(
|
|
span[n:n + 1], byteorder='little'))
|
|
fields['parameters'] = parameters
|
|
span = span[parameters_count:]
|
|
return SessionGetAppConfigCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionGetAppConfigRsp(SessionResponse):
|
|
status: StatusCode
|
|
parameters: List[AppConfigParameter]
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionGetAppConfigRsp', bytes]:
|
|
if len(span) < 2:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
parameters_count = span[1]
|
|
span = span[2:]
|
|
parameters = []
|
|
for n in range(parameters_count):
|
|
element, span = AppConfigParameter.parse(span)
|
|
parameters.append(element)
|
|
fields['parameters'] = parameters
|
|
return SessionGetAppConfigRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionGetCountCmd(SessionCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionGetCountCmd', bytes]:
|
|
return SessionGetCountCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionGetCountRsp(SessionResponse):
|
|
status: StatusCode
|
|
session_count: int
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionGetCountRsp', bytes]:
|
|
if len(span) < 2:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
fields['session_count'] = span[1]
|
|
span = span[2:]
|
|
return SessionGetCountRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionGetStateCmd(SessionCommand):
|
|
session_id: int
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionGetStateCmd', bytes]:
|
|
if len(span) < 4:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:4], byteorder='little')
|
|
fields['session_id'] = value_
|
|
span = span[4:]
|
|
return SessionGetStateCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionGetStateRsp(SessionResponse):
|
|
status: StatusCode
|
|
session_state: SessionState
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionGetStateRsp', bytes]:
|
|
if len(span) < 2:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
fields['session_state'] = SessionState(span[1])
|
|
span = span[2:]
|
|
return SessionGetStateRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class Controlee(Packet):
|
|
short_address: int
|
|
subsession_id: int
|
|
|
|
@staticmethod
|
|
def parse(span: bytes) -> Tuple['Controlee', bytes]:
|
|
fields = {'payload': None}
|
|
if len(span) < 6:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:2], byteorder='little')
|
|
fields['short_address'] = value_
|
|
value_ = int.from_bytes(span[2:6], byteorder='little')
|
|
fields['subsession_id'] = value_
|
|
span = span[6:]
|
|
return Controlee(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionUpdateControllerMulticastListCmd(SessionCommand):
|
|
session_id: int
|
|
action: int
|
|
controlees: List[Controlee]
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListCmd', bytes]:
|
|
if len(span) < 6:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:4], byteorder='little')
|
|
fields['session_id'] = value_
|
|
fields['action'] = span[4]
|
|
controlees_count = span[5]
|
|
span = span[6:]
|
|
if len(span) < controlees_count * 6:
|
|
raise Exception('Invalid packet size')
|
|
controlees = []
|
|
for n in range(controlees_count):
|
|
controlees.append(Controlee.parse_all(span[n * 6:(n + 1) * 6]))
|
|
fields['controlees'] = controlees
|
|
span = span[controlees_count * 6:]
|
|
return SessionUpdateControllerMulticastListCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionUpdateControllerMulticastListRsp(SessionResponse):
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListRsp', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
span = span[1:]
|
|
return SessionUpdateControllerMulticastListRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class ControleeStatus(Packet):
|
|
mac_address: int
|
|
subsession_id: int
|
|
status: int
|
|
|
|
@staticmethod
|
|
def parse(span: bytes) -> Tuple['ControleeStatus', bytes]:
|
|
fields = {'payload': None}
|
|
if len(span) < 7:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:2], byteorder='little')
|
|
fields['mac_address'] = value_
|
|
value_ = int.from_bytes(span[2:6], byteorder='little')
|
|
fields['subsession_id'] = value_
|
|
fields['status'] = span[6]
|
|
span = span[7:]
|
|
return ControleeStatus(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class SessionUpdateControllerMulticastListNtf(SessionNotification):
|
|
session_id: int
|
|
remaining_multicast_list_size: int
|
|
controlee_status: List[ControleeStatus]
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListNtf', bytes]:
|
|
if len(span) < 6:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:4], byteorder='little')
|
|
fields['session_id'] = value_
|
|
fields['remaining_multicast_list_size'] = span[4]
|
|
controlee_status_count = span[5]
|
|
span = span[6:]
|
|
if len(span) < controlee_status_count * 7:
|
|
raise Exception('Invalid packet size')
|
|
controlee_status = []
|
|
for n in range(controlee_status_count):
|
|
controlee_status.append(
|
|
ControleeStatus.parse_all(span[n * 7:(n + 1) * 7]))
|
|
fields['controlee_status'] = controlee_status
|
|
span = span[controlee_status_count * 7:]
|
|
return SessionUpdateControllerMulticastListNtf(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class RangeStartCmd(RangingCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['RangeStartCmd', bytes]:
|
|
return RangeStartCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class RangeStartRsp(RangingResponse):
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['RangeStartRsp', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
span = span[1:]
|
|
return RangeStartRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class ShortAddressTwoWayRangingMeasurement(Packet):
|
|
mac_address: int
|
|
status: StatusCode
|
|
nlos: int
|
|
distance: int
|
|
aoa_azimuth: int
|
|
aoa_azimuth_fom: int
|
|
aoa_elevation: int
|
|
aoa_elevation_fom: int
|
|
aoa_destination_azimuth: int
|
|
aoa_destination_azimuth_fom: int
|
|
aoa_destination_elevation: int
|
|
aoa_destination_elevation_fom: int
|
|
slot_index: int
|
|
|
|
@staticmethod
|
|
def parse(span: bytes) -> Tuple['ShortAddressTwoWayRangingMeasurement', bytes]:
|
|
fields = {'payload': None}
|
|
if len(span) < 31:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:2], byteorder='little')
|
|
fields['mac_address'] = value_
|
|
fields['status'] = StatusCode(span[2])
|
|
fields['nlos'] = span[3]
|
|
value_ = int.from_bytes(span[4:6], byteorder='little')
|
|
fields['distance'] = value_
|
|
value_ = int.from_bytes(span[6:8], byteorder='little')
|
|
fields['aoa_azimuth'] = value_
|
|
fields['aoa_azimuth_fom'] = span[8]
|
|
value_ = int.from_bytes(span[9:11], byteorder='little')
|
|
fields['aoa_elevation'] = value_
|
|
fields['aoa_elevation_fom'] = span[11]
|
|
value_ = int.from_bytes(span[12:14], byteorder='little')
|
|
fields['aoa_destination_azimuth'] = value_
|
|
fields['aoa_destination_azimuth_fom'] = span[14]
|
|
value_ = int.from_bytes(span[15:17], byteorder='little')
|
|
fields['aoa_destination_elevation'] = value_
|
|
fields['aoa_destination_elevation_fom'] = span[17]
|
|
fields['slot_index'] = span[18]
|
|
value_ = int.from_bytes(span[19:31], byteorder='little')
|
|
span = span[31:]
|
|
return ShortAddressTwoWayRangingMeasurement(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class ExtendedAddressTwoWayRangingMeasurement(Packet):
|
|
mac_address: int
|
|
status: StatusCode
|
|
nlos: int
|
|
distance: int
|
|
aoa_azimuth: int
|
|
aoa_azimuth_fom: int
|
|
aoa_elevation: int
|
|
aoa_elevation_fom: int
|
|
aoa_destination_azimuth: int
|
|
aoa_destination_azimuth_fom: int
|
|
aoa_destination_elevation: int
|
|
aoa_destination_elevation_fom: int
|
|
slot_index: int
|
|
|
|
@staticmethod
|
|
def parse(span: bytes) -> Tuple['ExtendedAddressTwoWayRangingMeasurement', bytes]:
|
|
fields = {'payload': None}
|
|
if len(span) < 31:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:8], byteorder='little')
|
|
fields['mac_address'] = value_
|
|
fields['status'] = StatusCode(span[8])
|
|
fields['nlos'] = span[9]
|
|
value_ = int.from_bytes(span[10:12], byteorder='little')
|
|
fields['distance'] = value_
|
|
value_ = int.from_bytes(span[12:14], byteorder='little')
|
|
fields['aoa_azimuth'] = value_
|
|
fields['aoa_azimuth_fom'] = span[14]
|
|
value_ = int.from_bytes(span[15:17], byteorder='little')
|
|
fields['aoa_elevation'] = value_
|
|
fields['aoa_elevation_fom'] = span[17]
|
|
value_ = int.from_bytes(span[18:20], byteorder='little')
|
|
fields['aoa_destination_azimuth'] = value_
|
|
fields['aoa_destination_azimuth_fom'] = span[20]
|
|
value_ = int.from_bytes(span[21:23], byteorder='little')
|
|
fields['aoa_destination_elevation'] = value_
|
|
fields['aoa_destination_elevation_fom'] = span[23]
|
|
fields['slot_index'] = span[24]
|
|
value_ = int.from_bytes(span[25:31], byteorder='little')
|
|
span = span[31:]
|
|
return ExtendedAddressTwoWayRangingMeasurement(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
class RangingMeasurementType(enum.IntEnum):
|
|
ONE_WAY = 0x0
|
|
TWO_WAY = 0x1
|
|
|
|
|
|
@dataclass
|
|
class RangeDataNtf(RangingNotification):
|
|
sequence_number: int
|
|
session_id: int
|
|
rcr_indicator: int
|
|
current_ranging_interval: int
|
|
ranging_measurement_type: RangingMeasurementType
|
|
mac_address_indicator: MacAddressIndicator
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['RangeDataNtf', bytes]:
|
|
if len(span) < 24:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:4], byteorder='little')
|
|
fields['sequence_number'] = value_
|
|
value_ = int.from_bytes(span[4:8], byteorder='little')
|
|
fields['session_id'] = value_
|
|
fields['rcr_indicator'] = span[8]
|
|
value_ = int.from_bytes(span[9:13], byteorder='little')
|
|
fields['current_ranging_interval'] = value_
|
|
fields['ranging_measurement_type'] = RangingMeasurementType(span[13])
|
|
fields['mac_address_indicator'] = MacAddressIndicator(span[15])
|
|
value_ = int.from_bytes(span[16:24], byteorder='little')
|
|
span = span[24:]
|
|
payload = span
|
|
fields['payload'] = payload
|
|
if fields['ranging_measurement_type'] == RangingMeasurementType.TWO_WAY and fields['mac_address_indicator'] == MacAddressIndicator.SHORT_ADDRESS:
|
|
return ShortMacTwoWayRangeDataNtf.parse(fields, payload)
|
|
elif fields['ranging_measurement_type'] == RangingMeasurementType.TWO_WAY and fields['mac_address_indicator'] == MacAddressIndicator.EXTENDED_ADDRESS:
|
|
return ExtendedMacTwoWayRangeDataNtf.parse(fields, payload)
|
|
else:
|
|
return RangeDataNtf(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class ShortMacTwoWayRangeDataNtf(RangeDataNtf):
|
|
two_way_ranging_measurements: List[ShortAddressTwoWayRangingMeasurement]
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['ShortMacTwoWayRangeDataNtf', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
two_way_ranging_measurements_count = span[0]
|
|
span = span[1:]
|
|
if len(span) < two_way_ranging_measurements_count * 31:
|
|
raise Exception('Invalid packet size')
|
|
two_way_ranging_measurements = []
|
|
for n in range(two_way_ranging_measurements_count):
|
|
two_way_ranging_measurements.append(
|
|
ShortAddressTwoWayRangingMeasurement.parse_all(span[n * 31:(n + 1) * 31]))
|
|
fields['two_way_ranging_measurements'] = two_way_ranging_measurements
|
|
span = span[two_way_ranging_measurements_count * 31:]
|
|
return ShortMacTwoWayRangeDataNtf(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class ExtendedMacTwoWayRangeDataNtf(RangeDataNtf):
|
|
two_way_ranging_measurements: List[ExtendedAddressTwoWayRangingMeasurement]
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacTwoWayRangeDataNtf', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
two_way_ranging_measurements_count = span[0]
|
|
span = span[1:]
|
|
if len(span) < two_way_ranging_measurements_count * 31:
|
|
raise Exception('Invalid packet size')
|
|
two_way_ranging_measurements = []
|
|
for n in range(two_way_ranging_measurements_count):
|
|
two_way_ranging_measurements.append(
|
|
ExtendedAddressTwoWayRangingMeasurement.parse_all(span[n * 31:(n + 1) * 31]))
|
|
fields['two_way_ranging_measurements'] = two_way_ranging_measurements
|
|
span = span[two_way_ranging_measurements_count * 31:]
|
|
return ExtendedMacTwoWayRangeDataNtf(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class RangeStopCmd(RangingCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['RangeStopCmd', bytes]:
|
|
return RangeStopCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class RangeStopRsp(RangingResponse):
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['RangeStopRsp', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
span = span[1:]
|
|
return RangeStopRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class RangeGetRangingCountCmd(RangingCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['RangeGetRangingCountCmd', bytes]:
|
|
return RangeGetRangingCountCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class RangeGetRangingCountRsp(RangingResponse):
|
|
status: StatusCode
|
|
count: int
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['RangeGetRangingCountRsp', bytes]:
|
|
if len(span) < 5:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
value_ = int.from_bytes(span[1:5], byteorder='little')
|
|
fields['count'] = value_
|
|
span = span[5:]
|
|
return RangeGetRangingCountRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PicaPosition(Packet):
|
|
x: int
|
|
y: int
|
|
z: int
|
|
yaw: int
|
|
pitch: int
|
|
roll: int
|
|
|
|
@staticmethod
|
|
def parse(span: bytes) -> Tuple['PicaPosition', bytes]:
|
|
fields = {'payload': None}
|
|
if len(span) < 11:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:2], byteorder='little')
|
|
fields['x'] = value_
|
|
value_ = int.from_bytes(span[2:4], byteorder='little')
|
|
fields['y'] = value_
|
|
value_ = int.from_bytes(span[4:6], byteorder='little')
|
|
fields['z'] = value_
|
|
value_ = int.from_bytes(span[6:8], byteorder='little')
|
|
fields['yaw'] = value_
|
|
fields['pitch'] = span[8]
|
|
value_ = int.from_bytes(span[9:11], byteorder='little')
|
|
fields['roll'] = value_
|
|
span = span[11:]
|
|
return PicaPosition(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PicaInitDeviceCmd(PicaCommand):
|
|
mac_address: int
|
|
position: PicaPosition
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['PicaInitDeviceCmd', bytes]:
|
|
if len(span) < 19:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:8], byteorder='little')
|
|
fields['mac_address'] = value_
|
|
fields['position'] = PicaPosition.parse_all(span[8:19])
|
|
span = span[19:]
|
|
return PicaInitDeviceCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PicaInitDeviceRsp(PicaResponse):
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['PicaInitDeviceRsp', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
span = span[1:]
|
|
return PicaInitDeviceRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PicaSetDevicePositionCmd(PicaCommand):
|
|
position: PicaPosition
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['PicaSetDevicePositionCmd', bytes]:
|
|
if len(span) < 11:
|
|
raise Exception('Invalid packet size')
|
|
fields['position'] = PicaPosition.parse_all(span[0:11])
|
|
span = span[11:]
|
|
return PicaSetDevicePositionCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PicaSetDevicePositionRsp(PicaResponse):
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['PicaSetDevicePositionRsp', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
span = span[1:]
|
|
return PicaSetDevicePositionRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PicaCreateBeaconCmd(PicaCommand):
|
|
mac_address: int
|
|
position: PicaPosition
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['PicaCreateBeaconCmd', bytes]:
|
|
if len(span) < 19:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:8], byteorder='little')
|
|
fields['mac_address'] = value_
|
|
fields['position'] = PicaPosition.parse_all(span[8:19])
|
|
span = span[19:]
|
|
return PicaCreateBeaconCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PicaCreateBeaconRsp(PicaResponse):
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['PicaCreateBeaconRsp', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
span = span[1:]
|
|
return PicaCreateBeaconRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PicaSetBeaconPositionCmd(PicaCommand):
|
|
mac_address: int
|
|
position: PicaPosition
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['PicaSetBeaconPositionCmd', bytes]:
|
|
if len(span) < 19:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:8], byteorder='little')
|
|
fields['mac_address'] = value_
|
|
fields['position'] = PicaPosition.parse_all(span[8:19])
|
|
span = span[19:]
|
|
return PicaSetBeaconPositionCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PicaSetBeaconPositionRsp(PicaResponse):
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['PicaSetBeaconPositionRsp', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
span = span[1:]
|
|
return PicaSetBeaconPositionRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PicaDestroyBeaconCmd(PicaCommand):
|
|
mac_address: int
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['PicaDestroyBeaconCmd', bytes]:
|
|
if len(span) < 8:
|
|
raise Exception('Invalid packet size')
|
|
value_ = int.from_bytes(span[0:8], byteorder='little')
|
|
fields['mac_address'] = value_
|
|
span = span[8:]
|
|
return PicaDestroyBeaconCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PicaDestroyBeaconRsp(PicaResponse):
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['PicaDestroyBeaconRsp', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
span = span[1:]
|
|
return PicaDestroyBeaconRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class AndroidGetPowerStatsCmd(AndroidCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['AndroidGetPowerStatsCmd', bytes]:
|
|
return AndroidGetPowerStatsCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class PowerStats(Packet):
|
|
status: StatusCode
|
|
idle_time_ms: int
|
|
tx_time_ms: int
|
|
rx_time_ms: int
|
|
total_wake_count: int
|
|
|
|
@staticmethod
|
|
def parse(span: bytes) -> Tuple['PowerStats', bytes]:
|
|
fields = {'payload': None}
|
|
if len(span) < 17:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
value_ = int.from_bytes(span[1:5], byteorder='little')
|
|
fields['idle_time_ms'] = value_
|
|
value_ = int.from_bytes(span[5:9], byteorder='little')
|
|
fields['tx_time_ms'] = value_
|
|
value_ = int.from_bytes(span[9:13], byteorder='little')
|
|
fields['rx_time_ms'] = value_
|
|
value_ = int.from_bytes(span[13:17], byteorder='little')
|
|
fields['total_wake_count'] = value_
|
|
span = span[17:]
|
|
return PowerStats(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class AndroidGetPowerStatsRsp(AndroidResponse):
|
|
stats: PowerStats
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['AndroidGetPowerStatsRsp', bytes]:
|
|
if len(span) < 17:
|
|
raise Exception('Invalid packet size')
|
|
fields['stats'] = PowerStats.parse_all(span[0:17])
|
|
span = span[17:]
|
|
return AndroidGetPowerStatsRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class AndroidSetCountryCodeCmd(AndroidCommand):
|
|
country_code: bytes
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['AndroidSetCountryCodeCmd', bytes]:
|
|
if len(span) < 2:
|
|
raise Exception('Invalid packet size')
|
|
country_code = []
|
|
for n in range(2):
|
|
country_code.append(int.from_bytes(
|
|
span[n:n + 1], byteorder='little'))
|
|
fields['country_code'] = country_code
|
|
span = span[2:]
|
|
return AndroidSetCountryCodeCmd(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class AndroidSetCountryCodeRsp(AndroidResponse):
|
|
status: StatusCode
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['AndroidSetCountryCodeRsp', bytes]:
|
|
if len(span) < 1:
|
|
raise Exception('Invalid packet size')
|
|
fields['status'] = StatusCode(span[0])
|
|
span = span[1:]
|
|
return AndroidSetCountryCodeRsp(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciVendor_A_Command(UciCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciVendor_A_Command', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
return UciVendor_A_Command(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciVendor_B_Command(UciCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciVendor_B_Command', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
return UciVendor_B_Command(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciVendor_E_Command(UciCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciVendor_E_Command', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
return UciVendor_E_Command(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciVendor_F_Command(UciCommand):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciVendor_F_Command', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
return UciVendor_F_Command(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciVendor_A_Response(UciResponse):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciVendor_A_Response', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
return UciVendor_A_Response(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciVendor_B_Response(UciResponse):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciVendor_B_Response', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
return UciVendor_B_Response(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciVendor_E_Response(UciResponse):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciVendor_E_Response', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
return UciVendor_E_Response(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciVendor_F_Response(UciResponse):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciVendor_F_Response', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
return UciVendor_F_Response(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciVendor_A_Notification(UciNotification):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciVendor_A_Notification', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
return UciVendor_A_Notification(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciVendor_B_Notification(UciNotification):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciVendor_B_Notification', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
return UciVendor_B_Notification(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciVendor_E_Notification(UciNotification):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciVendor_E_Notification', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
return UciVendor_E_Notification(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UciVendor_F_Notification(UciNotification):
|
|
|
|
@staticmethod
|
|
def parse(fields: dict, span: bytes) -> Tuple['UciVendor_F_Notification', bytes]:
|
|
payload = span
|
|
fields['payload'] = payload
|
|
return UciVendor_F_Notification(**fields), span
|
|
|
|
def serialize(self) -> bytes:
|
|
pass
|