""" maxminddb.decoder ~~~~~~~~~~~~~~~~~ This package contains code for decoding the MaxMind DB data section. """ import struct from typing import cast, Dict, List, Tuple, Union try: # pylint: disable=unused-import import mmap except ImportError: # pylint: disable=invalid-name mmap = None # type: ignore from maxminddb.errors import InvalidDatabaseError from maxminddb.file import FileBuffer from maxminddb.types import Record class Decoder: # pylint: disable=too-few-public-methods """Decoder for the data section of the MaxMind DB""" def __init__( self, database_buffer: Union[FileBuffer, "mmap.mmap", bytes], pointer_base: int = 0, pointer_test: bool = False, ) -> None: """Created a Decoder for a MaxMind DB Arguments: database_buffer -- an mmap'd MaxMind DB file. pointer_base -- the base number to use when decoding a pointer pointer_test -- used for internal unit testing of pointer code """ self._pointer_test = pointer_test self._buffer = database_buffer self._pointer_base = pointer_base def _decode_array(self, size: int, offset: int) -> Tuple[List[Record], int]: array = [] for _ in range(size): (value, offset) = self.decode(offset) array.append(value) return array, offset def _decode_boolean(self, size: int, offset: int) -> Tuple[bool, int]: return size != 0, offset def _decode_bytes(self, size: int, offset: int) -> Tuple[bytes, int]: new_offset = offset + size return self._buffer[offset:new_offset], new_offset def _decode_double(self, size: int, offset: int) -> Tuple[float, int]: self._verify_size(size, 8) new_offset = offset + size packed_bytes = self._buffer[offset:new_offset] (value,) = struct.unpack(b"!d", packed_bytes) return value, new_offset def _decode_float(self, size: int, offset: int) -> Tuple[float, int]: self._verify_size(size, 4) new_offset = offset + size packed_bytes = self._buffer[offset:new_offset] (value,) = struct.unpack(b"!f", packed_bytes) return value, new_offset def _decode_int32(self, size: int, offset: int) -> Tuple[int, int]: if size == 0: return 0, offset new_offset = offset + size packed_bytes = self._buffer[offset:new_offset] if size != 4: packed_bytes = packed_bytes.rjust(4, b"\x00") (value,) = struct.unpack(b"!i", packed_bytes) return value, new_offset def _decode_map(self, size: int, offset: int) -> Tuple[Dict[str, Record], int]: container: Dict[str, Record] = {} for _ in range(size): (key, offset) = self.decode(offset) (value, offset) = self.decode(offset) container[cast(str, key)] = value return container, offset def _decode_pointer(self, size: int, offset: int) -> Tuple[Record, int]: pointer_size = (size >> 3) + 1 buf = self._buffer[offset : offset + pointer_size] new_offset = offset + pointer_size if pointer_size == 1: buf = bytes([size & 0x7]) + buf pointer = struct.unpack(b"!H", buf)[0] + self._pointer_base elif pointer_size == 2: buf = b"\x00" + bytes([size & 0x7]) + buf pointer = struct.unpack(b"!I", buf)[0] + 2048 + self._pointer_base elif pointer_size == 3: buf = bytes([size & 0x7]) + buf pointer = struct.unpack(b"!I", buf)[0] + 526336 + self._pointer_base else: pointer = struct.unpack(b"!I", buf)[0] + self._pointer_base if self._pointer_test: return pointer, new_offset (value, _) = self.decode(pointer) return value, new_offset def _decode_uint(self, size: int, offset: int) -> Tuple[int, int]: new_offset = offset + size uint_bytes = self._buffer[offset:new_offset] return int.from_bytes(uint_bytes, "big"), new_offset def _decode_utf8_string(self, size: int, offset: int) -> Tuple[str, int]: new_offset = offset + size return self._buffer[offset:new_offset].decode("utf-8"), new_offset _type_decoder = { 1: _decode_pointer, 2: _decode_utf8_string, 3: _decode_double, 4: _decode_bytes, 5: _decode_uint, # uint16 6: _decode_uint, # uint32 7: _decode_map, 8: _decode_int32, 9: _decode_uint, # uint64 10: _decode_uint, # uint128 11: _decode_array, 14: _decode_boolean, 15: _decode_float, } def decode(self, offset: int) -> Tuple[Record, int]: """Decode a section of the data section starting at offset Arguments: offset -- the location of the data structure to decode """ new_offset = offset + 1 ctrl_byte = self._buffer[offset] type_num = ctrl_byte >> 5 # Extended type if not type_num: (type_num, new_offset) = self._read_extended(new_offset) try: decoder = self._type_decoder[type_num] except KeyError as ex: raise InvalidDatabaseError( f"Unexpected type number ({type_num}) encountered" ) from ex (size, new_offset) = self._size_from_ctrl_byte(ctrl_byte, new_offset, type_num) return decoder(self, size, new_offset) def _read_extended(self, offset: int) -> Tuple[int, int]: next_byte = self._buffer[offset] type_num = next_byte + 7 if type_num < 7: raise InvalidDatabaseError( "Something went horribly wrong in the decoder. An " f"extended type resolved to a type number < 8 ({type_num})" ) return type_num, offset + 1 @staticmethod def _verify_size(expected: int, actual: int) -> None: if expected != actual: raise InvalidDatabaseError( "The MaxMind DB file's data section contains bad data " "(unknown data type or corrupt data)" ) def _size_from_ctrl_byte( self, ctrl_byte: int, offset: int, type_num: int ) -> Tuple[int, int]: size = ctrl_byte & 0x1F if type_num == 1 or size < 29: return size, offset if size == 29: size = 29 + self._buffer[offset] return size, offset + 1 # Using unpack rather than int_from_bytes as it is faster # here and below. if size == 30: new_offset = offset + 2 size_bytes = self._buffer[offset:new_offset] size = 285 + struct.unpack(b"!H", size_bytes)[0] return size, new_offset new_offset = offset + 3 size_bytes = self._buffer[offset:new_offset] size = struct.unpack(b"!I", b"\x00" + size_bytes)[0] + 65821 return size, new_offset