Signed-off-by: itoyuichi <[email protected]>
---
ryu/lib/packet/ipv6.py | 62 ++++++++++++++++++
ryu/tests/unit/packet/test_ipv6.py | 126 ++++++++++++++++++++++++++++++++++++
2 files changed, 188 insertions(+)
diff --git a/ryu/lib/packet/ipv6.py b/ryu/lib/packet/ipv6.py
index 19a2d3f..4b4256a 100644
--- a/ryu/lib/packet/ipv6.py
+++ b/ryu/lib/packet/ipv6.py
@@ -168,6 +168,68 @@ class header(stringify.StringifyMixin):
pass
[email protected]_header_type(inet.IPPROTO_HOPOPTS)
+class hop_opts(header):
+    """IPv6 (RFC 2460) Hop-by-Hop Options header encoder/decoder class.
+
+    This is used with ryu.lib.packet.ipv6.ipv6.
+
+    An instance has the following attributes at least.
+    Most of them match the on-wire counterparts but are in host byte order.
+    __init__ takes the corresponding args in this order.
+
+    .. tabularcolumns:: |l|L|
+
+    ============== =======================================
+    Attribute      Description
+    ============== =======================================
+    size           the length of the Hop-by-Hop Options header,
+                   not including the first 8 octets.
+    data           IPv6 options.
+    ============== =======================================
+    """
+    TYPE = inet.IPPROTO_HOPOPTS
+
+    _PACK_STR = '!BB'  # next-header octet followed by the length octet
+    _MIN_LEN = struct.calcsize(_PACK_STR)
+    _FIX_SIZE = 8  # octets always present; not counted in size
+
+    def __init__(self, size, data):
+        super(hop_opts, self).__init__()
+        assert not (size % 8)  # size must be a multiple of 8
+        self.size = size
+        self.data = data
+
+    @classmethod
+    def parser(cls, buf):  # decode one header from the start of buf
+        (nxt, len_) = struct.unpack_from(cls._PACK_STR, buf)
+        data_len = cls._FIX_SIZE + len_
+        data = []
+        size = cls._MIN_LEN  # running offset of the next option in buf
+        while size < data_len:
+            (type_, ) = struct.unpack_from('!B', buf, size)  # peek; no copy
+            if type_ == 0:  # lone type octet: no length or data follows
+                opt = option(type_, -1, None)
+                size += 1
+            else:
+                opt = option.parser(buf[size:])
+                size += len(opt)
+            data.append(opt)
+        ret = cls(len_, data)
+        ret.set_nxt(nxt)
+        return ret
+
+    def serialize(self):  # returns the encoded header as a bytearray
+        buf = struct.pack(self._PACK_STR, self.nxt, self.size)
+        buf = bytearray(buf)
+        for opt in self.data:
+            buf.extend(opt.serialize())
+        return buf
+
+    def __len__(self):  # fixed 8 octets plus the extra octets in size
+        return self._FIX_SIZE + self.size
+
+
class option(stringify.StringifyMixin):
"""IPv6 (RFC 2460) Options header encoder/decoder class.
diff --git a/ryu/tests/unit/packet/test_ipv6.py b/ryu/tests/unit/packet/test_ipv6.py
index 22b6a52..493af03 100644
--- a/ryu/tests/unit/packet/test_ipv6.py
+++ b/ryu/tests/unit/packet/test_ipv6.py
@@ -55,6 +55,34 @@ class Test_ipv6(unittest.TestCase):
addrconv.ipv6.text_to_bin(self.src),
addrconv.ipv6.text_to_bin(self.dst))
+ def setUp_with_hop_opts(self):  # fixture: rebuild self.ip and self.buf with one hop_opts header
+ self.opt1_type = 5
+ self.opt1_len = 2
+ self.opt1_data = '\x00\x00'
+ self.opt2_type = 1
+ self.opt2_len = 0
+ self.opt2_data = None
+ self.options = [
+ ipv6.option(self.opt1_type, self.opt1_len, self.opt1_data),
+ ipv6.option(self.opt2_type, self.opt2_len, self.opt2_data),
+ ]
+ self.hop_opts_size = 0
+ self.hop_opts = ipv6.hop_opts(self.hop_opts_size, self.options)
+ self.ext_hdrs = [self.hop_opts]
+ self.payload_length += len(self.hop_opts)  # grow the existing payload length by the header's length
+ self.ip = ipv6.ipv6(
+ self.version, self.traffic_class, self.flow_label,
+ self.payload_length, self.nxt, self.hop_limit, self.src,
+ self.dst, self.ext_hdrs)
+ self.hop_opts.nxt = self.nxt  # hop_opts takes over the previous next-header value
+ self.nxt = self.hop_opts.TYPE  # the buffer packed below names hop_opts as next header
+ self.buf = struct.pack(
+ ipv6.ipv6._PACK_STR, self.v_tc_flow,
+ self.payload_length, self.nxt, self.hop_limit,
+ addrconv.ipv6.text_to_bin(self.src),
+ addrconv.ipv6.text_to_bin(self.dst))
+ self.buf += self.hop_opts.serialize()  # append the encoded extension header
+
def tearDown(self):
pass
@@ -69,6 +97,10 @@ class Test_ipv6(unittest.TestCase):
eq_(self.dst, self.ip.dst)
eq_(str(self.ext_hdrs), str(self.ip.ext_hdrs))
+ def test_init_with_hop_opts(self):  # re-run test_init against the hop_opts fixture
+ self.setUp_with_hop_opts()
+ self.test_init()
+
def test_parser(self):
_res = self.ip.parser(str(self.buf))
if type(_res) is tuple:
@@ -86,6 +118,10 @@ class Test_ipv6(unittest.TestCase):
eq_(self.dst, res.dst)
eq_(str(self.ext_hdrs), str(res.ext_hdrs))
+ def test_parser_with_hop_opts(self):  # re-run test_parser against the hop_opts fixture
+ self.setUp_with_hop_opts()
+ self.test_parser()
+
def test_serialize(self):
data = bytearray()
prev = None
@@ -100,6 +136,16 @@ class Test_ipv6(unittest.TestCase):
eq_(self.src, addrconv.ipv6.bin_to_text(res[4]))
eq_(self.dst, addrconv.ipv6.bin_to_text(res[5]))
+ def test_serialize_with_hop_opts(self):  # test_serialize plus a hop_opts round-trip check
+ self.setUp_with_hop_opts()
+ self.test_serialize()
+
+ data = bytearray()
+ prev = None
+ buf = self.ip.serialize(data, prev)
+ hop_opts = ipv6.hop_opts.parser(str(buf[ipv6.ipv6._MIN_LEN:]))  # parse what follows the fixed ipv6 header
+ eq_(repr(self.hop_opts), repr(hop_opts))  # parsed header must match the fixture
+
def test_to_string(self):
ipv6_values = {'version': self.version,
'traffic_class': self.traffic_class,
@@ -118,9 +164,89 @@ class Test_ipv6(unittest.TestCase):
eq_(str(self.ip), ipv6_str)
eq_(repr(self.ip), ipv6_str)
+ def test_to_string_with_hop_opts(self):  # re-run test_to_string against the hop_opts fixture
+ self.setUp_with_hop_opts()
+ self.test_to_string()
+
def test_len(self):
eq_(len(self.ip), 40)
+ def test_len_with_hop_opts(self):  # len(ip) must include the extension header
+ self.setUp_with_hop_opts()
+ eq_(len(self.ip), 40 + len(self.hop_opts))  # 40 is the length test_len expects without ext headers
+
+
+class Test_hop_opts(unittest.TestCase):  # unit tests for ipv6.hop_opts
+
+    def setUp(self):  # one header (nxt=0, size=8) carrying four options
+        self.nxt = 0
+        self.size = 8
+        self.data = [
+            ipv6.option(5, 2, '\x00\x00'),
+            ipv6.option(1, 0, None),
+            ipv6.option(0xc2, 4, '\x00\x01\x00\x00'),
+            ipv6.option(1, 0, None),
+        ]
+        self.hop = ipv6.hop_opts(self.size, self.data)
+        self.hop.set_nxt(self.nxt)
+        self.form = '!BB'
+        self.buf = struct.pack(self.form, self.nxt, self.size) \
+            + self.data[0].serialize() \
+            + self.data[1].serialize() \
+            + self.data[2].serialize() \
+            + self.data[3].serialize()
+
+    def tearDown(self):
+        pass
+
+    def test_init(self):
+        eq_(self.nxt, self.hop.nxt)
+        eq_(self.size, self.hop.size)
+        eq_(self.data, self.hop.data)
+
+    @raises(AssertionError)
+    def test_invalid_size(self):  # __init__ takes (size, data); it asserts size % 8 == 0
+        ipv6.hop_opts(1, self.data)  # 1 is not a multiple of 8
+
+    def test_parser(self):
+        _res = ipv6.hop_opts.parser(self.buf)
+        if type(_res) is tuple:
+            res = _res[0]
+        else:
+            res = _res
+        eq_(self.nxt, res.nxt)
+        eq_(self.size, res.size)
+        eq_(str(self.data), str(res.data))
+
+    def test_serialize(self):  # decode the serialized bytes option by option
+        buf = self.hop.serialize()
+        res = struct.unpack_from(self.form, str(buf))
+        eq_(self.nxt, res[0])
+        eq_(self.size, res[1])
+        offset = struct.calcsize(self.form)
+        opt1 = ipv6.option.parser(str(buf[offset:]))
+        offset += len(opt1)
+        opt2 = ipv6.option.parser(str(buf[offset:]))
+        offset += len(opt2)
+        opt3 = ipv6.option.parser(str(buf[offset:]))
+        offset += len(opt3)
+        opt4 = ipv6.option.parser(str(buf[offset:]))
+        eq_(5, opt1.type_)
+        eq_(2, opt1.len_)
+        eq_('\x00\x00', opt1.data)
+        eq_(1, opt2.type_)
+        eq_(0, opt2.len_)
+        eq_(None, opt2.data)
+        eq_(0xc2, opt3.type_)
+        eq_(4, opt3.len_)
+        eq_('\x00\x01\x00\x00', opt3.data)
+        eq_(1, opt4.type_)
+        eq_(0, opt4.len_)
+        eq_(None, opt4.data)
+
+    def test_len(self):
+        eq_(16, len(self.hop))  # _FIX_SIZE (8) + size (8)
+
class Test_option(unittest.TestCase):
--
1.7.10.4
------------------------------------------------------------------------------
How ServiceNow helps IT people transform IT departments:
1. Consolidate legacy IT systems to a single system of record for IT
2. Standardize and globalize service processes across IT
3. Implement zero-touch automation to replace manual, redundant tasks
http://pubads.g.doubleclick.net/gampad/clk?id=51271111&iu=/4140/ostg.clktrk
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel