#!/usr/bin/env python3
#
#  Copyright (c) 2016, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#

import io
import math
import struct

from binascii import hexlify
from enum import IntEnum

from tlvs_parsing import SubTlvsFactory
import common


class TlvType(IntEnum):
    """Type codes of the Network Data TLVs modelled in this module."""

    HAS_ROUTE = 0
    PREFIX = 1
    BORDER_ROUTER = 2
    LOWPAN_ID = 3
    COMMISSIONING = 4
    SERVICE = 5
    SERVER = 6


class NetworkData(object):
    """Base class holding the ``stable`` flag shared by the TLV classes below."""

    def __init__(self, stable):
        self._stable = stable

    @property
    def stable(self):
        return self._stable


class NetworkDataSubTlvsFactory(SubTlvsFactory):
    """Parses a sequence of TLVs whose first byte packs the type and stable flag."""

    def parse(self, data, message_info):
        """Parse every TLV in ``data`` and return the parsed objects as a list.

        Each TLV starts with one byte holding the stable flag in bit 0 and the
        type in the upper seven bits, followed by a one-byte length and the
        value. The value is handed to the factory returned by
        ``self._get_factory`` for that type.

        Args:
            data: an ``io.BytesIO`` positioned at the first TLV.
            message_info: context object; its ``stable`` attribute is
                overwritten with the flag of the TLV currently being parsed.

        Returns:
            A list with one parsed object per TLV, in input order.
        """
        sub_tlvs = []

        while data.tell() < len(data.getvalue()):
            data_byte = ord(data.read(1))

            stable = data_byte & 0x01
            _type = (data_byte >> 1) & 0x7F

            length = ord(data.read(1))
            value = data.read(length)

            factory = self._get_factory(_type)

            # The value factories read the flag back from message_info, so it
            # has to be set before delegating.
            message_info.stable = stable

            tlv = factory.parse(io.BytesIO(value), message_info)

            sub_tlvs.append(tlv)

        return sub_tlvs


class Route(object):
    """A single route entry: a 16-bit border router value and a ``prf`` value."""

    def __init__(self, border_router_16, prf):
        self._border_router_16 = border_router_16
        self._prf = prf

    @property
    def border_router_16(self):
        return self._border_router_16

    @property
    def prf(self):
        return self._prf

    def __eq__(self, other):
        """Compare both fields after ``common.expect_the_same_class`` has run."""
        common.expect_the_same_class(self, other)

        return (self.border_router_16 == other.border_router_16 and self.prf == other.prf)

    def __repr__(self):
        return "Route(border_router_16={}, prf={})".format(self.border_router_16, self.prf)


class RouteFactory(object):
    """Builds ``Route`` objects from their three-byte encoding."""

    def parse(self, data, message_info):
        """Read one route: a big-endian 16-bit value, then a flags byte.

        ``prf`` is taken from the two most significant bits of the flags byte.
        ``message_info`` is not used.
        """
        border_router_16 = struct.unpack(">H", data.read(2))[0]

        data_byte = ord(data.read(1))
        prf = (data_byte >> 6) & 0x03

        return Route(border_router_16, prf)


class RoutesFactory(object):
    """Parses consecutive route entries until the input is exhausted."""

    def __init__(self, route_factory):
        self._route_factory = route_factory

    def parse(self, data, message_info):
        """Return the list of routes produced by the wrapped route factory."""
        routes = []

        while data.tell() < len(data.getvalue()):
            route = self._route_factory.parse(data, message_info)

            routes.append(route)

        return routes


class HasRoute(NetworkData):
    """Has Route TLV: a list of routes plus the stable flag."""

    def __init__(self, routes, stable):
        super(HasRoute, self).__init__(stable)
        self._routes = routes

    @property
    def routes(self):
        return self._routes

    def __eq__(self, other):
        """Compare the route lists; ``stable`` is not part of the comparison."""
        common.expect_the_same_class(self, other)

        return self.routes == other.routes

    def __repr__(self):
        routes_str = ", ".join(["{}".format(route) for route in self.routes])
        return "HasRoute(stable={}, routes=[{}])".format(self.stable, routes_str)


class HasRouteFactory(object):
    """Builds ``HasRoute`` objects using a routes factory."""

    def __init__(self, routes_factory):
        self._routes_factory = routes_factory

    def parse(self, data, message_info):
        """Parse all routes in ``data`` and tag them with ``message_info.stable``."""
        routes = self._routes_factory.parse(data, message_info)

        return HasRoute(routes, message_info.stable)


class Prefix(NetworkData):
    """Prefix TLV: domain id, prefix length (in bits), prefix bytes and sub-TLVs."""

    def __init__(self, domain_id, prefix_length, prefix, sub_tlvs, stable):
        super(Prefix, self).__init__(stable)
        self._domain_id = domain_id
        self._prefix_length = prefix_length
        self._prefix = prefix
        self._sub_tlvs = sub_tlvs

    @property
    def domain_id(self):
        return self._domain_id

    @property
    def prefix_length(self):
        return self._prefix_length

    @property
    def prefix(self):
        return self._prefix

    @property
    def sub_tlvs(self):
        return self._sub_tlvs

    def __eq__(self, other):
        """Compare all fields except ``stable``."""
        common.expect_the_same_class(self, other)

        return (self.domain_id == other.domain_id and self.prefix_length == other.prefix_length and
                self.prefix == other.prefix and self.sub_tlvs == other.sub_tlvs)

    def __repr__(self):
        sub_tlvs_str = ", ".join(["{}".format(tlv) for tlv in self.sub_tlvs])
        return "Prefix(stable={}, domain_id={}, prefix_length={}, prefix={}, sub_tlvs=[{}])".format(
            self.stable,
            self.domain_id,
            self.prefix_length,
            hexlify(self.prefix),
            sub_tlvs_str,
        )


class PrefixSubTlvsFactory(NetworkDataSubTlvsFactory):
    """Sub-TLV parser used for the contents of a Prefix TLV."""

    def __init__(self, sub_tlvs_factories):
        super(PrefixSubTlvsFactory, self).__init__(sub_tlvs_factories)


class PrefixFactory(object):
    """Builds ``Prefix`` objects from their binary encoding."""

    def __init__(self, sub_tlvs_factory):
        self._sub_tlvs_factory = sub_tlvs_factory

    def _bits_to_bytes(self, bits):
        """Return the number of whole bytes needed to hold ``bits`` bits."""
        return int(math.ceil(bits / 8))

    def parse(self, data, message_info):
        """Read domain id, prefix length and prefix, then parse the sub-TLVs.

        The prefix occupies ``ceil(prefix_length / 8)`` bytes; everything after
        it is passed to the sub-TLVs factory.
        """
        domain_id = ord(data.read(1))
        prefix_length = ord(data.read(1))
        prefix = bytearray(data.read(self._bits_to_bytes(prefix_length)))

        sub_tlvs = self._sub_tlvs_factory.parse(io.BytesIO(data.read()), message_info)

        return Prefix(domain_id, prefix_length, prefix, sub_tlvs, message_info.stable)


class BorderRouter(NetworkData):
    """Border Router TLV: a 16-bit value, ``prf`` and the single-bit flags."""

    def __init__(self, border_router_16, prf, p, s, d, c, r, o, n, stable):
        super(BorderRouter, self).__init__(stable)
        self._border_router_16 = border_router_16
        self._prf = prf
        self._p = p
        self._s = s
        self._d = d
        self._c = c
        self._r = r
        self._o = o
        self._n = n

    @property
    def border_router_16(self):
        return self._border_router_16

    @property
    def prf(self):
        return self._prf

    @property
    def p(self):
        return self._p

    @property
    def s(self):
        return self._s

    @property
    def d(self):
        return self._d

    @property
    def c(self):
        return self._c

    @property
    def r(self):
        return self._r

    @property
    def o(self):
        return self._o

    @property
    def n(self):
        return self._n

    def __eq__(self, other):
        """Compare all fields except ``stable``."""
        common.expect_the_same_class(self, other)

        return (self.border_router_16 == other.border_router_16 and self.prf == other.prf and self.p == other.p and
                self.s == other.s and self.d == other.d and self.c == other.c and self.r == other.r and
                self.o == other.o and self.n == other.n)

    def __repr__(self):
        return "BorderRouter(stable={}, border_router_16={}, prf={}, p={}, s={}, d={}, c={}, r={}, o={}, n={})".format(
            self.stable,
            self.border_router_16,
            self.prf,
            self.p,
            self.s,
            self.d,
            self.c,
            self.r,
            self.o,
            self.n,
        )


class BorderRouterFactory(object):
    """Builds ``BorderRouter`` objects from their four-byte encoding."""

    def parse(self, data, message_info):
        """Read a big-endian 16-bit value followed by two flag bytes.

        First flag byte, from least to most significant bit: ``o``, ``r``,
        ``c``, ``d``, ``s``, ``p``, then the two-bit ``prf``. The most
        significant bit of the second flag byte is ``n``.
        """
        border_router_16 = struct.unpack(">H", data.read(2))[0]

        data_byte = ord(data.read(1))
        o = data_byte & 0x01
        r = (data_byte >> 1) & 0x01
        c = (data_byte >> 2) & 0x01
        d = (data_byte >> 3) & 0x01
        s = (data_byte >> 4) & 0x01
        p = (data_byte >> 5) & 0x01
        prf = (data_byte >> 6) & 0x03

        data_byte = ord(data.read(1))
        n = (data_byte >> 7) & 0x01

        return BorderRouter(border_router_16, prf, p, s, d, c, r, o, n, message_info.stable)


class LowpanId(NetworkData):
    """6LoWPAN ID TLV: the ``c`` flag, ``cid`` and context length."""

    def __init__(self, c, cid, context_length, stable):
        super(LowpanId, self).__init__(stable)
        self._c = c
        self._cid = cid
        self._context_length = context_length

    @property
    def c(self):
        return self._c

    @property
    def cid(self):
        return self._cid

    @property
    def context_length(self):
        return self._context_length

    def __eq__(self, other):
        """Compare all fields except ``stable``."""
        common.expect_the_same_class(self, other)

        return (self.c == other.c and self.cid == other.cid and self.context_length == other.context_length)

    def __repr__(self):
        return "LowpanId(stable={}, c={}, cid={}, context_length={})".format(self.stable, self.c, self.cid,
                                                                             self.context_length)


class LowpanIdFactory(object):
    """Builds ``LowpanId`` objects from their two-byte encoding."""

    def parse(self, data, message_info):
        """Read ``cid`` (low nibble) and ``c`` (bit 4), then the context length byte."""
        data_byte = ord(data.read(1))

        cid = data_byte & 0x0F
        c = (data_byte >> 4) & 0x01

        context_length = ord(data.read(1))

        return LowpanId(c, cid, context_length, message_info.stable)


class CommissioningData(NetworkData):
    """Commissioning Data TLV: a list of sub-TLVs plus the stable flag."""

    def __init__(self, sub_tlvs, stable):
        super(CommissioningData, self).__init__(stable)
        self._sub_tlvs = sub_tlvs

    @property
    def sub_tlvs(self):
        return self._sub_tlvs

    def __eq__(self, other):
        """Compare the sub-TLV lists; ``stable`` is not part of the comparison."""
        common.expect_the_same_class(self, other)

        return self.sub_tlvs == other.sub_tlvs

    def __repr__(self):
        sub_tlvs_str = ", ".join(["{}".format(tlv) for tlv in self._sub_tlvs])
        return "CommissioningData(stable={}, sub_tlvs=[{}])".format(self._stable, sub_tlvs_str)


class CommissioningDataSubTlvsFactory(SubTlvsFactory):
    """Sub-TLV parser used for the contents of a Commissioning Data TLV."""

    def __init__(self, sub_tlvs_factories):
        super(CommissioningDataSubTlvsFactory, self).__init__(sub_tlvs_factories)


class CommissioningDataFactory(object):
    """Builds ``CommissioningData`` objects using a sub-TLVs factory."""

    def __init__(self, sub_tlvs_factory):
        self._sub_tlvs_factory = sub_tlvs_factory

    def parse(self, data, message_info):
        """Parse the remaining bytes of ``data`` as sub-TLVs."""
        sub_tlvs = self._sub_tlvs_factory.parse(io.BytesIO(data.read()), message_info)

        return CommissioningData(sub_tlvs, message_info.stable)


class Service(NetworkData):
    """Service TLV: ``t`` flag, id, enterprise number, service data and sub-TLVs."""

    def __init__(
        self,
        t,
        _id,
        enterprise_number,
        service_data_length,
        service_data,
        sub_tlvs,
        stable,
    ):
        super(Service, self).__init__(stable)
        self._t = t
        self._id = _id
        self._enterprise_number = enterprise_number
        self._service_data_length = service_data_length
        self._service_data = service_data
        self._sub_tlvs = sub_tlvs

    @property
    def t(self):
        return self._t

    @property
    def id(self):
        return self._id

    @property
    def enterprise_number(self):
        return self._enterprise_number

    @property
    def service_data_length(self):
        return self._service_data_length

    @property
    def service_data(self):
        return self._service_data

    @property
    def sub_tlvs(self):
        return self._sub_tlvs

    def __eq__(self, other):
        """Compare all fields except ``stable``."""
        common.expect_the_same_class(self, other)

        return (self.t == other.t and self.id == other.id and self.enterprise_number == other.enterprise_number and
                self.service_data_length == other.service_data_length and self.service_data == other.service_data and
                self.sub_tlvs == other.sub_tlvs)

    def __repr__(self):
        sub_tlvs_str = ", ".join(["{}".format(tlv) for tlv in self.sub_tlvs])
        # Labelled "Service": the previous "LowpanId" label was a copy-paste
        # error that made debug output misleading.
        return (
            "Service(stable={}, t={}, id={}, enterprise_number={}, service_data_length={}, service_data={}, sub_tlvs=[{}])"
        ).format(
            self.stable,
            self.t,
            self.id,
            self.enterprise_number,
            self.service_data_length,
            self.service_data,
            sub_tlvs_str,
        )


class ServiceSubTlvsFactory(NetworkDataSubTlvsFactory):
    """Sub-TLV parser used for the contents of a Service TLV."""

    def __init__(self, sub_tlvs_factories):
        super(ServiceSubTlvsFactory, self).__init__(sub_tlvs_factories)


class ServiceFactory(object):
    """Builds ``Service`` objects from their binary encoding."""

    def __init__(self, sub_tlvs_factory):
        self._sub_tlvs_factory = sub_tlvs_factory

    def parse(self, data, message_info):
        """Read the service header and data, then parse the remaining sub-TLVs.

        Layout read from ``data``: one byte with ``t`` in bit 7 and the id in
        the low nibble, a big-endian 32-bit enterprise number, a one-byte
        service data length, the service data, and finally the sub-TLVs.
        """
        data_byte = ord(data.read(1))
        t = (data_byte >> 7) & 0x01
        _id = data_byte & 0x0F

        # NOTE(review): the enterprise number is always read, whatever the
        # value of t -- confirm this matches what the encoders emit.
        enterprise_number = struct.unpack(">L", data.read(4))[0]
        service_data_length = ord(data.read(1))
        service_data = data.read(service_data_length)

        sub_tlvs = self._sub_tlvs_factory.parse(io.BytesIO(data.read()), message_info)

        return Service(
            t,
            _id,
            enterprise_number,
            service_data_length,
            service_data,
            sub_tlvs,
            message_info.stable,
        )


class Server(NetworkData):
    """Server TLV: a 16-bit server value and opaque server data."""

    def __init__(self, server_16, server_data, stable):
        super(Server, self).__init__(stable)
        self._server_16 = server_16
        self._server_data = server_data

    @property
    def server_16(self):
        return self._server_16

    @property
    def server_data(self):
        return self._server_data

    def __eq__(self, other):
        """Compare all fields except ``stable``."""
        common.expect_the_same_class(self, other)

        return (self.server_16 == other.server_16 and self.server_data == other.server_data)

    def __repr__(self):
        # Labelled "Server" (was "LowpanId" by copy-paste). hexlify returns
        # bytes, so it is decoded to text; otherwise the b'{}' template would
        # render as b'b'..''.
        return "Server(stable={}, server_16={}, server_data=b'{}')".format(self.stable, self.server_16,
                                                                          hexlify(self.server_data).decode())


class ServerFactory(object):
    """Builds ``Server`` objects from their binary encoding."""

    def parse(self, data, message_info):
        """Read a big-endian 16-bit value; the rest of ``data`` is the server data."""
        server_16 = struct.unpack(">H", data.read(2))[0]
        server_data = bytearray(data.read())

        return Server(server_16, server_data, message_info.stable)


class NetworkDataTlvsFactory(NetworkDataSubTlvsFactory):
    """Top-level parser for a sequence of Network Data TLVs."""

    def __init__(self, sub_tlvs_factories):
        super(NetworkDataTlvsFactory, self).__init__(sub_tlvs_factories)