Signed-off-by: Minoru TAKAHASHI <[email protected]>
Signed-off-by: IWASE Yusuke <[email protected]>
---
 ryu/tests/unit/packet/test_tcp.py | 143 +++++++++++++++++++++++++++++---------
 1 file changed, 110 insertions(+), 33 deletions(-)

diff --git a/ryu/tests/unit/packet/test_tcp.py 
b/ryu/tests/unit/packet/test_tcp.py
index 862ad6f..7224df0 100644
--- a/ryu/tests/unit/packet/test_tcp.py
+++ b/ryu/tests/unit/packet/test_tcp.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2012-2015 Nippon Telegraph and Telephone Corporation.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -21,9 +21,8 @@ import six
 import struct
 from struct import *
 from nose.tools import *
-from ryu.ofproto import ether, inet
-from ryu.lib.packet.packet import Packet
-from ryu.lib.packet.tcp import tcp
+from ryu.ofproto import inet
+from ryu.lib.packet import tcp
 from ryu.lib.packet.ipv4 import ipv4
 from ryu.lib.packet import packet_utils
 from ryu.lib import addrconv
@@ -46,10 +45,10 @@ class Test_tcp(unittest.TestCase):
     urgent = 128
     option = b'\x01\x02\x03\x04'
 
-    t = tcp(src_port, dst_port, seq, ack, offset, bits,
-            window_size, csum, urgent, option)
+    t = tcp.tcp(src_port, dst_port, seq, ack, offset, bits,
+                window_size, csum, urgent, option)
 
-    buf = pack(tcp._PACK_STR, src_port, dst_port, seq, ack,
+    buf = pack(tcp.tcp._PACK_STR, src_port, dst_port, seq, ack,
                offset << 4, bits, window_size, csum, urgent)
     buf += option
 
@@ -95,10 +94,10 @@ class Test_tcp(unittest.TestCase):
         prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
                     inet.IPPROTO_TCP, 0, src_ip, dst_ip)
 
-        t = tcp(self.src_port, self.dst_port, self.seq, self.ack,
-                offset, self.bits, self.window_size, csum, self.urgent)
+        t = tcp.tcp(self.src_port, self.dst_port, self.seq, self.ack,
+                    offset, self.bits, self.window_size, csum, self.urgent)
         buf = t.serialize(bytearray(), prev)
-        res = struct.unpack(tcp._PACK_STR, six.binary_type(buf))
+        res = struct.unpack(tcp.tcp._PACK_STR, six.binary_type(buf))
 
         eq_(res[0], self.src_port)
         eq_(res[1], self.dst_port)
@@ -109,41 +108,63 @@ class Test_tcp(unittest.TestCase):
         eq_(res[6], self.window_size)
         eq_(res[8], self.urgent)
 
+        # test __len__
+        # offset indicates the number of 32 bit (= 4 bytes)
+        # words in the TCP Header.
+        # So, we compare len(tcp) with offset * 4, here.
+        eq_(offset * 4, len(t))
+
         # checksum
         ph = struct.pack('!4s4sBBH',
                          addrconv.ipv4.text_to_bin(src_ip),
                          addrconv.ipv4.text_to_bin(dst_ip), 0, 6, offset * 4)
-        d = ph + buf + bytearray()
+        d = ph + buf
         s = packet_utils.checksum(d)
         eq_(0, s)
 
     def test_serialize_option(self):
-        offset = 6
+        # prepare test data
+        offset = 0
         csum = 0
-        option = b'\x01\x02'
-
-        src_ip = '192.168.10.1'
-        dst_ip = '192.168.100.1'
+        option = [
+            tcp.TCPOptionMaximumSegmentSize(max_seg_size=1460),
+            tcp.TCPOptionSACKPermitted(),
+            tcp.TCPOptionTimestamps(ts_val=287454020, ts_ecr=1432778632),
+            tcp.TCPOptionNoOperation(),
+            tcp.TCPOptionWindowScale(shift_cnt=9),
+        ]
+        option_buf = (
+            b'\x02\x04\x05\xb4'
+            b'\x04\x02'
+            b'\x08\x0a\x11\x22\x33\x44\x55\x66\x77\x88'
+            b'\x01'
+            b'\x03\x03\x09'
+        )
         prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
-                    inet.IPPROTO_TCP, 0, src_ip, dst_ip)
+                    inet.IPPROTO_TCP, 0, '192.168.10.1', '192.168.100.1')
 
-        t = tcp(self.src_port, self.dst_port, self.seq, self.ack,
-                offset, self.bits, self.window_size, csum, self.urgent,
-                option)
+        # test serializer
+        t = tcp.tcp(self.src_port, self.dst_port, self.seq, self.ack,
+                    offset, self.bits, self.window_size, csum, self.urgent,
+                    option)
         buf = t.serialize(bytearray(), prev)
-        r_option = buf[tcp._MIN_LEN:tcp._MIN_LEN + len(option)]
-        eq_(option, r_option)
+        r_option_buf = buf[tcp.tcp._MIN_LEN:tcp.tcp._MIN_LEN + len(option_buf)]
+        eq_(option_buf, r_option_buf)
+
+        # test parser
+        (r_tcp, _, _) = tcp.tcp.parser(buf)
+        eq_(str(option), str(r_tcp.option))
 
     @raises(Exception)
     def test_malformed_tcp(self):
-        m_short_buf = self.buf[1:tcp._MIN_LEN]
-        tcp.parser(m_short_buf)
+        m_short_buf = self.buf[1:tcp.tcp._MIN_LEN]
+        tcp.tcp.parser(m_short_buf)
 
     def test_default_args(self):
         prev = ipv4(proto=inet.IPPROTO_TCP)
-        t = tcp()
+        t = tcp.tcp()
         buf = t.serialize(bytearray(), prev)
-        res = struct.unpack(tcp._PACK_STR, buf)
+        res = struct.unpack(tcp.tcp._PACK_STR, buf)
 
         eq_(res[0], 1)
         eq_(res[1], 1)
@@ -155,9 +176,9 @@ class Test_tcp(unittest.TestCase):
         eq_(res[8], 0)
 
         # with option, without offset
-        t = tcp(option=b'\x01\x02\x03')
+        t = tcp.tcp(option=[tcp.TCPOptionMaximumSegmentSize(1460)])
         buf = t.serialize(bytearray(), prev)
-        res = struct.unpack(tcp._PACK_STR + '4s', buf)
+        res = struct.unpack(tcp.tcp._PACK_STR + '4s', buf)
 
         eq_(res[0], 1)
         eq_(res[1], 1)
@@ -167,12 +188,12 @@ class Test_tcp(unittest.TestCase):
         eq_(res[5], 0)
         eq_(res[6], 0)
         eq_(res[8], 0)
-        eq_(res[9], b'\x01\x02\x03\x00')
+        eq_(res[9], b'\x02\x04\x05\xb4')
 
         # with option, with long offset
-        t = tcp(offset=7, option=b'\x01\x02\x03')
+        t = tcp.tcp(offset=7, option=[tcp.TCPOptionWindowScale(shift_cnt=9)])
         buf = t.serialize(bytearray(), prev)
-        res = struct.unpack(tcp._PACK_STR + '8s', buf)
+        res = struct.unpack(tcp.tcp._PACK_STR + '8s', buf)
 
         eq_(res[0], 1)
         eq_(res[1], 1)
@@ -182,9 +203,65 @@ class Test_tcp(unittest.TestCase):
         eq_(res[5], 0)
         eq_(res[6], 0)
         eq_(res[8], 0)
-        eq_(res[9], b'\x01\x02\x03\x00\x00\x00\x00\x00')
+        eq_(res[9], b'\x03\x03\x09\x00\x00\x00\x00\x00')
 
     def test_json(self):
         jsondict = self.t.to_jsondict()
-        t = tcp.from_jsondict(jsondict['tcp'])
+        t = tcp.tcp.from_jsondict(jsondict['tcp'])
         eq_(str(self.t), str(t))
+
+
+class Test_TCPOption(unittest.TestCase):
+    # prepare test data
+    input_options = [
+        tcp.TCPOptionEndOfOptionList(),
+        tcp.TCPOptionNoOperation(),
+        tcp.TCPOptionMaximumSegmentSize(max_seg_size=1460),
+        tcp.TCPOptionWindowScale(shift_cnt=9),
+        tcp.TCPOptionSACKPermitted(),
+        tcp.TCPOptionSACK(blocks=[(1, 2), (3, 4)], length=18),
+        tcp.TCPOptionTimestamps(ts_val=287454020, ts_ecr=1432778632),
+        tcp.TCPOptionUserTimeout(granularity=1, user_timeout=564),
+        tcp.TCPOptionAuthentication(
+            key_id=1, r_next_key_id=2,
+            mac=b'abcdefghijkl', length=16),
+        tcp.TCPOptionUnknown(value=b'foobar', kind=255, length=8),
+        tcp.TCPOptionUnknown(value=b'', kind=255, length=2),
+    ]
+    input_buf = (
+        b'\x00'  # End of Option List
+        b'\x01'  # No-Operation
+        b'\x02\x04\x05\xb4'  # Maximum Segment Size
+        b'\x03\x03\x09'  # Window Scale
+        b'\x04\x02'  # SACK Permitted
+        b'\x05\x12'  # SACK
+        b'\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00\x04'
+        b'\x08\x0a'  # Timestamps
+        b'\x11\x22\x33\x44\x55\x66\x77\x88'
+        b'\x1c\x04\x82\x34'  # User Timeout Option
+        b'\x1d\x10\x01\x02'  # TCP Authentication Option (TCP-AO)
+        b'abcdefghijkl'
+        b'\xff\x08'  # Unknown with body
+        b'foobar'
+        b'\xff\x02'  # Unknown
+    )
+
+    def test_serialize(self):
+        output_buf = bytearray()
+        for option in self.input_options:
+            output_buf += option.serialize()
+        eq_(self.input_buf, output_buf)
+
+    def test_parser(self):
+        buf = self.input_buf
+        output_options = []
+        while buf:
+            opt, buf = tcp.TCPOption.parser(buf)
+            output_options.append(opt)
+        eq_(str(self.input_options), str(output_options))
+
+    def test_json(self):
+        for option in self.input_options:
+            json_dict = option.to_jsondict()[option.__class__.__name__]
+            output_option = option.__class__.from_jsondict(json_dict)
+            eq_(str(option), str(output_option))
-- 
1.9.1


------------------------------------------------------------------------------
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to