Signed-off-by: Yuichi Ito <[email protected]>
---
ryu/lib/packet/ipv6.py | 356 +++++++++++-
ryu/tests/unit/packet/test_ipv6.py | 1041 +++++++++++++++++++++++++++++++++++-
2 files changed, 1366 insertions(+), 31 deletions(-)
diff --git a/ryu/lib/packet/ipv6.py b/ryu/lib/packet/ipv6.py
index 325f834..de23589 100644
--- a/ryu/lib/packet/ipv6.py
+++ b/ryu/lib/packet/ipv6.py
@@ -13,14 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import abc
import struct
-import socket
-from . import packet_base
-from . import packet_utils
-from . import icmpv6
-from . import tcp
-from ryu.ofproto import inet
+
from ryu.lib import addrconv
+from ryu.lib import stringify
+from ryu.lib.packet import icmpv6
+from ryu.lib.packet import packet_base
+from ryu.lib.packet import tcp
+from ryu.ofproto import inet
IPV6_ADDRESS_PACK_STR = '!16s'
@@ -51,14 +52,23 @@ class ipv6(packet_base.PacketBase):
hop_limit Hop Limit
src Source Address 'ff02::1'
dst Destination Address '::'
+ ext_hdrs Extension Headers
============== ======================================== ==================
"""
_PACK_STR = '!IHBB16s16s'
_MIN_LEN = struct.calcsize(_PACK_STR)
+ _IPV6_EXT_HEADER_TYPE = {}
+
+    @staticmethod
+    def register_header_type(type_):
+        """Return a class decorator that registers an extension header.
+
+        The decorated class is stored in ``ipv6._IPV6_EXT_HEADER_TYPE``
+        under the key ``type_`` and is returned unchanged.  ``parser``
+        looks classes up in that table by the 'next header' value.
+        """
+        def _register_header_type(cls):
+            ipv6._IPV6_EXT_HEADER_TYPE[type_] = cls
+            return cls
+        return _register_header_type
def __init__(self, version, traffic_class, flow_label, payload_length,
- nxt, hop_limit, src, dst):
+ nxt, hop_limit, src, dst, ext_hdrs=[]):
super(ipv6, self).__init__()
self.version = version
self.traffic_class = traffic_class
@@ -68,20 +78,45 @@ class ipv6(packet_base.PacketBase):
self.hop_limit = hop_limit
self.src = src
self.dst = dst
+ if ext_hdrs:
+ assert isinstance(ext_hdrs, list)
+ last_hdr = None
+ for ext_hdr in ext_hdrs:
+ assert isinstance(ext_hdr, header)
+ if last_hdr:
+ ext_hdr.set_nxt(last_hdr.nxt)
+ last_hdr.nxt = ext_hdr.TYPE
+ else:
+ ext_hdr.set_nxt(self.nxt)
+ self.nxt = ext_hdr.TYPE
+ last_hdr = ext_hdr
+ self.ext_hdrs = ext_hdrs
@classmethod
def parser(cls, buf):
- (v_tc_flow, payload_length, nxt, hlim, src, dst) = struct.unpack_from(
- cls._PACK_STR, buf)
+ (v_tc_flow, payload_length, nxt, hop_limit, src,
+ dst) = struct.unpack_from(cls._PACK_STR, buf)
version = v_tc_flow >> 28
traffic_class = (v_tc_flow >> 20) & 0xff
flow_label = v_tc_flow & 0xfffff
- hop_limit = hlim
+ offset = cls._MIN_LEN
+ last = nxt
+ ext_hdrs = []
+ while True:
+ cls_ = cls._IPV6_EXT_HEADER_TYPE.get(last)
+ if not cls_:
+ break
+ hdr = cls_.parser(buf[offset:])
+ ext_hdrs.append(hdr)
+ offset += len(hdr)
+ last = hdr.nxt
+ # call ipv6.__init__() using 'nxt' of the last extension
+ # header that points the next protocol.
msg = cls(version, traffic_class, flow_label, payload_length,
- nxt, hop_limit, addrconv.ipv6.bin_to_text(src),
- addrconv.ipv6.bin_to_text(dst))
- return (msg, ipv6.get_packet_type(nxt),
- buf[cls._MIN_LEN:cls._MIN_LEN+payload_length])
+ last, hop_limit, addrconv.ipv6.bin_to_text(src),
+ addrconv.ipv6.bin_to_text(dst), ext_hdrs)
+ return (msg, ipv6.get_packet_type(last),
+ buf[offset:offset+payload_length])
     def serialize(self, payload, prev):
         hdr = bytearray(40)
@@ -91,7 +126,300 @@ class ipv6(packet_base.PacketBase):
                          self.payload_length, self.nxt, self.hop_limit,
                          addrconv.ipv6.text_to_bin(self.src),
                          addrconv.ipv6.text_to_bin(self.dst))
+        # Append each extension header, in list order, after the fixed
+        # header.
+        if self.ext_hdrs:
+            for ext_hdr in self.ext_hdrs:
+                hdr.extend(ext_hdr.serialize())
         return hdr
ipv6.register_packet_type(icmpv6.icmpv6, inet.IPPROTO_ICMPV6)
ipv6.register_packet_type(tcp.tcp, inet.IPPROTO_TCP)
+
+
+class header(stringify.StringifyMixin):
+    """Abstract base class for IPv6 extension headers.
+
+    Subclasses provide ``parser``, ``serialize`` and ``__len__``.  The
+    ``nxt`` attribute holds the 'next header' value; it starts as None
+    and is filled in through ``set_nxt``.
+    """
+
+    # Python 2 style metaclass declaration.
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self):
+        self.nxt = None
+
+    def set_nxt(self, nxt):
+        """Store the 'next header' value of this extension header."""
+        self.nxt = nxt
+
+    @classmethod
+    @abc.abstractmethod
+    def parser(cls, buf):
+        """Decode an instance from ``buf``; implemented by subclasses."""
+        pass
+
+    @abc.abstractmethod
+    def serialize(self):
+        """Encode this header; implemented by subclasses."""
+        pass
+
+    @abc.abstractmethod
+    def __len__(self):
+        """Return the encoded length; implemented by subclasses."""
+        pass
+
+
+@ipv6.register_header_type(inet.IPPROTO_HOPOPTS)
+class hop_opts(header):
+    """IPv6 (RFC 2460) Hop-by-Hop Options header encoder/decoder class.
+
+    This is used with ryu.lib.packet.ipv6.ipv6.
+
+    An instance has the following attributes at least.
+    Most of them are same to the on-wire counterparts but in host byte order.
+    __init__ takes the corresponding args in this order.
+
+    .. tabularcolumns:: |l|L|
+
+    ============== =======================================
+    Attribute      Description
+    ============== =======================================
+    size           the length of the Hop-by-Hop Options header
+                   in octets, not including the first 8 octets.
+                   Must be a multiple of 8; it is carried on
+                   the wire in 8-octet units.
+    data           IPv6 options.
+    ============== =======================================
+    """
+    TYPE = inet.IPPROTO_HOPOPTS
+
+    _PACK_STR = '!BB'
+    _MIN_LEN = struct.calcsize(_PACK_STR)
+    _FIX_SIZE = 8
+
+    def __init__(self, size, data):
+        super(hop_opts, self).__init__()
+        assert not (size % 8)
+        self.size = size
+        self.data = data
+
+    @classmethod
+    def parser(cls, buf):
+        """Decode the header and its options from the start of ``buf``."""
+        (nxt, len_) = struct.unpack_from(cls._PACK_STR, buf)
+        # Hdr Ext Len is expressed in 8-octet units beyond the first
+        # 8 octets; ``size`` is kept in octets.
+        size = int(len_) * 8
+        end = cls._FIX_SIZE + size
+        data = []
+        pos = cls._MIN_LEN
+        while pos < end:
+            (type_, ) = struct.unpack_from('!B', buf[pos:])
+            if type_ == 0:
+                # Option type 0 is a single octet with no length field.
+                opt = option(type_, -1, None)
+                pos += 1
+            else:
+                opt = option.parser(buf[pos:])
+                pos += len(opt)
+            data.append(opt)
+        ret = cls(size, data)
+        ret.set_nxt(nxt)
+        return ret
+
+    def serialize(self):
+        """Encode the two fixed octets followed by every option."""
+        # Convert the octet count back to 8-octet units for the wire.
+        buf = struct.pack(self._PACK_STR, self.nxt, self.size // 8)
+        buf = bytearray(buf)
+        for opt in self.data:
+            buf.extend(opt.serialize())
+        return buf
+
+    def __len__(self):
+        return self._FIX_SIZE + self.size
+
+
+@ipv6.register_header_type(inet.IPPROTO_DSTOPTS)
+class dst_opts(hop_opts):
+    """IPv6 (RFC 2460) destination header encoder/decoder class.
+
+    This is used with ryu.lib.packet.ipv6.ipv6.
+
+    An instance has the following attributes at least.
+    Most of them are same to the on-wire counterparts but in host byte order.
+    __init__ takes the corresponding args in this order.
+
+    .. tabularcolumns:: |l|L|
+
+    ============== =======================================
+    Attribute      Description
+    ============== =======================================
+    size           the length of the destination header,
+                   not including the first 8 octets.
+    data           IPv6 options.
+    ============== =======================================
+    """
+    TYPE = inet.IPPROTO_DSTOPTS
+
+    def __init__(self, size, data):
+        # Parsing, serializing and length are inherited from hop_opts;
+        # only TYPE differs.
+        super(dst_opts, self).__init__(size, data)
+
+
+class option(stringify.StringifyMixin):
+    """IPv6 (RFC 2460) Options header encoder/decoder class.
+
+    This is used with ryu.lib.packet.ipv6.hop_opts or
+    ryu.lib.packet.ipv6.dst_opts.
+
+    An instance has the following attributes at least.
+    Most of them are same to the on-wire counterparts but in host byte order.
+    __init__ takes the corresponding args in this order.
+
+    .. tabularcolumns:: |l|L|
+
+    ============== =======================================
+    Attribute      Description
+    ============== =======================================
+    type\_          option type.
+    len\_           the length of data. -1 if type\_ is 0.
+    data           an option value. None if len\_ is 0 or -1.
+    ============== =======================================
+    """
+
+    _PACK_STR = '!BB'
+    _MIN_LEN = struct.calcsize(_PACK_STR)
+
+    def __init__(self, type_, len_, data):
+        self.type_ = type_
+        self.len_ = len_
+        self.data = data
+
+    @classmethod
+    def parser(cls, buf):
+        """Decode one option from the start of ``buf``."""
+        (type_, ) = struct.unpack_from('!B', buf)
+        if not type_:
+            # Type 0 has no length octet; it is represented with len_ -1.
+            cls_ = cls(type_, -1, None)
+        else:
+            data = None
+            (type_, len_) = struct.unpack_from(cls._PACK_STR, buf)
+            if len_:
+                form = "%ds" % len_
+                (data, ) = struct.unpack_from(form, buf, cls._MIN_LEN)
+            cls_ = cls(type_, len_, data)
+        return cls_
+
+    def serialize(self):
+        """Encode the option: type only, type+len, or type+len+data."""
+        data = None
+        if not self.type_:
+            data = struct.pack('!B', self.type_)
+        elif not self.len_:
+            data = struct.pack(self._PACK_STR, self.type_, self.len_)
+        else:
+            form = "%ds" % self.len_
+            data = struct.pack(self._PACK_STR + form, self.type_,
+                               self.len_, self.data)
+        return data
+
+    def __len__(self):
+        # With len_ == -1 (type 0) this evaluates to a single octet.
+        return self._MIN_LEN + self.len_
+
+
+class routing(header):
+    """
+    TODO: implement this class.
+    """
+    pass
+
+
+@ipv6.register_header_type(inet.IPPROTO_FRAGMENT)
+class fragment(header):
+    """IPv6 (RFC 2460) fragment header encoder/decoder class.
+
+    This is used with ryu.lib.packet.ipv6.ipv6.
+
+    An instance has the following attributes at least.
+    Most of them are same to the on-wire counterparts but in host byte order.
+    __init__ takes the corresponding args in this order.
+
+    .. tabularcolumns:: |l|L|
+
+    ============== =======================================
+    Attribute      Description
+    ============== =======================================
+    offset         offset, in 8-octet units, relative to
+                   the start of the fragmentable part of
+                   the original packet.
+    more           1 means more fragments follow;
+                   0 means last fragment.
+    id\_            packet identification value.
+    ============== =======================================
+    """
+    TYPE = inet.IPPROTO_FRAGMENT
+
+    _PACK_STR = '!BxHI'
+    _MIN_LEN = struct.calcsize(_PACK_STR)
+
+    def __init__(self, offset, more, id_):
+        super(fragment, self).__init__()
+        self.offset = offset
+        self.more = more
+        self.id_ = id_
+
+    @classmethod
+    def parser(cls, buf):
+        """Decode a fragment header from the start of ``buf``."""
+        (nxt, off_m, id_) = struct.unpack_from(cls._PACK_STR, buf)
+        # The 16-bit field packs the offset in the upper 13 bits and
+        # the 'more fragments' flag in the lowest bit.
+        offset = off_m >> 3
+        more = off_m & 0x1
+        ret = cls(offset, more, id_)
+        ret.set_nxt(nxt)
+        return ret
+
+    def serialize(self):
+        """Encode this header using ``_PACK_STR``."""
+        off_m = (self.offset << 3 | self.more)
+        buf = struct.pack(self._PACK_STR, self.nxt, off_m, self.id_)
+        return buf
+
+    def __len__(self):
+        return self._MIN_LEN
+
+
+@ipv6.register_header_type(inet.IPPROTO_AH)
+class auth(header):
+    """IP Authentication header (RFC 2402) encoder/decoder class.
+
+    This is used with ryu.lib.packet.ipv6.ipv6.
+
+    An instance has the following attributes at least.
+    Most of them are same to the on-wire counterparts but in host byte order.
+    __init__ takes the corresponding args in this order.
+
+    .. tabularcolumns:: |l|L|
+
+    ============== =======================================
+    Attribute      Description
+    ============== =======================================
+    size           the length of the Authentication Header
+                   in 32-bit words, minus 2.
+    spi            security parameters index.
+    seq            sequence number.
+    data           authentication data.
+    ============== =======================================
+    """
+    TYPE = inet.IPPROTO_AH
+
+    _PACK_STR = '!BB2xII'
+    _MIN_LEN = struct.calcsize(_PACK_STR)
+
+    def __init__(self, size, spi, seq, data):
+        super(auth, self).__init__()
+        self.size = size
+        self.spi = spi
+        self.seq = seq
+        self.data = data
+
+    @classmethod
+    def _get_size(cls, size):
+        """Return the whole header length in octets for a 'size' value."""
+        # The length field counts 32-bit words, minus 2.
+        return (int(size) + 2) * 4
+
+    @classmethod
+    def parser(cls, buf):
+        """Decode an authentication header from the start of ``buf``."""
+        (nxt, size, spi, seq) = struct.unpack_from(cls._PACK_STR, buf)
+        # Whatever follows the fixed fields, up to the declared header
+        # length, is the authentication data.
+        form = "%ds" % (cls._get_size(size) - cls._MIN_LEN)
+        (data, ) = struct.unpack_from(form, buf, cls._MIN_LEN)
+        ret = cls(size, spi, seq, data)
+        ret.set_nxt(nxt)
+        return ret
+
+    def serialize(self):
+        """Encode the fixed fields followed by the authentication data."""
+        buf = struct.pack(self._PACK_STR, self.nxt, self.size, self.spi,
+                          self.seq)
+        buf = bytearray(buf)
+        form = "%ds" % (auth._get_size(self.size) - self._MIN_LEN)
+        buf.extend(struct.pack(form, self.data))
+        return buf
+
+    def __len__(self):
+        return auth._get_size(self.size)
diff --git a/ryu/tests/unit/packet/test_ipv6.py
b/ryu/tests/unit/packet/test_ipv6.py
index 5454fbf..7a32737 100644
--- a/ryu/tests/unit/packet/test_ipv6.py
+++ b/ryu/tests/unit/packet/test_ipv6.py
@@ -17,10 +17,10 @@
import unittest
import logging
import inspect
+import struct
-from nose.tools import *
-from nose.plugins.skip import Skip, SkipTest
-from ryu.lib import ip
+from nose.tools import eq_, raises
+from ryu.lib import addrconv
from ryu.lib.packet import ipv6
@@ -29,37 +29,1044 @@ LOG = logging.getLogger(__name__)
class Test_ipv6(unittest.TestCase):
- version = 6
- traffic_class = 0
- flow_label = 0
- payload_length = 817
- nxt = 6
- hop_limit = 128
- src = ip.ipv6_to_bin('2002:4637:d5d3::4637:d5d3')
- dst = ip.ipv6_to_bin('2001:4860:0:2001::68')
+ def setUp(self):
+ self.version = 6
+ self.traffic_class = 0
+ self.flow_label = 0
+ self.payload_length = 817
+ self.nxt = 6
+ self.hop_limit = 128
+ self.src = '2002:4637:d5d3::4637:d5d3'
+ self.dst = '2001:4860:0:2001::68'
+ self.ext_hdrs = []
+ self.ip = ipv6.ipv6(
+ self.version, self.traffic_class, self.flow_label,
+ self.payload_length, self.nxt, self.hop_limit, self.src,
+ self.dst, self.ext_hdrs)
+
+ self.v_tc_flow = (
+ self.version << 28 | self.traffic_class << 20 |
+ self.flow_label << 12)
+ self.form = '!IHBB16s16s'
+ self.buf = struct.pack(
+ self.form, self.v_tc_flow,
+ self.payload_length, self.nxt, self.hop_limit,
+ addrconv.ipv6.text_to_bin(self.src),
+ addrconv.ipv6.text_to_bin(self.dst))
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.version, self.ip.version)
+ eq_(self.traffic_class, self.ip.traffic_class)
+ eq_(self.flow_label, self.ip.flow_label)
+ eq_(self.payload_length, self.ip.payload_length)
+ eq_(self.nxt, self.ip.nxt)
+ eq_(self.hop_limit, self.ip.hop_limit)
+ eq_(self.src, self.ip.src)
+ eq_(self.dst, self.ip.dst)
+ eq_(str(self.ext_hdrs), str(self.ip.ext_hdrs))
+
+ def test_parser(self):
+ _res = self.ip.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
- ip = ipv6.ipv6(version, traffic_class, flow_label, payload_length,
- nxt, hop_limit, src, dst)
+ eq_(self.version, res.version)
+ eq_(self.traffic_class, res.traffic_class)
+ eq_(self.flow_label, res.flow_label)
+ eq_(self.payload_length, res.payload_length)
+ eq_(self.nxt, res.nxt)
+ eq_(self.hop_limit, res.hop_limit)
+ eq_(self.src, res.src)
+ eq_(self.dst, res.dst)
+ eq_(str(self.ext_hdrs), str(res.ext_hdrs))
+
+ def test_serialize(self):
+ data = bytearray()
+ prev = None
+ buf = self.ip.serialize(data, prev)
+
+ res = struct.unpack_from(self.form, str(buf))
+
+ eq_(self.v_tc_flow, res[0])
+ eq_(self.payload_length, res[1])
+ eq_(self.nxt, res[2])
+ eq_(self.hop_limit, res[3])
+ eq_(self.src, addrconv.ipv6.bin_to_text(res[4]))
+ eq_(self.dst, addrconv.ipv6.bin_to_text(res[5]))
+
+ def test_to_string(self):
+ ipv6_values = {'version': self.version,
+ 'traffic_class': self.traffic_class,
+ 'flow_label': self.flow_label,
+ 'payload_length': self.payload_length,
+ 'nxt': self.nxt,
+ 'hop_limit': self.hop_limit,
+ 'src': repr(self.src),
+ 'dst': repr(self.dst),
+ 'ext_hdrs': self.ext_hdrs}
+ _ipv6_str = ','.join(['%s=%s' % (k, ipv6_values[k])
+ for k, v in inspect.getmembers(self.ip)
+ if k in ipv6_values])
+ ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+ eq_(str(self.ip), ipv6_str)
+ eq_(repr(self.ip), ipv6_str)
+
+
+class Test_ipv6_with_hopopts(unittest.TestCase):
def setUp(self):
+ self.opt1_type = 5
+ self.opt1_len = 2
+ self.opt1_data = '\x00\x00'
+ self.opt2_type = 1
+ self.opt2_len = 0
+ self.opt2_data = None
+ self.options = [
+ ipv6.option(self.opt1_type, self.opt1_len, self.opt1_data),
+ ipv6.option(self.opt2_type, self.opt2_len, self.opt2_data),
+ ]
+ self.hop_opts_size = 0
+ self.hop_opts = ipv6.hop_opts(self.hop_opts_size, self.options)
+
+ self.version = 6
+ self.traffic_class = 0
+ self.flow_label = 0
+ self.payload_length = 817 + len(self.hop_opts)
+ self.nxt = 6
+ self.hop_limit = 128
+ self.src = '2002:4637:d5d3::4637:d5d3'
+ self.dst = '2001:4860:0:2001::68'
+ self.ext_hdrs = [self.hop_opts]
+
+ self.ip = ipv6.ipv6(
+ self.version, self.traffic_class, self.flow_label,
+ self.payload_length, self.nxt, self.hop_limit, self.src,
+ self.dst, self.ext_hdrs)
+
+ self.v_tc_flow = (
+ self.version << 28 | self.traffic_class << 20 |
+ self.flow_label << 12)
+ self.form = '!IHBB16s16sBBBB2sBB'
+ self.buf = struct.pack(
+ self.form, self.v_tc_flow,
+ self.payload_length, self.hop_opts.TYPE, self.hop_limit,
+ addrconv.ipv6.text_to_bin(self.src),
+ addrconv.ipv6.text_to_bin(self.dst),
+ self.nxt, self.hop_opts_size, self.opt1_type, self.opt1_len,
+ self.opt1_data, self.opt2_type, self.opt2_len)
+
+ def tearDown(self):
pass
+ def test_init(self):
+ eq_(self.version, self.ip.version)
+ eq_(self.traffic_class, self.ip.traffic_class)
+ eq_(self.flow_label, self.ip.flow_label)
+ eq_(self.payload_length, self.ip.payload_length)
+ eq_(self.hop_opts.TYPE, self.ip.nxt)
+ eq_(self.hop_limit, self.ip.hop_limit)
+ eq_(self.src, self.ip.src)
+ eq_(self.dst, self.ip.dst)
+ eq_(str(self.ext_hdrs), str(self.ip.ext_hdrs))
+
+ def test_parser(self):
+ _res = self.ip.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+
+ eq_(self.version, res.version)
+ eq_(self.traffic_class, res.traffic_class)
+ eq_(self.flow_label, res.flow_label)
+ eq_(self.payload_length, res.payload_length)
+ eq_(self.hop_opts.TYPE, res.nxt)
+ eq_(self.hop_limit, res.hop_limit)
+ eq_(self.src, res.src)
+ eq_(self.dst, res.dst)
+ eq_(str(self.ext_hdrs), str(res.ext_hdrs))
+
+ def test_serialize(self):
+ data = bytearray()
+ prev = None
+ buf = self.ip.serialize(data, prev)
+
+ res = struct.unpack_from(self.form, str(buf))
+
+ eq_(self.v_tc_flow, res[0])
+ eq_(self.payload_length, res[1])
+ eq_(self.hop_opts.TYPE, res[2])
+ eq_(self.hop_limit, res[3])
+ eq_(self.src, addrconv.ipv6.bin_to_text(res[4]))
+ eq_(self.dst, addrconv.ipv6.bin_to_text(res[5]))
+ eq_(self.nxt, res[6])
+ eq_(self.hop_opts_size, res[7])
+ eq_(self.opt1_type, res[8])
+ eq_(self.opt1_len, res[9])
+ eq_(self.opt1_data, res[10])
+ eq_(self.opt2_type, res[11])
+ eq_(self.opt2_len, res[12])
+
+ def test_to_string(self):
+ option_values = ['type_', 'len_', 'data']
+ opt_str_list = []
+ for option in self.options:
+ _opt_str = ','.join(['%s=%s' % (k, repr(getattr(option, k)))
+ for k, v in inspect.getmembers(option)
+ if k in option_values])
+ opt_str = '%s(%s)' % (ipv6.option.__name__, _opt_str)
+ opt_str_list.append(opt_str)
+ option_str = '[%s]' % ', '.join(opt_str_list)
+ hop_opts_values = {'nxt': self.nxt,
+ 'size': self.hop_opts_size,
+ 'data': option_str}
+ _hop_str = ','.join(['%s=%s' % (k, hop_opts_values[k])
+ for k, v in inspect.getmembers(self.hop_opts)
+ if k in hop_opts_values])
+ hop_str = '%s(%s)' % (ipv6.hop_opts.__name__, _hop_str)
+ head_str = '[%s]' % hop_str
+ ipv6_values = {'version': self.version,
+ 'traffic_class': self.traffic_class,
+ 'flow_label': self.flow_label,
+ 'payload_length': self.payload_length,
+ 'nxt': self.hop_opts.TYPE,
+ 'hop_limit': self.hop_limit,
+ 'src': repr(self.src),
+ 'dst': repr(self.dst),
+ 'ext_hdrs': head_str}
+ _ipv6_str = ','.join(['%s=%s' % (k, ipv6_values[k])
+ for k, v in inspect.getmembers(self.ip)
+ if k in ipv6_values])
+ ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+ eq_(str(self.ip), ipv6_str)
+ eq_(repr(self.ip), ipv6_str)
+
+
+class Test_ipv6_with_dstopts(unittest.TestCase):
+
+ def setUp(self):
+ self.opt1_type = 5
+ self.opt1_len = 2
+ self.opt1_data = '\x00\x00'
+ self.opt2_type = 1
+ self.opt2_len = 0
+ self.opt2_data = None
+ self.options = [
+ ipv6.option(self.opt1_type, self.opt1_len, self.opt1_data),
+ ipv6.option(self.opt2_type, self.opt2_len, self.opt2_data),
+ ]
+ self.dst_opts_size = 0
+ self.dst_opts = ipv6.dst_opts(self.dst_opts_size, self.options)
+
+ self.version = 6
+ self.traffic_class = 0
+ self.flow_label = 0
+ self.payload_length = 817 + len(self.dst_opts)
+ self.nxt = 6
+ self.hop_limit = 128
+ self.src = '2002:4637:d5d3::4637:d5d3'
+ self.dst = '2001:4860:0:2001::68'
+ self.ext_hdrs = [self.dst_opts]
+
+ self.ip = ipv6.ipv6(
+ self.version, self.traffic_class, self.flow_label,
+ self.payload_length, self.nxt, self.hop_limit, self.src,
+ self.dst, self.ext_hdrs)
+
+ self.v_tc_flow = (
+ self.version << 28 | self.traffic_class << 20 |
+ self.flow_label << 12)
+ self.form = '!IHBB16s16sBBBB2sBB'
+ self.buf = struct.pack(
+ self.form, self.v_tc_flow,
+ self.payload_length, self.dst_opts.TYPE, self.hop_limit,
+ addrconv.ipv6.text_to_bin(self.src),
+ addrconv.ipv6.text_to_bin(self.dst),
+ self.nxt, self.dst_opts_size, self.opt1_type, self.opt1_len,
+ self.opt1_data, self.opt2_type, self.opt2_len)
+
def tearDown(self):
pass
+ def test_init(self):
+ eq_(self.version, self.ip.version)
+ eq_(self.traffic_class, self.ip.traffic_class)
+ eq_(self.flow_label, self.ip.flow_label)
+ eq_(self.payload_length, self.ip.payload_length)
+ eq_(self.dst_opts.TYPE, self.ip.nxt)
+ eq_(self.hop_limit, self.ip.hop_limit)
+ eq_(self.src, self.ip.src)
+ eq_(self.dst, self.ip.dst)
+ eq_(str(self.ext_hdrs), str(self.ip.ext_hdrs))
+
+ def test_parser(self):
+ _res = self.ip.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+
+ eq_(self.version, res.version)
+ eq_(self.traffic_class, res.traffic_class)
+ eq_(self.flow_label, res.flow_label)
+ eq_(self.payload_length, res.payload_length)
+ eq_(self.dst_opts.TYPE, res.nxt)
+ eq_(self.hop_limit, res.hop_limit)
+ eq_(self.src, res.src)
+ eq_(self.dst, res.dst)
+ eq_(str(self.ext_hdrs), str(res.ext_hdrs))
+
+ def test_serialize(self):
+ data = bytearray()
+ prev = None
+ buf = self.ip.serialize(data, prev)
+
+ res = struct.unpack_from(self.form, str(buf))
+
+ eq_(self.v_tc_flow, res[0])
+ eq_(self.payload_length, res[1])
+ eq_(self.dst_opts.TYPE, res[2])
+ eq_(self.hop_limit, res[3])
+ eq_(self.src, addrconv.ipv6.bin_to_text(res[4]))
+ eq_(self.dst, addrconv.ipv6.bin_to_text(res[5]))
+ eq_(self.nxt, res[6])
+ eq_(self.dst_opts_size, res[7])
+ eq_(self.opt1_type, res[8])
+ eq_(self.opt1_len, res[9])
+ eq_(self.opt1_data, res[10])
+ eq_(self.opt2_type, res[11])
+ eq_(self.opt2_len, res[12])
+
def test_to_string(self):
+ option_values = ['type_', 'len_', 'data']
+ opt_str_list = []
+ for option in self.options:
+ _opt_str = ','.join(['%s=%s' % (k, repr(getattr(option, k)))
+ for k, v in inspect.getmembers(option)
+ if k in option_values])
+ opt_str = '%s(%s)' % (ipv6.option.__name__, _opt_str)
+ opt_str_list.append(opt_str)
+ option_str = '[%s]' % ', '.join(opt_str_list)
+ dst_opts_values = {'nxt': self.nxt,
+ 'size': self.dst_opts_size,
+ 'data': option_str}
+ _dst_str = ','.join(['%s=%s' % (k, dst_opts_values[k])
+ for k, v in inspect.getmembers(self.dst_opts)
+ if k in dst_opts_values])
+ dst_str = '%s(%s)' % (ipv6.dst_opts.__name__, _dst_str)
+ head_str = '[%s]' % dst_str
ipv6_values = {'version': self.version,
'traffic_class': self.traffic_class,
'flow_label': self.flow_label,
'payload_length': self.payload_length,
- 'nxt': self.nxt,
+ 'nxt': self.dst_opts.TYPE,
+ 'hop_limit': self.hop_limit,
+ 'src': repr(self.src),
+ 'dst': repr(self.dst),
+ 'ext_hdrs': head_str}
+ _ipv6_str = ','.join(['%s=%s' % (k, ipv6_values[k])
+ for k, v in inspect.getmembers(self.ip)
+ if k in ipv6_values])
+ ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+ eq_(str(self.ip), ipv6_str)
+ eq_(repr(self.ip), ipv6_str)
+
+
+class Test_ipv6_with_fragment(unittest.TestCase):
+
+    def setUp(self):
+        # Fragment extension header under test.
+        self.fragment_offset = 50
+        self.fragment_more = 1
+        self.fragment_id = 123
+        self.fragment = ipv6.fragment(
+            self.fragment_offset, self.fragment_more, self.fragment_id)
+
+        self.version = 6
+        self.traffic_class = 0
+        self.flow_label = 0
+        self.payload_length = 817 + len(self.fragment)
+        self.nxt = 6
+        self.hop_limit = 128
+        self.src = '2002:4637:d5d3::4637:d5d3'
+        self.dst = '2001:4860:0:2001::68'
+        self.ext_hdrs = [self.fragment]
+
+        self.ip = ipv6.ipv6(
+            self.version, self.traffic_class, self.flow_label,
+            self.payload_length, self.nxt, self.hop_limit, self.src,
+            self.dst, self.ext_hdrs)
+
+        # Expected wire image: fixed header followed by the fragment
+        # header, whose first octet carries the upper-layer protocol.
+        self.v_tc_flow = (
+            self.version << 28 | self.traffic_class << 20 |
+            self.flow_label << 12)
+        self.fragment_off_m = (
+            self.fragment_offset << 3 | self.fragment_more)
+        self.form = '!IHBB16s16sBxHI'
+        self.buf = struct.pack(
+            self.form, self.v_tc_flow,
+            self.payload_length, self.fragment.TYPE, self.hop_limit,
+            addrconv.ipv6.text_to_bin(self.src),
+            addrconv.ipv6.text_to_bin(self.dst),
+            self.nxt, self.fragment_off_m, self.fragment_id)
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.version, self.ip.version)
+ eq_(self.traffic_class, self.ip.traffic_class)
+ eq_(self.flow_label, self.ip.flow_label)
+ eq_(self.payload_length, self.ip.payload_length)
+ eq_(self.fragment.TYPE, self.ip.nxt)
+ eq_(self.hop_limit, self.ip.hop_limit)
+ eq_(self.src, self.ip.src)
+ eq_(self.dst, self.ip.dst)
+ eq_(str(self.ext_hdrs), str(self.ip.ext_hdrs))
+
+ def test_parser(self):
+ _res = self.ip.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+
+ eq_(self.version, res.version)
+ eq_(self.traffic_class, res.traffic_class)
+ eq_(self.flow_label, res.flow_label)
+ eq_(self.payload_length, res.payload_length)
+ eq_(self.fragment.TYPE, res.nxt)
+ eq_(self.hop_limit, res.hop_limit)
+ eq_(self.src, res.src)
+ eq_(self.dst, res.dst)
+ eq_(str(self.ext_hdrs), str(res.ext_hdrs))
+
+ def test_serialize(self):
+ data = bytearray()
+ prev = None
+ buf = self.ip.serialize(data, prev)
+
+ res = struct.unpack_from(self.form, str(buf))
+
+ eq_(self.v_tc_flow, res[0])
+ eq_(self.payload_length, res[1])
+ eq_(self.fragment.TYPE, res[2])
+ eq_(self.hop_limit, res[3])
+ eq_(self.src, addrconv.ipv6.bin_to_text(res[4]))
+ eq_(self.dst, addrconv.ipv6.bin_to_text(res[5]))
+ eq_(self.nxt, res[6])
+ eq_(self.fragment_off_m, res[7])
+ eq_(self.fragment_id, res[8])
+
+ def test_to_string(self):
+ frag_values = {'nxt': self.nxt,
+ 'offset': self.fragment_offset,
+ 'more': self.fragment_more,
+ 'id_': self.fragment_id}
+ _frag_str = ','.join(['%s=%s' % (k, repr(frag_values[k]))
+ for k, v in inspect.getmembers(self.fragment)
+ if k in frag_values])
+ frag_str = '%s(%s)' % (ipv6.fragment.__name__, _frag_str)
+ head_str = '[%s]' % frag_str
+ ipv6_values = {'version': self.version,
+ 'traffic_class': self.traffic_class,
+ 'flow_label': self.flow_label,
+ 'payload_length': self.payload_length,
+ 'nxt': self.fragment.TYPE,
+ 'hop_limit': self.hop_limit,
+ 'src': repr(self.src),
+ 'dst': repr(self.dst),
+ 'ext_hdrs': head_str}
+ _ipv6_str = ','.join(['%s=%s' % (k, ipv6_values[k])
+ for k, v in inspect.getmembers(self.ip)
+ if k in ipv6_values])
+ ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+ eq_(str(self.ip), ipv6_str)
+ eq_(repr(self.ip), ipv6_str)
+
+
+class Test_ipv6_with_auth(unittest.TestCase):
+
+ def setUp(self):
+ self.auth_size = 4
+ self.auth_spi = 256
+ self.auth_seq = 1
+ self.auth_data = '\xa0\xe7\xf8\xab\xf9\x69\x1a\x8b\xf3\x9f\x7c\xae'
+ self.auth = ipv6.auth(
+ self.auth_size, self.auth_spi, self.auth_seq, self.auth_data)
+
+ self.version = 6
+ self.traffic_class = 0
+ self.flow_label = 0
+ self.payload_length = 817 + len(self.auth)
+ self.nxt = 6
+ self.hop_limit = 128
+ self.src = '2002:4637:d5d3::4637:d5d3'
+ self.dst = '2001:4860:0:2001::68'
+ self.ext_hdrs = [self.auth]
+
+ self.ip = ipv6.ipv6(
+ self.version, self.traffic_class, self.flow_label,
+ self.payload_length, self.nxt, self.hop_limit, self.src,
+ self.dst, self.ext_hdrs)
+
+ self.v_tc_flow = (
+ self.version << 28 | self.traffic_class << 20 |
+ self.flow_label << 12)
+ self.form = '!IHBB16s16sBB2xII12s'
+ self.buf = struct.pack(
+ self.form, self.v_tc_flow,
+ self.payload_length, self.auth.TYPE, self.hop_limit,
+ addrconv.ipv6.text_to_bin(self.src),
+ addrconv.ipv6.text_to_bin(self.dst),
+ self.nxt, self.auth_size, self.auth_spi, self.auth_seq,
+ self.auth_data)
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.version, self.ip.version)
+ eq_(self.traffic_class, self.ip.traffic_class)
+ eq_(self.flow_label, self.ip.flow_label)
+ eq_(self.payload_length, self.ip.payload_length)
+ eq_(self.auth.TYPE, self.ip.nxt)
+ eq_(self.hop_limit, self.ip.hop_limit)
+ eq_(self.src, self.ip.src)
+ eq_(self.dst, self.ip.dst)
+ eq_(str(self.ext_hdrs), str(self.ip.ext_hdrs))
+
+ def test_parser(self):
+ _res = self.ip.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+
+ eq_(self.version, res.version)
+ eq_(self.traffic_class, res.traffic_class)
+ eq_(self.flow_label, res.flow_label)
+ eq_(self.payload_length, res.payload_length)
+ eq_(self.auth.TYPE, res.nxt)
+ eq_(self.hop_limit, res.hop_limit)
+ eq_(self.src, res.src)
+ eq_(self.dst, res.dst)
+ eq_(str(self.ext_hdrs), str(res.ext_hdrs))
+
+ def test_serialize(self):
+ data = bytearray()
+ prev = None
+ buf = self.ip.serialize(data, prev)
+
+ res = struct.unpack_from(self.form, str(buf))
+
+ eq_(self.v_tc_flow, res[0])
+ eq_(self.payload_length, res[1])
+ eq_(self.auth.TYPE, res[2])
+ eq_(self.hop_limit, res[3])
+ eq_(self.src, addrconv.ipv6.bin_to_text(res[4]))
+ eq_(self.dst, addrconv.ipv6.bin_to_text(res[5]))
+ eq_(self.nxt, res[6])
+ eq_(self.auth_size, res[7])
+ eq_(self.auth_spi, res[8])
+ eq_(self.auth_seq, res[9])
+ eq_(self.auth_data, res[10])
+
+ def test_to_string(self):
+ auth_values = {'nxt': self.nxt,
+ 'size': self.auth_size,
+ 'spi': self.auth_spi,
+ 'seq': self.auth_seq,
+ 'data': self.auth_data}
+ _auth_str = ','.join(['%s=%s' % (k, repr(auth_values[k]))
+ for k, v in inspect.getmembers(self.auth)
+ if k in auth_values])
+ auth_str = '%s(%s)' % (ipv6.auth.__name__, _auth_str)
+ head_str = '[%s]' % auth_str
+ ipv6_values = {'version': self.version,
+ 'traffic_class': self.traffic_class,
+ 'flow_label': self.flow_label,
+ 'payload_length': self.payload_length,
+ 'nxt': self.auth.TYPE,
+ 'hop_limit': self.hop_limit,
+ 'src': repr(self.src),
+ 'dst': repr(self.dst),
+ 'ext_hdrs': head_str}
+ _ipv6_str = ','.join(['%s=%s' % (k, ipv6_values[k])
+ for k, v in inspect.getmembers(self.ip)
+ if k in ipv6_values])
+ ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+ eq_(str(self.ip), ipv6_str)
+ eq_(repr(self.ip), ipv6_str)
+
+
+class Test_ipv6_with_multi_headers(unittest.TestCase):
+
+ def setUp(self):
+ self.opt1_type = 5
+ self.opt1_len = 2
+ self.opt1_data = '\x00\x00'
+ self.opt2_type = 1
+ self.opt2_len = 0
+ self.opt2_data = None
+ self.options = [
+ ipv6.option(self.opt1_type, self.opt1_len, self.opt1_data),
+ ipv6.option(self.opt2_type, self.opt2_len, self.opt2_data),
+ ]
+ self.hop_opts_size = 0
+ self.hop_opts = ipv6.hop_opts(self.hop_opts_size, self.options)
+
+ self.auth_size = 4
+ self.auth_spi = 256
+ self.auth_seq = 1
+ self.auth_data = '\xa0\xe7\xf8\xab\xf9\x69\x1a\x8b\xf3\x9f\x7c\xae'
+ self.auth = ipv6.auth(
+ self.auth_size, self.auth_spi, self.auth_seq, self.auth_data)
+
+ self.version = 6
+ self.traffic_class = 0
+ self.flow_label = 0
+ self.payload_length = 817 + len(self.hop_opts) + len(self.auth)
+ self.nxt = 6
+ self.hop_limit = 128
+ self.src = '2002:4637:d5d3::4637:d5d3'
+ self.dst = '2001:4860:0:2001::68'
+ self.ext_hdrs = [self.hop_opts, self.auth]
+
+ self.ip = ipv6.ipv6(
+ self.version, self.traffic_class, self.flow_label,
+ self.payload_length, self.nxt, self.hop_limit, self.src,
+ self.dst, self.ext_hdrs)
+
+ self.v_tc_flow = (
+ self.version << 28 | self.traffic_class << 20 |
+ self.flow_label << 12)
+ self.form = '!IHBB16s16sBBBB2sBBBB2xII12s'
+ self.buf = struct.pack(
+ self.form, self.v_tc_flow,
+ self.payload_length, self.hop_opts.TYPE, self.hop_limit,
+ addrconv.ipv6.text_to_bin(self.src),
+ addrconv.ipv6.text_to_bin(self.dst),
+ self.auth.TYPE, self.hop_opts_size, self.opt1_type,
+ self.opt1_len, self.opt1_data, self.opt2_type,
+ self.opt2_len, self.nxt, self.auth_size, self.auth_spi,
+ self.auth_seq, self.auth_data)
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.version, self.ip.version)
+ eq_(self.traffic_class, self.ip.traffic_class)
+ eq_(self.flow_label, self.ip.flow_label)
+ eq_(self.payload_length, self.ip.payload_length)
+ eq_(self.hop_opts.TYPE, self.ip.nxt)
+ eq_(self.hop_limit, self.ip.hop_limit)
+ eq_(self.src, self.ip.src)
+ eq_(self.dst, self.ip.dst)
+ eq_(str(self.ext_hdrs), str(self.ip.ext_hdrs))
+
+ def test_parser(self):
+ _res = self.ip.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+
+ eq_(self.version, res.version)
+ eq_(self.traffic_class, res.traffic_class)
+ eq_(self.flow_label, res.flow_label)
+ eq_(self.payload_length, res.payload_length)
+ eq_(self.hop_opts.TYPE, res.nxt)
+ eq_(self.hop_limit, res.hop_limit)
+ eq_(self.src, res.src)
+ eq_(self.dst, res.dst)
+ eq_(str(self.ext_hdrs), str(res.ext_hdrs))
+
+ def test_serialize(self):
+ data = bytearray()
+ prev = None
+ buf = self.ip.serialize(data, prev)
+
+ res = struct.unpack_from(self.form, str(buf))
+
+ eq_(self.v_tc_flow, res[0])
+ eq_(self.payload_length, res[1])
+ eq_(self.hop_opts.TYPE, res[2])
+ eq_(self.hop_limit, res[3])
+ eq_(self.src, addrconv.ipv6.bin_to_text(res[4]))
+ eq_(self.dst, addrconv.ipv6.bin_to_text(res[5]))
+ eq_(self.auth.TYPE, res[6])
+ eq_(self.hop_opts_size, res[7])
+ eq_(self.opt1_type, res[8])
+ eq_(self.opt1_len, res[9])
+ eq_(self.opt1_data, res[10])
+ eq_(self.opt2_type, res[11])
+ eq_(self.opt2_len, res[12])
+ eq_(self.nxt, res[13])
+ eq_(self.auth_size, res[14])
+ eq_(self.auth_spi, res[15])
+ eq_(self.auth_seq, res[16])
+ eq_(self.auth_data, res[17])
+
+ def test_to_string(self):
+ option_values = ['type_', 'len_', 'data']
+ opt_str_list = []
+ for option in self.options:
+ _opt_str = ','.join(['%s=%s' % (k, repr(getattr(option, k)))
+ for k, v in inspect.getmembers(option)
+ if k in option_values])
+ opt_str = '%s(%s)' % (ipv6.option.__name__, _opt_str)
+ opt_str_list.append(opt_str)
+ option_str = '[%s]' % ', '.join(opt_str_list)
+ hop_opts_values = {'nxt': self.auth.TYPE,
+ 'size': self.hop_opts_size,
+ 'data': option_str}
+ _hop_str = ','.join(['%s=%s' % (k, hop_opts_values[k])
+ for k, v in inspect.getmembers(self.hop_opts)
+ if k in hop_opts_values])
+ hop_str = '%s(%s)' % (ipv6.hop_opts.__name__, _hop_str)
+ auth_values = {'nxt': self.nxt,
+ 'size': self.auth_size,
+ 'spi': self.auth_spi,
+ 'seq': self.auth_seq,
+ 'data': self.auth_data}
+ _auth_str = ','.join(['%s=%s' % (k, repr(auth_values[k]))
+ for k, v in inspect.getmembers(self.auth)
+ if k in auth_values])
+ auth_str = '%s(%s)' % (ipv6.auth.__name__, _auth_str)
+ _head_str = ', '.join([hop_str, auth_str])
+ head_str = '[%s]' % _head_str
+ ipv6_values = {'version': self.version,
+ 'traffic_class': self.traffic_class,
+ 'flow_label': self.flow_label,
+ 'payload_length': self.payload_length,
+ 'nxt': self.hop_opts.TYPE,
'hop_limit': self.hop_limit,
- 'src': self.src,
- 'dst': self.dst}
- _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
+ 'src': repr(self.src),
+ 'dst': repr(self.dst),
+ 'ext_hdrs': head_str}
+ _ipv6_str = ','.join(['%s=%s' % (k, ipv6_values[k])
for k, v in inspect.getmembers(self.ip)
if k in ipv6_values])
ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
eq_(str(self.ip), ipv6_str)
eq_(repr(self.ip), ipv6_str)
+
+
+class Test_hop_opts(unittest.TestCase):
+
+ def setUp(self):
+ self.nxt = 0
+ self.size = 8
+ self.data = [
+ ipv6.option(5, 2, '\x00\x00'),
+ ipv6.option(1, 0, None),
+ ipv6.option(0xc2, 4, '\x00\x01\x00\x00'),
+ ipv6.option(1, 0, None),
+ ]
+ self.hop = ipv6.hop_opts(self.size, self.data)
+ self.hop.set_nxt(self.nxt)
+ self.form = '!BB'
+ self.buf = struct.pack(self.form, self.nxt, self.size) \
+ + self.data[0].serialize() \
+ + self.data[1].serialize() \
+ + self.data[2].serialize() \
+ + self.data[3].serialize()
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.nxt, self.hop.nxt)
+ eq_(self.size, self.hop.size)
+ eq_(self.data, self.hop.data)
+
+ @raises(Exception)
+ def test_invalid_size(self):
+ ipv6.hop_opts(self.nxt, 1, self.data)
+
+ def test_parser(self):
+ _res = ipv6.hop_opts.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.nxt, res.nxt)
+ eq_(self.size, res.size)
+ eq_(str(self.data), str(res.data))
+
+ def test_serialize(self):
+ buf = self.hop.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self.nxt, res[0])
+ eq_(self.size, res[1])
+ offset = struct.calcsize(self.form)
+ opt1 = ipv6.option.parser(str(buf[offset:]))
+ offset += len(opt1)
+ opt2 = ipv6.option.parser(str(buf[offset:]))
+ offset += len(opt2)
+ opt3 = ipv6.option.parser(str(buf[offset:]))
+ offset += len(opt3)
+ opt4 = ipv6.option.parser(str(buf[offset:]))
+ eq_(5, opt1.type_)
+ eq_(2, opt1.len_)
+ eq_('\x00\x00', opt1.data)
+ eq_(1, opt2.type_)
+ eq_(0, opt2.len_)
+ eq_(None, opt2.data)
+ eq_(0xc2, opt3.type_)
+ eq_(4, opt3.len_)
+ eq_('\x00\x01\x00\x00', opt3.data)
+ eq_(1, opt4.type_)
+ eq_(0, opt4.len_)
+ eq_(None, opt4.data)
+
+ def test_len(self):
+ eq_(16, len(self.hop))
+
+
+class Test_dst_opts(unittest.TestCase):
+
+ def setUp(self):
+ self.nxt = 60
+ self.size = 8
+ self.data = [
+ ipv6.option(5, 2, '\x00\x00'),
+ ipv6.option(1, 0, None),
+ ipv6.option(0xc2, 4, '\x00\x01\x00\x00'),
+ ipv6.option(1, 0, None),
+ ]
+ self.dst = ipv6.dst_opts(self.size, self.data)
+ self.dst.set_nxt(self.nxt)
+ self.form = '!BB'
+ self.buf = struct.pack(self.form, self.nxt, self.size) \
+ + self.data[0].serialize() \
+ + self.data[1].serialize() \
+ + self.data[2].serialize() \
+ + self.data[3].serialize()
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.nxt, self.dst.nxt)
+ eq_(self.size, self.dst.size)
+ eq_(self.data, self.dst.data)
+
+ @raises(Exception)
+ def test_invalid_size(self):
+ ipv6.dst_opts(self.nxt, 1, self.data)
+
+ def test_parser(self):
+ _res = ipv6.dst_opts.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.nxt, res.nxt)
+ eq_(self.size, res.size)
+ eq_(str(self.data), str(res.data))
+
+ def test_serialize(self):
+ buf = self.dst.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self.nxt, res[0])
+ eq_(self.size, res[1])
+ offset = struct.calcsize(self.form)
+ opt1 = ipv6.option.parser(str(buf[offset:]))
+ offset += len(opt1)
+ opt2 = ipv6.option.parser(str(buf[offset:]))
+ offset += len(opt2)
+ opt3 = ipv6.option.parser(str(buf[offset:]))
+ offset += len(opt3)
+ opt4 = ipv6.option.parser(str(buf[offset:]))
+ eq_(5, opt1.type_)
+ eq_(2, opt1.len_)
+ eq_('\x00\x00', opt1.data)
+ eq_(1, opt2.type_)
+ eq_(0, opt2.len_)
+ eq_(None, opt2.data)
+ eq_(0xc2, opt3.type_)
+ eq_(4, opt3.len_)
+ eq_('\x00\x01\x00\x00', opt3.data)
+ eq_(1, opt4.type_)
+ eq_(0, opt4.len_)
+ eq_(None, opt4.data)
+
+ def test_len(self):
+ eq_(16, len(self.dst))
+
+
+class Test_option(unittest.TestCase):
+
+ def setUp(self):
+ self.type_ = 5
+ self.data = '\x00\x00'
+ self.len_ = len(self.data)
+ self.opt = ipv6.option(self.type_, self.len_, self.data)
+ self.form = '!BB%ds' % self.len_
+ self.buf = struct.pack(self.form, self.type_, self.len_, self.data)
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.type_, self.opt.type_)
+ eq_(self.len_, self.opt.len_)
+ eq_(self.data, self.opt.data)
+
+ def test_parser(self):
+ _res = ipv6.option.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.type_, res.type_)
+ eq_(self.len_, res.len_)
+ eq_(self.data, res.data)
+
+ def test_serialize(self):
+ buf = self.opt.serialize()
+ res = struct.unpack_from(self.form, buf)
+ eq_(self.type_, res[0])
+ eq_(self.len_, res[1])
+ eq_(self.data, res[2])
+
+ def test_len(self):
+ eq_(len(self.opt), 2 + self.len_)
+
+
+class Test_option_pad1(Test_option):
+
+ def setUp(self):
+ self.type_ = 0
+ self.len_ = -1
+ self.data = None
+ self.opt = ipv6.option(self.type_, self.len_, self.data)
+ self.form = '!B'
+ self.buf = struct.pack(self.form, self.type_)
+
+ def test_serialize(self):
+ buf = self.opt.serialize()
+ res = struct.unpack_from(self.form, buf)
+ eq_(self.type_, res[0])
+
+
+class Test_option_padN(Test_option):
+
+ def setUp(self):
+ self.type_ = 1
+ self.len_ = 0
+ self.data = None
+ self.opt = ipv6.option(self.type_, self.len_, self.data)
+ self.form = '!BB'
+ self.buf = struct.pack(self.form, self.type_, self.len_)
+
+ def test_serialize(self):
+ buf = self.opt.serialize()
+ res = struct.unpack_from(self.form, buf)
+ eq_(self.type_, res[0])
+ eq_(self.len_, res[1])
+
+
+class Test_fragment(unittest.TestCase):
+
+ def setUp(self):
+ self.nxt = 44
+ self.offset = 50
+ self.more = 1
+ self.id_ = 123
+ self.fragment = ipv6.fragment(self.offset, self.more, self.id_)
+ self.fragment.set_nxt(self.nxt)
+
+ self.off_m = (self.offset << 3 | self.more)
+ self.form = '!BxHI'
+ self.buf = struct.pack(self.form, self.nxt, self.off_m, self.id_)
+
+ def test_init(self):
+ eq_(self.nxt, self.fragment.nxt)
+ eq_(self.offset, self.fragment.offset)
+ eq_(self.more, self.fragment.more)
+ eq_(self.id_, self.fragment.id_)
+
+ def test_parser(self):
+ _res = ipv6.fragment.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.nxt, res.nxt)
+ eq_(self.offset, res.offset)
+ eq_(self.more, res.more)
+ eq_(self.id_, res.id_)
+
+ def test_serialize(self):
+ buf = self.fragment.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self.nxt, res[0])
+ eq_(self.off_m, res[1])
+ eq_(self.id_, res[2])
+
+ def test_len(self):
+ eq_(8, len(self.fragment))
+
+
+class Test_auth(unittest.TestCase):
+
+ def setUp(self):
+ self.nxt = 0
+ self.size = 4
+ self.spi = 256
+ self.seq = 1
+ self.data = '\x21\xd3\xa9\x5c\x5f\xfd\x4d\x18\x46\x22\xb9\xf8'
+ self.auth = ipv6.auth(self.size, self.spi, self.seq, self.data)
+ self.auth.set_nxt(self.nxt)
+ self.form = '!BB2xII12s'
+ self.buf = struct.pack(self.form, self.nxt, self.size, self.spi,
+ self.seq, self.data)
+
+ def test_init(self):
+ eq_(self.nxt, self.auth.nxt)
+ eq_(self.size, self.auth.size)
+ eq_(self.spi, self.auth.spi)
+ eq_(self.seq, self.auth.seq)
+ eq_(self.data, self.auth.data)
+
+ def test_parser(self):
+ _res = ipv6.auth.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.nxt, res.nxt)
+ eq_(self.size, res.size)
+ eq_(self.spi, res.spi)
+ eq_(self.seq, res.seq)
+ eq_(self.data, res.data)
+
+ def test_serialize(self):
+ buf = self.auth.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self.nxt, res[0])
+ eq_(self.size, res[1])
+ eq_(self.spi, res[2])
+ eq_(self.seq, res[3])
+ eq_(self.data, res[4])
+
+ def test_len(self):
+ eq_((4 - 1) * 8, len(self.auth))
--
1.7.10.4
------------------------------------------------------------------------------
Learn the latest--Visual Studio 2012, SharePoint 2013, SQL 2012, more!
Discover the easy way to master current and previous Microsoft technologies
and advance your career. Get an incredible 1,500+ hours of step-by-step
tutorial videos with LearnDevNow. Subscribe today and save!
http://pubads.g.doubleclick.net/gampad/clk?id=58040911&iu=/4140/ostg.clktrk
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel