(CC'ed the author)
I am now adding default arguments to each constructor of the classes in
the packet libraries. When the length parameter is omitted, the intent is
for it to be calculated automatically. Since the nd_router_advert class can
have multiple options, each option should calculate its own length
automatically. However, the arguments of the current nd_router_advert class
consist of separate lists of types, lengths, and data, so it is hard to
calculate the length for each individual option.
e.g.)
ra = icmpv6.nd_router_advert(
0, 0, 0, 0, 0,
[1, 3],
[1, 4], # <- lengths of each option
[nd_option_la('00:00:00:00:00:00'), nd_option_pi(...)])
Therefore, I would like to change them as follows:
e.g.)
ra = icmpv6.nd_router_advert(
0, 0, 0, 0, 0,
[nd_option_la(1, 3, '00:00:00:00:00:00'),
nd_option_pi(3, 4, ...)])
In fact, the type is fixed for each option class and the length is
calculated automatically, so the call can be simplified as follows:
e.g.)
ra = icmpv6.nd_router_advert(
0, 0, 0, 0, 0,
[nd_option_sla(hw_src='00:00:00:00:00:00'),
nd_option_pi(...)])
As a result, the source code will change significantly.
Any comments?
Signed-off-by: Yuichi Ito <[email protected]>
---
ryu/lib/packet/icmpv6.py | 393 +++++++++++++++++++---------------
ryu/tests/unit/packet/test_icmpv6.py | 134 ++++++++----
2 files changed, 309 insertions(+), 218 deletions(-)
diff --git a/ryu/lib/packet/icmpv6.py b/ryu/lib/packet/icmpv6.py
index 4da7fa2..f037e8d 100644
--- a/ryu/lib/packet/icmpv6.py
+++ b/ryu/lib/packet/icmpv6.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import abc
import struct
import sys
import array
@@ -56,6 +57,13 @@ ICMPV6_NI_REPLY = 140 # node information reply
ICMPV6_MAXTYPE = 201
+# ND_OPTIONS from RFC 4861
+ND_OPTION_SLA = 1 # Source Link-Layer Address
+ND_OPTION_TLA = 2 # Target Link-Layer Address
+ND_OPTION_PI = 3 # Prefix Information
+ND_OPTION_RH = 4 # Redirected Header
+ND_OPTION_MTU = 5 # MTU
+
class icmpv6(packet_base.PacketBase):
"""ICMPv6 (RFC 2463) header encoder/decoder class.
@@ -150,14 +158,8 @@ class nd_neighbor(stringify.StringifyMixin):
res R,S,O Flags for Neighbor Advertisement. \
The 3 MSBs of "Reserved" field for Neighbor Solicitation.
dst Target Address
- type\_ "Type" field of the first option. None if no options. \
- NOTE: This implementation doesn't support two or more \
- options.
- length "Length" field of the first option. None if no options.
- data An object to describe the first option. \
- None if no options. \
- Either ryu.lib.packet.icmpv6.nd_option_la object \
- or a bytearray.
+ option a derived object of ryu.lib.packet.icmpv6.nd_option \
+ or a bytearray. None if no options.
============== ====================
"""
@@ -165,56 +167,44 @@ class nd_neighbor(stringify.StringifyMixin):
_MIN_LEN = struct.calcsize(_PACK_STR)
_ND_OPTION_TYPES = {}
- # ND option type
- ND_OPTION_SLA = 1 # Source Link-Layer Address
- ND_OPTION_TLA = 2 # Target Link-Layer Address
- ND_OPTION_PI = 3 # Prefix Information
- ND_OPTION_RH = 4 # Redirected Header
- ND_OPTION_MTU = 5 # MTU
-
@staticmethod
def register_nd_option_type(*args):
def _register_nd_option_type(cls):
- for type_ in args:
- nd_neighbor._ND_OPTION_TYPES[type_] = cls
+ nd_neighbor._ND_OPTION_TYPES[cls.option_type()] = cls
return cls
- return _register_nd_option_type
+ return _register_nd_option_type(args[0])
- def __init__(self, res, dst, type_=None, length=None, data=None):
- self.res = res << 29
+ def __init__(self, res, dst, option=None):
+ self.res = res
self.dst = dst
- self.type_ = type_
- self.length = length
- self.data = data
+ self.option = option
@classmethod
def parser(cls, buf, offset):
(res, dst) = struct.unpack_from(cls._PACK_STR, buf, offset)
- msg = cls(res >> 29, addrconv.ipv6.bin_to_text(dst))
offset += cls._MIN_LEN
+ option = None
if len(buf) > offset:
- (msg.type_, msg.length) = struct.unpack_from('!BB', buf, offset)
- cls_ = cls._ND_OPTION_TYPES.get(msg.type_, None)
- offset += 2
- if cls_:
- msg.data = cls_.parser(buf, offset)
+ (type_, ) = struct.unpack_from('!B', buf, offset)
+ cls_ = cls._ND_OPTION_TYPES.get(type_)
+ if cls_ is not None:
+ option = cls_.parser(buf, offset)
else:
- msg.data = buf[offset:]
-
+ option = buf[offset:]
+ msg = cls(res >> 29, addrconv.ipv6.bin_to_text(dst), option)
return msg
def serialize(self):
- hdr = bytearray(struct.pack(nd_neighbor._PACK_STR, self.res,
- addrconv.ipv6.text_to_bin(self.dst)))
-
- if self.type_ is not None:
- hdr += bytearray(struct.pack('!BB', self.type_, self.length))
- if self.type_ in nd_neighbor._ND_OPTION_TYPES:
- hdr += self.data.serialize()
- elif self.data is not None:
- hdr += bytearray(self.data)
-
- return hdr
+ res = self.res << 29
+ hdr = bytearray(struct.pack(
+ nd_neighbor._PACK_STR, res,
+ addrconv.ipv6.text_to_bin(self.dst)))
+ if self.option is not None:
+ if isinstance(self.option, nd_option):
+ hdr.extend(self.option.serialize())
+ else:
+ hdr.extend(self.option)
+ return str(hdr)
@icmpv6.register_icmpv6_type(ND_ROUTER_SOLICIT)
@@ -234,14 +224,8 @@ class nd_router_solicit(stringify.StringifyMixin):
Attribute Description
============== ====================
res This field is unused. It MUST be initialized to zero.
- type\_ "Type" field of the first option. None if no options. \
- NOTE: This implementation doesn't support two or more \
- options.
- length "Length" field of the first option. None if no options.
- data An object to describe the first option. \
- None if no options. \
- Either ryu.lib.packet.icmpv6.nd_option_la object \
- or a bytearray.
+ option a derived object of ryu.lib.packet.icmpv6.nd_option \
+ or a bytearray. None if no options.
============== ====================
"""
@@ -249,50 +233,41 @@ class nd_router_solicit(stringify.StringifyMixin):
_MIN_LEN = struct.calcsize(_PACK_STR)
_ND_OPTION_TYPES = {}
- # ND option type
- ND_OPTION_SLA = 1 # Source Link-Layer Address
-
@staticmethod
def register_nd_option_type(*args):
def _register_nd_option_type(cls):
- for type_ in args:
- nd_router_solicit._ND_OPTION_TYPES[type_] = cls
+ nd_router_solicit._ND_OPTION_TYPES[cls.option_type()] = cls
return cls
- return _register_nd_option_type
+ return _register_nd_option_type(args[0])
- def __init__(self, res, type_=None, length=None, data=None):
+ def __init__(self, res=0, option=None):
self.res = res
- self.type_ = type_
- self.length = length
- self.data = data
+ self.option = option
@classmethod
def parser(cls, buf, offset):
- res = struct.unpack_from(cls._PACK_STR, buf, offset)
- msg = cls(res)
+ (res, ) = struct.unpack_from(cls._PACK_STR, buf, offset)
offset += cls._MIN_LEN
+ option = None
if len(buf) > offset:
- (msg.type_, msg.length) = struct.unpack_from('!BB', buf, offset)
- cls_ = cls._ND_OPTION_TYPES.get(msg.type_, None)
- offset += 2
- if cls_:
- msg.data = cls_.parser(buf, offset)
+ (type_, ) = struct.unpack_from('!B', buf, offset)
+ cls_ = cls._ND_OPTION_TYPES.get(type_)
+ if cls_ is not None:
+ option = cls_.parser(buf, offset)
else:
- msg.data = buf[offset:]
-
+ option = buf[offset:]
+ msg = cls(res, option)
return msg
def serialize(self):
- hdr = bytearray(struct.pack(nd_router_solicit._PACK_STR, self.res))
-
- if self.type_ is not None:
- hdr += bytearray(struct.pack('!BB', self.type_, self.length))
- if self.type_ in nd_router_solicit._ND_OPTION_TYPES:
- hdr += self.data.serialize()
- elif self.data is not None:
- hdr += bytearray(self.data)
-
- return hdr
+ hdr = bytearray(struct.pack(
+ nd_router_solicit._PACK_STR, self.res))
+ if self.option is not None:
+ if isinstance(self.option, nd_option):
+ hdr.extend(self.option.serialize())
+ else:
+ hdr.extend(self.option)
+ return str(hdr)
@icmpv6.register_icmpv6_type(ND_ROUTER_ADVERT)
@@ -316,17 +291,9 @@ class nd_router_advert(stringify.StringifyMixin):
rou_l Router Lifetime.
rea_t Reachable Time.
ret_t Retrans Timer.
- type\_ List of option type. Each index refers to an option. \
- None if no options. \
- NOTE: This implementation support one or more \
- options.
- length List of option length. Each index refers to an option. \
- None if no options. \
- data List of option data. Each index refers to an option. \
- None if no options. \
- ryu.lib.packet.icmpv6.nd_option_la object, \
- ryu.lib.packet.icmpv6.nd_option_pi object \
- or a bytearray.
+ options List of a derived object of \
+ ryu.lib.packet.icmpv6.nd_option or a bytearray. \
+ None if no options.
============== ====================
"""
@@ -334,79 +301,126 @@ class nd_router_advert(stringify.StringifyMixin):
_MIN_LEN = struct.calcsize(_PACK_STR)
_ND_OPTION_TYPES = {}
- # ND option type
- ND_OPTION_SLA = 1 # Source Link-Layer Address
- ND_OPTION_PI = 3 # Prefix Information
- ND_OPTION_MTU = 5 # MTU
-
@staticmethod
def register_nd_option_type(*args):
def _register_nd_option_type(cls):
- for type_ in args:
- nd_router_advert._ND_OPTION_TYPES[type_] = cls
+ nd_router_advert._ND_OPTION_TYPES[cls.option_type()] = cls
return cls
- return _register_nd_option_type
+ return _register_nd_option_type(args[0])
- def __init__(self, ch_l, res, rou_l, rea_t, ret_t, type_=None, length=None,
- data=None):
+ def __init__(self, ch_l=0, res=0, rou_l=0, rea_t=0, ret_t=0, options=None):
self.ch_l = ch_l
- self.res = res << 6
+ self.res = res
self.rou_l = rou_l
self.rea_t = rea_t
self.ret_t = ret_t
- self.type_ = type_
- self.length = length
- self.data = data
+ options = options or []
+ assert isinstance(options, list)
+ self.options = options
@classmethod
def parser(cls, buf, offset):
- (ch_l, res, rou_l, rea_t, ret_t) = struct.unpack_from(cls._PACK_STR,
- buf, offset)
- msg = cls(ch_l, res >> 6, rou_l, rea_t, ret_t)
+ (ch_l, res, rou_l, rea_t, ret_t
+ ) = struct.unpack_from(cls._PACK_STR, buf, offset)
offset += cls._MIN_LEN
- msg.type_ = list()
- msg.length = list()
- msg.data = list()
+ options = []
while len(buf) > offset:
- (type_, length) = struct.unpack_from('!BB', buf, offset)
- msg.type_.append(type_)
- msg.length.append(length)
- cls_ = cls._ND_OPTION_TYPES.get(type_, None)
- offset += 2
- byte_len = length * 8 - 2
- if cls_:
- msg.data.append(cls_.parser(buf[:offset+cls_._MIN_LEN],
- offset))
- offset += cls_._MIN_LEN
+ (type_, ) = struct.unpack_from('!B', buf, offset)
+ cls_ = cls._ND_OPTION_TYPES.get(type_)
+ if cls_ is not None:
+ option = cls_.parser(buf, offset)
else:
- msg.data.append(buf[offset:offset + byte_len])
- offset += byte_len
-
+ option = buf[offset:]
+ options.append(option)
+ offset += len(option)
+ msg = cls(ch_l, res >> 6, rou_l, rea_t, ret_t, options)
return msg
def serialize(self):
- hdr = bytearray(struct.pack(nd_router_advert._PACK_STR, self.ch_l,
- self.res, self.rou_l, self.rea_t,
- self.ret_t))
- if self.type_ is not None:
- for i in range(len(self.type_)):
- hdr += bytearray(struct.pack('!BB', self.type_[i],
- self.length[i]))
- if self.type_[i] in nd_router_advert._ND_OPTION_TYPES:
- hdr += self.data[i].serialize()
- elif self.data[i] is not None:
- hdr += bytearray(self.data[i])
+ res = self.res << 6
+ hdr = bytearray(struct.pack(
+ nd_router_advert._PACK_STR, self.ch_l, res, self.rou_l,
+ self.rea_t, self.ret_t))
+ for option in self.options:
+ if isinstance(option, nd_option):
+ hdr.extend(option.serialize())
+ else:
+ hdr.extend(option)
+ return str(hdr)
- return hdr
+class nd_option(stringify.StringifyMixin):
+
+ __metaclass__ = abc.ABCMeta
-@nd_neighbor.register_nd_option_type(nd_neighbor.ND_OPTION_SLA,
- nd_neighbor.ND_OPTION_TLA)
-@nd_router_solicit.register_nd_option_type(nd_router_solicit.ND_OPTION_SLA)
-@nd_router_advert.register_nd_option_type(nd_router_advert.ND_OPTION_SLA)
-class nd_option_la(stringify.StringifyMixin):
+ @classmethod
+ @abc.abstractmethod
+ def option_type(cls):
+ pass
+
+ @abc.abstractmethod
+ def __init__(self, _type, length):
+ self._type = _type
+ self.length = length
+
+ @classmethod
+ @abc.abstractmethod
+ def parser(cls, buf):
+ pass
+
+ @abc.abstractmethod
+ def serialize(self):
+ pass
+
+
+class nd_option_la(nd_option):
+
+ _PACK_STR = '!BB6s'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+
+ @abc.abstractmethod
+ def __init__(self, length, hw_src, data):
+ super(nd_option_la, self).__init__(self.option_type(), length)
+ self.hw_src = hw_src
+ self.data = data
+
+ @classmethod
+ def parser(cls, buf, offset):
+ (_, length, hw_src) = struct.unpack_from(cls._PACK_STR, buf, offset)
+ msg = cls(length, addrconv.mac.bin_to_text(hw_src))
+ offset += cls._MIN_LEN
+ if len(buf) > offset:
+ msg.data = buf[offset:]
+
+ return msg
+
+ def serialize(self):
+ buf = bytearray(struct.pack(
+ self._PACK_STR, self.option_type(), self.length,
+ addrconv.mac.text_to_bin(self.hw_src)))
+ if self.data is not None:
+ buf.extend(self.data)
+ mod = len(buf) % 8
+ if mod:
+ buf.extend(bytearray(8 - mod))
+ if 0 == self.length:
+ self.length = len(buf) / 8
+ struct.pack_into('!B', buf, 1, self.length)
+ return str(buf)
+
+ def __len__(self):
+ length = self._MIN_LEN
+ if self.data is not None:
+ length += len(self.data)
+ return length
+
+
+@nd_neighbor.register_nd_option_type
+@nd_router_solicit.register_nd_option_type
+@nd_router_advert.register_nd_option_type
+class nd_option_sla(nd_option_la):
"""ICMPv6 sub encoder/decoder class for Neighbor discovery
- Source/Target Link-Layer Address Option. (RFC 4861)
+ Source Link-Layer Address Option. (RFC 4861)
This is used with ryu.lib.packet.icmpv6.nd_neighbor,
ryu.lib.packet.icmpv6.nd_router_solicit or
@@ -421,6 +435,8 @@ class nd_option_la(stringify.StringifyMixin):
============== ====================
Attribute Description
============== ====================
+ length length of the option. \
+ (0 means automatically-calculate when encoding)
hw_src Link-Layer Address. \
NOTE: If the address is longer than 6 octets this contains \
the first 6 octets in the address. \
@@ -433,35 +449,54 @@ class nd_option_la(stringify.StringifyMixin):
============== ====================
"""
- _PACK_STR = '!6s'
- _MIN_LEN = struct.calcsize(_PACK_STR)
+ @classmethod
+ def option_type(cls):
+ return ND_OPTION_SLA
- def __init__(self, hw_src, data=None):
- self.hw_src = hw_src
- self.data = data
+ def __init__(self, length=0, hw_src='00:00:00:00:00:00', data=None):
+ super(nd_option_sla, self).__init__(length, hw_src, data)
- @classmethod
- def parser(cls, buf, offset):
- (hw_src, ) = struct.unpack_from(cls._PACK_STR, buf, offset)
- msg = cls(addrconv.mac.bin_to_text(hw_src))
- offset += cls._MIN_LEN
- if len(buf) > offset:
- msg.data = buf[offset:]
- return msg
+@nd_neighbor.register_nd_option_type
+class nd_option_tla(nd_option_la):
+ """ICMPv6 sub encoder/decoder class for Neighbor discovery
+ Target Link-Layer Address Option. (RFC 4861)
- def serialize(self):
- hdr = bytearray(struct.pack(self._PACK_STR,
- addrconv.mac.text_to_bin(self.hw_src)))
+ This is used with ryu.lib.packet.icmpv6.nd_neighbor.
- if self.data is not None:
- hdr += bytearray(self.data)
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
- return hdr
+ .. tabularcolumns:: |l|p{35em}|
+ ============== ====================
+ Attribute Description
+ ============== ====================
+ length length of the option. \
+ (0 means automatically-calculate when encoding)
+ hw_src Link-Layer Address. \
+ NOTE: If the address is longer than 6 octets this contains \
+ the first 6 octets in the address. \
+ This implementation assumes the address has at least \
+ 6 octets.
+ data A bytearray which contains the rest of Link-Layer Address \
+ and padding. When encoding a packet, it's user's \
+ responsibility to provide necessary padding for 8-octets \
+ alignment required by the protocol.
+ ============== ====================
+ """
+
+ @classmethod
+ def option_type(cls):
+ return ND_OPTION_TLA
-@nd_router_advert.register_nd_option_type(nd_router_advert.ND_OPTION_PI)
-class nd_option_pi(stringify.StringifyMixin):
+ def __init__(self, length=0, hw_src='00:00:00:00:00:00', data=None):
+ super(nd_option_tla, self).__init__(length, hw_src, data)
+
+
+@nd_router_advert.register_nd_option_type
+class nd_option_pi(nd_option):
"""ICMPv6 sub encoder/decoder class for Neighbor discovery
Prefix Information Option. (RFC 4861)
@@ -476,6 +511,8 @@ class nd_option_pi(stringify.StringifyMixin):
============== ====================
Attribute Description
============== ====================
+ length length of the option. \
+ (0 means automatically-calculate when encoding)
pl Prefix Length.
res1 L,A,R\* Flags for Prefix Information.
val_l Valid Lifetime.
@@ -487,12 +524,18 @@ class nd_option_pi(stringify.StringifyMixin):
\*R flag is defined in (RFC 3775)
"""
- _PACK_STR = '!BBIII16s'
+ _PACK_STR = '!BBBBIII16s'
_MIN_LEN = struct.calcsize(_PACK_STR)
- def __init__(self, pl, res1, val_l, pre_l, res2, prefix):
+ @classmethod
+ def option_type(cls):
+ return ND_OPTION_PI
+
+ def __init__(self, length=0, pl=0, res1=0, val_l=0, pre_l=0, res2=0,
+ prefix='::'):
+ super(nd_option_pi, self).__init__(self.option_type(), length)
self.pl = pl
- self.res1 = res1 << 5
+ self.res1 = res1
self.val_l = val_l
self.pre_l = pre_l
self.res2 = res2
@@ -500,21 +543,23 @@ class nd_option_pi(stringify.StringifyMixin):
@classmethod
def parser(cls, buf, offset):
- (pl, res1, val_l, pre_l, res2, prefix) = struct.unpack_from(cls.
- _PACK_STR,
- buf,
- offset)
+ (_, length, pl, res1, val_l, pre_l, res2, prefix
+ ) = struct.unpack_from(cls._PACK_STR, buf, offset)
msg = cls(pl, res1 >> 5, val_l, pre_l, res2,
addrconv.ipv6.bin_to_text(prefix))
return msg
def serialize(self):
- hdr = bytearray(struct.pack(self._PACK_STR, self.pl, self.res1,
- self.val_l, self.pre_l, self.res2,
- addrconv.ipv6.text_to_bin(self.prefix)))
-
- return hdr
+ res1 = self.res1 << 5
+ buf = bytearray(struct.pack(
+ self._PACK_STR, self.option_type(), self.length, self.pl,
+ res1, self.val_l, self.pre_l, self.res2,
+ addrconv.ipv6.text_to_bin(self.prefix)))
+ if 0 == self.length:
+ self.length = len(buf) / 8
+ struct.pack_into('!B', buf, 1, self.length)
+ return str(buf)
@icmpv6.register_icmpv6_type(ICMPV6_ECHO_REPLY, ICMPV6_ECHO_REQUEST)
diff --git a/ryu/tests/unit/packet/test_icmpv6.py b/ryu/tests/unit/packet/test_icmpv6.py
index 0a33ca0..da2e1fa 100644
--- a/ryu/tests/unit/packet/test_icmpv6.py
+++ b/ryu/tests/unit/packet/test_icmpv6.py
@@ -190,7 +190,7 @@ class Test_icmpv6_echo_reply(Test_icmpv6_echo_request):
self.buf = '\x81\x00\xa4\x72\x76\x20\x00\x00'
-class Test_icmpv6_neighbor_solict(unittest.TestCase):
+class Test_icmpv6_neighbor_solicit(unittest.TestCase):
type_ = 135
code = 0
csum = 0x952d
@@ -214,11 +214,9 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase):
def test_init(self):
nd = icmpv6.nd_neighbor(self.res, self.dst)
- eq_(nd.res >> 29, self.res)
+ eq_(nd.res, self.res)
eq_(nd.dst, self.dst)
- eq_(nd.type_, None)
- eq_(nd.length, None)
- eq_(nd.data, None)
+ eq_(nd.option, None)
def _test_parser(self, data=None):
buf = self.buf + str(data or '')
@@ -227,16 +225,16 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase):
eq_(msg.type_, self.type_)
eq_(msg.code, self.code)
eq_(msg.csum, self.csum)
- eq_(msg.data.res >> 29, self.res)
+ eq_(msg.data.res, self.res)
eq_(addrconv.ipv6.text_to_bin(msg.data.dst),
addrconv.ipv6.text_to_bin(self.dst))
eq_(n, None)
if data:
- nd = msg.data
- eq_(nd.type_, self.nd_type)
+ nd = msg.data.option
+ LOG.info(nd)
eq_(nd.length, self.nd_length)
- eq_(nd.data.hw_src, self.nd_hw_src)
- eq_(nd.data.data, None)
+ eq_(nd.hw_src, self.nd_hw_src)
+ eq_(nd.data, None)
def test_parser_without_data(self):
self._test_parser()
@@ -264,9 +262,8 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase):
eq_(data, '')
def test_serialize_with_data(self):
- nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
- nd = icmpv6.nd_neighbor(
- self.res, self.dst, self.nd_type, self.nd_length, nd_opt)
+ nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
+ nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
prev = ipv6(6, 0, 0, 32, 64, 255, self.src_ipv6, self.dst_ipv6)
nd_csum = icmpv6_csum(prev, self.buf + self.data)
@@ -276,7 +273,7 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase):
(type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
(res, dst) = struct.unpack_from(nd._PACK_STR, buf, icmp._MIN_LEN)
(nd_type, nd_length, nd_hw_src) = struct.unpack_from(
- '!BB6s', buf, icmp._MIN_LEN + nd._MIN_LEN)
+ nd_opt._PACK_STR, buf, icmp._MIN_LEN + nd._MIN_LEN)
data = buf[(icmp._MIN_LEN + nd._MIN_LEN + 8):]
eq_(type_, self.type_)
@@ -289,23 +286,21 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase):
eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))
def test_to_string(self):
- nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
- nd = icmpv6.nd_neighbor(
- self.res, self.dst, self.nd_type, self.nd_length, nd_opt)
+ nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
+ nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
ic = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)
- nd_opt_values = {'hw_src': self.nd_hw_src,
+ nd_opt_values = {'length': self.nd_length,
+ 'hw_src': self.nd_hw_src,
'data': None}
_nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k]))
for k, v in inspect.getmembers(nd_opt)
if k in nd_opt_values])
- nd_opt_str = '%s(%s)' % (icmpv6.nd_option_la.__name__, _nd_opt_str)
+ nd_opt_str = '%s(%s)' % (icmpv6.nd_option_sla.__name__, _nd_opt_str)
nd_values = {'res': repr(nd.res),
'dst': repr(self.dst),
- 'type_': repr(self.nd_type),
- 'length': repr(self.nd_length),
- 'data': nd_opt_str}
+ 'option': nd_opt_str}
_nd_str = ','.join(['%s=%s' % (k, nd_values[k])
for k, v in inspect.getmembers(nd)
if k in nd_values])
@@ -324,7 +319,7 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase):
eq_(repr(ic), ic_str)
-class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solict):
+class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solicit):
def setUp(self):
self.type_ = 136
self.csum = 0xb8ba
@@ -339,8 +334,65 @@ class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solict):
+ '\x3f\xfe\x05\x07\x00\x00\x00\x01' \
+ '\x02\x60\x97\xff\xfe\x07\x69\xea'
+ def test_serialize_with_data(self):
+ nd_opt = icmpv6.nd_option_tla(self.nd_length, self.nd_hw_src)
+ nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
+ prev = ipv6(6, 0, 0, 32, 64, 255, self.src_ipv6, self.dst_ipv6)
+ nd_csum = icmpv6_csum(prev, self.buf + self.data)
+
+ icmp = icmpv6.icmpv6(self.type_, self.code, 0, nd)
+ buf = buffer(icmp.serialize(bytearray(), prev))
+
+ (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
+ (res, dst) = struct.unpack_from(nd._PACK_STR, buf, icmp._MIN_LEN)
+ (nd_type, nd_length, nd_hw_src) = struct.unpack_from(
+ nd_opt._PACK_STR, buf, icmp._MIN_LEN + nd._MIN_LEN)
+ data = buf[(icmp._MIN_LEN + nd._MIN_LEN + 8):]
+
+ eq_(type_, self.type_)
+ eq_(code, self.code)
+ eq_(csum, nd_csum)
+ eq_(res >> 29, self.res)
+ eq_(dst, addrconv.ipv6.text_to_bin(self.dst))
+ eq_(nd_type, self.nd_type)
+ eq_(nd_length, self.nd_length)
+ eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))
+
+ def test_to_string(self):
+ nd_opt = icmpv6.nd_option_tla(self.nd_length, self.nd_hw_src)
+ nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
+ ic = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)
+
+ nd_opt_values = {'length': self.nd_length,
+ 'hw_src': self.nd_hw_src,
+ 'data': None}
+ _nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k]))
+ for k, v in inspect.getmembers(nd_opt)
+ if k in nd_opt_values])
+ nd_opt_str = '%s(%s)' % (icmpv6.nd_option_tla.__name__, _nd_opt_str)
+
+ nd_values = {'res': repr(nd.res),
+ 'dst': repr(self.dst),
+ 'option': nd_opt_str}
+ _nd_str = ','.join(['%s=%s' % (k, nd_values[k])
+ for k, v in inspect.getmembers(nd)
+ if k in nd_values])
+ nd_str = '%s(%s)' % (icmpv6.nd_neighbor.__name__, _nd_str)
+
+ icmp_values = {'type_': repr(self.type_),
+ 'code': repr(self.code),
+ 'csum': repr(self.csum),
+ 'data': nd_str}
+ _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
+ for k, v in inspect.getmembers(ic)
+ if k in icmp_values])
+ ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
+
+ eq_(str(ic), ic_str)
+ eq_(repr(ic), ic_str)
+
-class Test_icmpv6_router_solict(unittest.TestCase):
+class Test_icmpv6_router_solicit(unittest.TestCase):
type_ = 133
code = 0
csum = 0x97d9
@@ -362,9 +414,7 @@ class Test_icmpv6_router_solict(unittest.TestCase):
def test_init(self):
rs = icmpv6.nd_router_solicit(self.res)
eq_(rs.res, self.res)
- eq_(rs.type_, None)
- eq_(rs.length, None)
- eq_(rs.data, None)
+ eq_(rs.option, None)
def _test_parser(self, data=None):
buf = self.buf + str(data or '')
@@ -374,14 +424,13 @@ class Test_icmpv6_router_solict(unittest.TestCase):
eq_(msg.code, self.code)
eq_(msg.csum, self.csum)
if data is not None:
- eq_(msg.data.res[0], self.res)
+ eq_(msg.data.res, self.res)
eq_(n, None)
if data:
- rs = msg.data
- eq_(rs.type_, self.nd_type)
+ rs = msg.data.option
eq_(rs.length, self.nd_length)
- eq_(rs.data.hw_src, self.nd_hw_src)
- eq_(rs.data.data, None)
+ eq_(rs.hw_src, self.nd_hw_src)
+ eq_(rs.data, None)
def test_parser_without_data(self):
self._test_parser()
@@ -408,9 +457,8 @@ class Test_icmpv6_router_solict(unittest.TestCase):
eq_(data, '')
def test_serialize_with_data(self):
- nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
- rs = icmpv6.nd_router_solicit(self.res, self.nd_type, self.nd_length,
- nd_opt)
+ nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
+ rs = icmpv6.nd_router_solicit(self.res, nd_opt)
prev = ipv6(6, 0, 0, 16, 64, 255, self.src_ipv6, self.dst_ipv6)
rs_csum = icmpv6_csum(prev, self.buf + self.data)
@@ -420,7 +468,7 @@ class Test_icmpv6_router_solict(unittest.TestCase):
(type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
res = struct.unpack_from(rs._PACK_STR, buf, icmp._MIN_LEN)
(nd_type, nd_length, nd_hw_src) = struct.unpack_from(
- '!BB6s', buf, icmp._MIN_LEN + rs._MIN_LEN)
+ nd_opt._PACK_STR, buf, icmp._MIN_LEN + rs._MIN_LEN)
data = buf[(icmp._MIN_LEN + rs._MIN_LEN + 8):]
eq_(type_, self.type_)
@@ -432,22 +480,20 @@ class Test_icmpv6_router_solict(unittest.TestCase):
eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))
def test_to_string(self):
- nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
- rs = icmpv6.nd_router_solicit(
- self.res, self.nd_type, self.nd_length, nd_opt)
+ nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
+ rs = icmpv6.nd_router_solicit(self.res, nd_opt)
ic = icmpv6.icmpv6(self.type_, self.code, self.csum, rs)
- nd_opt_values = {'hw_src': self.nd_hw_src,
+ nd_opt_values = {'length': self.nd_length,
+ 'hw_src': self.nd_hw_src,
'data': None}
_nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k]))
for k, v in inspect.getmembers(nd_opt)
if k in nd_opt_values])
- nd_opt_str = '%s(%s)' % (icmpv6.nd_option_la.__name__, _nd_opt_str)
+ nd_opt_str = '%s(%s)' % (icmpv6.nd_option_sla.__name__, _nd_opt_str)
rs_values = {'res': repr(rs.res),
- 'type_': repr(self.nd_type),
- 'length': repr(self.nd_length),
- 'data': nd_opt_str}
+ 'option': nd_opt_str}
_rs_str = ','.join(['%s=%s' % (k, rs_values[k])
for k, v in inspect.getmembers(rs)
if k in rs_values])
--
1.7.10.4
------------------------------------------------------------------------------
November Webinars for C, C++, Fortran Developers
Accelerate application performance with scalable programming models. Explore
techniques for threading, error checking, porting, and tuning. Get the most
from the latest Intel processors and coprocessors. See abstracts and register
http://pubads.g.doubleclick.net/gampad/clk?id=60136231&iu=/4140/ostg.clktrk
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel