Formerly this module tested only to_string(), so a bug in the parser() method
of 'TimeExceeded' was overlooked.
This patch extends the module to test all the methods of all the classes in
the 'icmp' module.


Signed-off-by: itoyuichi <[email protected]>
---
 ryu/tests/unit/packet/test_icmp.py |  300 +++++++++++++++++++++++++++++++++---
 1 file changed, 277 insertions(+), 23 deletions(-)

diff --git a/ryu/tests/unit/packet/test_icmp.py 
b/ryu/tests/unit/packet/test_icmp.py
index 75d9702..f93ed73 100644
--- a/ryu/tests/unit/packet/test_icmp.py
+++ b/ryu/tests/unit/packet/test_icmp.py
@@ -17,47 +17,186 @@
 import unittest
 import inspect
 import logging
+import struct

-from nose.tools import *
-from nose.plugins.skip import Skip, SkipTest
+from nose.tools import eq_
 from ryu.lib.packet import icmp
+from ryu.lib.packet import packet_utils


 LOG = logging.getLogger(__name__)


-class Test_icmp_dest_unreach(unittest.TestCase):
+class Test_icmp(unittest.TestCase):

-    type_ = icmp.ICMP_DEST_UNREACH
-    code = icmp.ICMP_HOST_UNREACH_CODE
-    csum = 0
+    echo_id = None
+    echo_seq = None
+    echo_data = None

-    mtu = 10
-    data = 'abc'
-    data_len = len(data)
-    dst_unreach = icmp.dest_unreach(data_len=data_len, mtu=mtu, data=data)
+    unreach_mtu = None
+    unreach_data = None
+    unreach_data_len = None

-    ic = icmp.icmp(type_, code, csum, data=dst_unreach)
+    te_data = None
+    te_data_len = None

     def setUp(self):
-        pass
+        self.type_ = icmp.ICMP_ECHO_REQUEST
+        self.code = 0
+        self.csum = 0
+        self.data = None

-    def tearDown(self):
-        pass
+        self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)

-    def test_to_string(self):
-        data_values = {'data': self.data,
-                       'data_len': self.data_len,
-                       'mtu': self.mtu}
-        _data_str = ','.join(['%s=%s' % (k, repr(data_values[k]))
-                              for k, v in inspect.getmembers(self.dst_unreach)
-                              if k in data_values])
-        data_str = '%s(%s)' % (icmp.dest_unreach.__name__, _data_str)
+        self.buf = bytearray(struct.pack(
+            icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
+        self.csum_calc = packet_utils.checksum(str(self.buf))
+        struct.pack_into('!H', self.buf, 2, self.csum_calc)
+
+    def setUp_with_echo(self):
+        self.echo_id = 13379
+        self.echo_seq = 1
+        self.echo_data = '\x30\x0e\x09\x00\x00\x00\x00\x00' \
+            + '\x10\x11\x12\x13\x14\x15\x16\x17' \
+            + '\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f' \
+            + '\x20\x21\x22\x23\x24\x25\x26\x27' \
+            + '\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f' \
+            + '\x30\x31\x32\x33\x34\x35\x36\x37'
+        self.data = icmp.echo(
+            id_=self.echo_id, seq=self.echo_seq, data=self.echo_data)
+
+        self.type_ = icmp.ICMP_ECHO_REQUEST
+        self.code = 0
+        self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
+
+        self.buf = struct.pack(
+            icmp.icmp._PACK_STR, self.type_, self.code, self.csum)
+        self.buf += self.data.serialize()
+        self.csum_calc = packet_utils.checksum(str(self.buf))
+        struct.pack_into('!H', self.buf, 2, self.csum_calc)
+
+    def setUp_with_dest_unreach(self):
+        self.unreach_mtu = 10
+        self.unreach_data = 'abc'
+        self.unreach_data_len = len(self.unreach_data)
+        self.data = icmp.dest_unreach(
+            data_len=self.unreach_data_len, mtu=self.unreach_mtu,
+            data=self.unreach_data)
+
+        self.type_ = icmp.ICMP_DEST_UNREACH
+        self.code = icmp.ICMP_HOST_UNREACH_CODE
+        self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
+
+        self.buf = struct.pack(
+            icmp.icmp._PACK_STR, self.type_, self.code, self.csum)
+        self.buf += self.data.serialize()
+        self.csum_calc = packet_utils.checksum(str(self.buf))
+        struct.pack_into('!H', self.buf, 2, self.csum_calc)
+
+    def setUp_with_TimeExceeded(self):
+        self.te_data = 'abc'
+        self.te_data_len = len(self.te_data)
+        self.data = icmp.TimeExceeded(
+            data_len=self.te_data_len, data=self.te_data)
+
+        self.type_ = icmp.ICMP_TIME_EXCEEDED
+        self.code = 0
+        self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
+
+        self.buf = struct.pack(
+            icmp.icmp._PACK_STR, self.type_, self.code, self.csum)
+        self.buf += self.data.serialize()
+        self.csum_calc = packet_utils.checksum(str(self.buf))
+        struct.pack_into('!H', self.buf, 2, self.csum_calc)
+
+    def test_init(self):
+        eq_(self.type_, self.ic.type)
+        eq_(self.code, self.ic.code)
+        eq_(self.csum, self.ic.csum)
+        eq_(str(self.data), str(self.ic.data))
+
+    def test_init_with_echo(self):
+        self.setUp_with_echo()
+        self.test_init()
+
+    def test_init_with_dest_unreach(self):
+        self.setUp_with_dest_unreach()
+        self.test_init()
+
+    def test_init_with_TimeExceeded(self):
+        self.setUp_with_TimeExceeded()
+        self.test_init()
+
+    def test_parser(self):
+        _res = icmp.icmp.parser(str(self.buf))
+        if type(_res) is tuple:
+            res = _res[0]
+        else:
+            res = _res
+
+        eq_(self.type_, res.type)
+        eq_(self.code, res.code)
+        eq_(self.csum_calc, res.csum)
+        eq_(str(self.data), str(res.data))
+
+    def test_parser_with_echo(self):
+        self.setUp_with_echo()
+        self.test_parser()
+
+    def test_parser_with_dest_unreach(self):
+        self.setUp_with_dest_unreach()
+        self.test_parser()
+
+    def test_parser_with_TimeExceeded(self):
+        self.setUp_with_TimeExceeded()
+        self.test_parser()
+
+    def test_serialize(self):
+        data = bytearray()
+        prev = None
+        buf = self.ic.serialize(data, prev)
+
+        res = struct.unpack_from(icmp.icmp._PACK_STR, str(buf))
+
+        eq_(self.type_, res[0])
+        eq_(self.code, res[1])
+        eq_(self.csum_calc, res[2])
+
+    def test_serialize_with_echo(self):
+        self.setUp_with_echo()
+        self.test_serialize()
+
+        data = bytearray()
+        prev = None
+        buf = self.ic.serialize(data, prev)
+        echo = icmp.echo.parser(str(buf), icmp.icmp._MIN_LEN)
+        eq_(repr(self.data), repr(echo))
+
+    def test_serialize_with_dest_unreach(self):
+        self.setUp_with_dest_unreach()
+        self.test_serialize()
+
+        data = bytearray()
+        prev = None
+        buf = self.ic.serialize(data, prev)
+        unreach = icmp.dest_unreach.parser(str(buf), icmp.icmp._MIN_LEN)
+        eq_(repr(self.data), repr(unreach))
+
+    def test_serialize_with_TimeExceeded(self):
+        self.setUp_with_TimeExceeded()
+        self.test_serialize()

+        data = bytearray()
+        prev = None
+        buf = self.ic.serialize(data, prev)
+        te = icmp.TimeExceeded.parser(str(buf), icmp.icmp._MIN_LEN)
+        eq_(repr(self.data), repr(te))
+
+    def test_to_string(self):
         icmp_values = {'type': repr(self.type_),
                        'code': repr(self.code),
                        'csum': repr(self.csum),
-                       'data': data_str}
+                       'data': repr(self.data)}
         _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
                             for k, v in inspect.getmembers(self.ic)
                             if k in icmp_values])
@@ -65,3 +204,118 @@ class Test_icmp_dest_unreach(unittest.TestCase):

         eq_(str(self.ic), ic_str)
         eq_(repr(self.ic), ic_str)
+
+    def test_to_string_with_echo(self):
+        self.setUp_with_echo()
+        self.test_to_string()
+
+    def test_to_string_with_dest_unreach(self):
+        self.setUp_with_dest_unreach()
+        self.test_to_string()
+
+    def test_to_string_with_TimeExceeded(self):
+        self.setUp_with_TimeExceeded()
+        self.test_to_string()
+
+
+class Test_echo(unittest.TestCase):
+
+    def setUp(self):
+        self.id_ = 13379
+        self.seq = 1
+        self.data = '\x30\x0e\x09\x00\x00\x00\x00\x00' \
+            + '\x10\x11\x12\x13\x14\x15\x16\x17' \
+            + '\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f' \
+            + '\x20\x21\x22\x23\x24\x25\x26\x27' \
+            + '\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f' \
+            + '\x30\x31\x32\x33\x34\x35\x36\x37'
+        self.echo = icmp.echo(
+            self.id_, self.seq, self.data)
+        self.buf = struct.pack('!HH', self.id_, self.seq)
+        self.buf += self.data
+
+    def test_init(self):
+        eq_(self.id_, self.echo.id)
+        eq_(self.seq, self.echo.seq)
+        eq_(self.data, self.echo.data)
+
+    def test_parser(self):
+        _res = icmp.echo.parser(self.buf, 0)
+        if type(_res) is tuple:
+            res = _res[0]
+        else:
+            res = _res
+        eq_(self.id_, res.id)
+        eq_(self.seq, res.seq)
+        eq_(self.data, res.data)
+
+    def test_serialize(self):
+        buf = self.echo.serialize()
+        res = struct.unpack_from('!HH', str(buf))
+        eq_(self.id_, res[0])
+        eq_(self.seq, res[1])
+        eq_(self.data, buf[struct.calcsize('!HH'):])
+
+
+class Test_dest_unreach(unittest.TestCase):
+
+    def setUp(self):
+        self.mtu = 10
+        self.data = 'abc'
+        self.data_len = len(self.data)
+        self.dest_unreach = icmp.dest_unreach(
+            data_len=self.data_len, mtu=self.mtu, data=self.data)
+        self.buf = struct.pack('!xBH', self.data_len, self.mtu)
+        self.buf += self.data
+
+    def test_init(self):
+        eq_(self.data_len, self.dest_unreach.data_len)
+        eq_(self.mtu, self.dest_unreach.mtu)
+        eq_(self.data, self.dest_unreach.data)
+
+    def test_parser(self):
+        _res = icmp.dest_unreach.parser(self.buf, 0)
+        if type(_res) is tuple:
+            res = _res[0]
+        else:
+            res = _res
+        eq_(self.data_len, res.data_len)
+        eq_(self.mtu, res.mtu)
+        eq_(self.data, res.data)
+
+    def test_serialize(self):
+        buf = self.dest_unreach.serialize()
+        res = struct.unpack_from('!xBH', str(buf))
+        eq_(self.data_len, res[0])
+        eq_(self.mtu, res[1])
+        eq_(self.data, buf[struct.calcsize('!xBH'):])
+
+
+class Test_TimeExceeded(unittest.TestCase):
+
+    def setUp(self):
+        self.data = 'abc'
+        self.data_len = len(self.data)
+        self.te = icmp.TimeExceeded(
+            data_len=self.data_len, data=self.data)
+        self.buf = struct.pack('!xBxx', self.data_len)
+        self.buf += self.data
+
+    def test_init(self):
+        eq_(self.data_len, self.te.data_len)
+        eq_(self.data, self.te.data)
+
+    def test_parser(self):
+        _res = icmp.TimeExceeded.parser(self.buf, 0)
+        if type(_res) is tuple:
+            res = _res[0]
+        else:
+            res = _res
+        eq_(self.data_len, res.data_len)
+        eq_(self.data, res.data)
+
+    def test_serialize(self):
+        buf = self.te.serialize()
+        res = struct.unpack_from('!xBxx', str(buf))
+        eq_(self.data_len, res[0])
+        eq_(self.data, buf[struct.calcsize('!xBxx'):])
-- 
1.7.10.4


------------------------------------------------------------------------------
October Webinars: Code for Performance
Free Intel webinars can help you accelerate application performance.
Explore tips for MPI, OpenMP, advanced profiling, and more. Get the most from 
the latest Intel processors and coprocessors. See abstracts and register >
http://pubads.g.doubleclick.net/gampad/clk?id=60133471&iu=/4140/ostg.clktrk
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to