Signed-off-by: HIYAMA Manabu <[email protected]>
---
 ryu/tests/unit/packet/test_arp.py      |    5 +
 ryu/tests/unit/packet/test_ethernet.py |   90 ++++++++
 ryu/tests/unit/packet/test_ipv4.py     |  131 +++++++++++
 ryu/tests/unit/packet/test_packet.py   |  371 ++++++++++++++++++++++++++++++++
 ryu/tests/unit/packet/test_tcp.py      |  140 ++++++++++++
 ryu/tests/unit/packet/test_udp.py      |   96 +++++++++
 ryu/tests/unit/packet/test_vlan.py     |    5 +
 7 files changed, 838 insertions(+)
 create mode 100644 ryu/tests/unit/packet/test_ethernet.py
 create mode 100644 ryu/tests/unit/packet/test_ipv4.py
 create mode 100644 ryu/tests/unit/packet/test_packet.py
 create mode 100644 ryu/tests/unit/packet/test_tcp.py
 create mode 100644 ryu/tests/unit/packet/test_udp.py

diff --git a/ryu/tests/unit/packet/test_arp.py b/ryu/tests/unit/packet/test_arp.py
index e5be05b..d53af41 100644
--- a/ryu/tests/unit/packet/test_arp.py
+++ b/ryu/tests/unit/packet/test_arp.py
@@ -174,3 +174,8 @@ class Test_arp(unittest.TestCase):
         eq_(a.dst_mac, self.dst_mac)
         eq_(a.dst_ip, self.dst_ip)
         eq_(a.length, self.length)
+
+    @raises(Exception)
+    def test_malformed_arp(self):
+        m_short_buf = self.buf[1:arp._MIN_LEN]
+        arp.parser(m_short_buf)
diff --git a/ryu/tests/unit/packet/test_ethernet.py b/ryu/tests/unit/packet/test_ethernet.py
new file mode 100644
index 0000000..6378bb6
--- /dev/null
+++ b/ryu/tests/unit/packet/test_ethernet.py
@@ -0,0 +1,90 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import unittest
+import logging
+import struct
+import netaddr
+from struct import *
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.ofproto import ether, inet
+from ryu.lib import mac
+from ryu.lib.packet.ethernet import ethernet
+from ryu.lib.packet.packet import Packet
+from ryu.lib.packet.arp import arp
+
+
+LOG = logging.getLogger('test_ethernet')
+
+
+class Test_ethernet(unittest.TestCase):
+    """ Test case for ethernet
+    """
+
+    dst = mac.haddr_to_bin('AA:AA:AA:AA:AA:AA')
+    src = mac.haddr_to_bin('BB:BB:BB:BB:BB:BB')
+    ethertype = ether.ETH_TYPE_ARP
+    length = struct.calcsize(ethernet._PACK_STR)
+
+    buf = pack(ethernet._PACK_STR, dst, src, ethertype)
+
+    e = ethernet(dst, src, ethertype)
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    def find_protocol(self, pkt, name):
+        for p in pkt.protocols:
+            if p.protocol_name == name:
+                return p
+
+    def test_init(self):
+        eq_(self.dst, self.e.dst)
+        eq_(self.src, self.e.src)
+        eq_(self.ethertype, self.e.ethertype)
+        eq_(self.length, self.e.length)
+
+    def test_parser(self):
+        res, ptype = self.e.parser(self.buf)
+        LOG.debug((res, ptype))
+
+        eq_(res.dst, self.dst)
+        eq_(res.src, self.src)
+        eq_(res.ethertype, self.ethertype)
+        eq_(res.length, self.length)
+        eq_(ptype, arp)
+
+    def test_serialize(self):
+        data = bytearray()
+        prev = None
+        buf = self.e.serialize(data, prev)
+
+        fmt = ethernet._PACK_STR
+        res = struct.unpack(fmt, buf)
+
+        eq_(res[0], self.dst)
+        eq_(res[1], self.src)
+        eq_(res[2], self.ethertype)
+
+    @raises(Exception)
+    def test_malformed_ethernet(self):
+        m_short_buf = self.buf[1:ethernet._MIN_LEN]
+        ethernet.parser(m_short_buf)
diff --git a/ryu/tests/unit/packet/test_ipv4.py b/ryu/tests/unit/packet/test_ipv4.py
new file mode 100644
index 0000000..d4b41ff
--- /dev/null
+++ b/ryu/tests/unit/packet/test_ipv4.py
@@ -0,0 +1,131 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import unittest
+import logging
+import struct
+from struct import *
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.ofproto import ether, inet
+from ryu.lib import mac
+from ryu.lib.packet import packet_utils
+from ryu.lib.packet.ethernet import ethernet
+from ryu.lib.packet.packet import Packet
+from ryu.lib.packet.ipv4 import ipv4
+from ryu.lib.packet.tcp import tcp
+import netaddr
+
+
+LOG = logging.getLogger('test_ipv4')
+
+
+class Test_ipv4(unittest.TestCase):
+    """ Test case for ipv4
+    """
+
+    version = 4
+    header_length = 5 + 10
+    ver_hlen = version << 4 | header_length
+    tos = 0
+    total_length = header_length + 64
+    identification = 30774
+    flags = 4
+    offset = 1480
+    flg_off = flags << 13 | offset
+    ttl = 64
+    proto = inet.IPPROTO_TCP
+    csum = 0xadc6
+    src = int(netaddr.IPAddress('131.151.32.21'))
+    dst = int(netaddr.IPAddress('131.151.32.129'))
+    length = header_length * 4
+    option = '\x86\x28\x00\x00\x00\x01\x01\x22' \
+        + '\x00\x01\xae\x00\x00\x00\x00\x00' \
+        + '\x00\x00\x00\x00\x00\x00\x00\x00' \
+        + '\x00\x00\x00\x00\x00\x00\x00\x00' \
+        + '\x00\x00\x00\x00\x00\x00\x00\x01'
+
+    buf = pack(ipv4._PACK_STR, ver_hlen, tos, total_length, identification,
+               flg_off, ttl, proto, csum, src, dst) \
+        + option
+
+    ip = ipv4(version, header_length, tos, total_length, identification,
+              flags, offset, ttl, proto, csum, src, dst, option)
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    def test_init(self):
+        eq_(self.version, self.ip.version)
+        eq_(self.header_length, self.ip.header_length)
+        eq_(self.tos, self.ip.tos)
+        eq_(self.total_length, self.ip.total_length)
+        eq_(self.identification, self.ip.identification)
+        eq_(self.flags, self.ip.flags)
+        eq_(self.offset, self.ip.offset)
+        eq_(self.ttl, self.ip.ttl)
+        eq_(self.proto, self.ip.proto)
+        eq_(self.csum, self.ip.csum)
+        eq_(self.src, self.ip.src)
+        eq_(self.dst, self.ip.dst)
+        eq_(self.length, self.ip.length)
+        eq_(self.option, self.ip.option)
+
+    def test_parser(self):
+        res, ptype = self.ip.parser(self.buf)
+
+        eq_(res.version, self.version)
+        eq_(res.header_length, self.header_length)
+        eq_(res.tos, self.tos)
+        eq_(res.total_length, self.total_length)
+        eq_(res.identification, self.identification)
+        eq_(res.flags, self.flags)
+        eq_(res.offset, self.offset)
+        eq_(res.ttl, self.ttl)
+        eq_(res.proto, self.proto)
+        eq_(res.csum, self.csum)
+        eq_(res.src, self.src)
+        eq_(res.dst, self.dst)
+        eq_(ptype, tcp)
+
+    def test_serialize(self):
+        buf = self.ip.serialize(bytearray(), None)
+        res = struct.unpack_from(ipv4._PACK_STR, str(buf))
+        option = buf[ipv4._MIN_LEN:ipv4._MIN_LEN + len(self.option)]
+
+        eq_(res[0], self.ver_hlen)
+        eq_(res[1], self.tos)
+        eq_(res[2], self.total_length)
+        eq_(res[3], self.identification)
+        eq_(res[4], self.flg_off)
+        eq_(res[5], self.ttl)
+        eq_(res[6], self.proto)
+        eq_(res[8], self.src)
+        eq_(res[9], self.dst)
+        eq_(option, self.option)
+
+        # checksum
+        csum = packet_utils.checksum(buf)
+        eq_(csum, 0)
+
+    @raises(Exception)
+    def test_malformed_ipv4(self):
+        m_short_buf = self.buf[1:ipv4._MIN_LEN]
+        ipv4.parser(m_short_buf)
diff --git a/ryu/tests/unit/packet/test_packet.py b/ryu/tests/unit/packet/test_packet.py
new file mode 100644
index 0000000..2e2601a
--- /dev/null
+++ b/ryu/tests/unit/packet/test_packet.py
@@ -0,0 +1,371 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import unittest
+import logging
+import struct
+import netaddr
+import array
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.ofproto import ether, inet
+from ryu.lib import mac
+from ryu.lib.packet import *
+
+
+LOG = logging.getLogger('test_packet')
+
+
+class TestPacket(unittest.TestCase):
+    """ Test case for packet
+    """
+
+    dst_mac = mac.haddr_to_bin('AA:AA:AA:AA:AA:AA')
+    src_mac = mac.haddr_to_bin('BB:BB:BB:BB:BB:BB')
+    dst_ip = int(netaddr.IPAddress('192.168.128.10'))
+    dst_ip_bin = struct.pack('!I', dst_ip)
+    src_ip = int(netaddr.IPAddress('192.168.122.20'))
+    src_ip_bin = struct.pack('!I', src_ip)
+    payload = '\x06\x06\x47\x50\x00\x00\x00\x00' \
+        + '\xcd\xc5\x00\x00\x00\x00\x00\x00' \
+        + '\x10\x11\x12\x13\x14\x15\x16\x17' \
+        + '\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'
+
+    def get_protocols(self, pkt):
+        protocols = {}
+        for p in pkt:
+            if hasattr(p, 'protocol_name'):
+                protocols[p.protocol_name] = p
+            else:
+                protocols['payload'] = p
+        return protocols
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    def test_arp(self):
+        # build packet
+        e = ethernet.ethernet(self.dst_mac, self.src_mac,
+                              ether.ETH_TYPE_ARP)
+        a = arp.arp(1, ether.ETH_TYPE_IP, 6, 4, 2,
+                    self.src_mac, self.src_ip, self.dst_mac,
+                    self.dst_ip)
+        p = packet.Packet()
+        p.add_protocol(e)
+        p.add_protocol(a)
+        p.serialize()
+
+        # ethernet !6s6sH
+        e_buf = self.dst_mac \
+            + self.src_mac \
+            + '\x08\x06'
+
+        # arp !HHBBH6sI6sI
+        a_buf = '\x00\x01' \
+            + '\x08\x00' \
+            + '\x06' \
+            + '\x04' \
+            + '\x00\x02' \
+            + self.src_mac \
+            + self.src_ip_bin \
+            + self.dst_mac \
+            + self.dst_ip_bin
+
+        buf = e_buf + a_buf
+        eq_(buf, p.data)
+
+        # parse
+        pkt = packet.Packet(array.array('B', p.data))
+        protocols = self.get_protocols(pkt)
+        p_eth = protocols['ethernet']
+        p_arp = protocols['arp']
+
+        # ethernet
+        ok_(p_eth)
+        eq_(self.dst_mac, p_eth.dst)
+        eq_(self.src_mac, p_eth.src)
+        eq_(ether.ETH_TYPE_ARP, p_eth.ethertype)
+
+        # arp
+        ok_(p_arp)
+        eq_(1, p_arp.hwtype)
+        eq_(ether.ETH_TYPE_IP, p_arp.proto)
+        eq_(6, p_arp.hlen)
+        eq_(4, p_arp.plen)
+        eq_(2, p_arp.opcode)
+        eq_(self.src_mac, p_arp.src_mac)
+        eq_(self.src_ip, p_arp.src_ip)
+        eq_(self.dst_mac, p_arp.dst_mac)
+        eq_(self.dst_ip, p_arp.dst_ip)
+
+    def test_vlan_arp(self):
+        # build packet
+        e = ethernet.ethernet(self.dst_mac, self.src_mac,
+                              ether.ETH_TYPE_8021Q)
+        v = vlan.vlan(0b111, 0b1, 3, ether.ETH_TYPE_ARP)
+        a = arp.arp(1, ether.ETH_TYPE_IP, 6, 4, 2,
+                    self.src_mac, self.src_ip, self.dst_mac,
+                    self.dst_ip)
+        p = packet.Packet()
+        p.add_protocol(e)
+        p.add_protocol(v)
+        p.add_protocol(a)
+        p.serialize()
+
+        # ethernet !6s6sH
+        e_buf = self.dst_mac \
+            + self.src_mac \
+            + '\x81\x00'
+
+        # vlan !HH
+        v_buf = '\xF0\x03' \
+            + '\x08\x06'
+
+        # arp !HHBBH6sI6sI
+        a_buf = '\x00\x01' \
+            + '\x08\x00' \
+            + '\x06' \
+            + '\x04' \
+            + '\x00\x02' \
+            + self.src_mac \
+            + self.src_ip_bin \
+            + self.dst_mac \
+            + self.dst_ip_bin
+
+        buf = e_buf + v_buf + a_buf
+        eq_(buf, p.data)
+
+        # parse
+        pkt = packet.Packet(array.array('B', p.data))
+        protocols = self.get_protocols(pkt)
+        p_eth = protocols['ethernet']
+        p_vlan = protocols['vlan']
+        p_arp = protocols['arp']
+
+        # ethernet
+        ok_(p_eth)
+        eq_(self.dst_mac, p_eth.dst)
+        eq_(self.src_mac, p_eth.src)
+        eq_(ether.ETH_TYPE_8021Q, p_eth.ethertype)
+
+        # vlan
+        ok_(p_vlan)
+        eq_(0b111, p_vlan.pcp)
+        eq_(0b1, p_vlan.cfi)
+        eq_(3, p_vlan.vid)
+        eq_(ether.ETH_TYPE_ARP, p_vlan.ethertype)
+
+        # arp
+        ok_(p_arp)
+        eq_(1, p_arp.hwtype)
+        eq_(ether.ETH_TYPE_IP, p_arp.proto)
+        eq_(6, p_arp.hlen)
+        eq_(4, p_arp.plen)
+        eq_(2, p_arp.opcode)
+        eq_(self.src_mac, p_arp.src_mac)
+        eq_(self.src_ip, p_arp.src_ip)
+        eq_(self.dst_mac, p_arp.dst_mac)
+        eq_(self.dst_ip, p_arp.dst_ip)
+
+    def test_ipv4_udp(self):
+        # build packet
+        e = ethernet.ethernet(self.dst_mac, self.src_mac,
+                              ether.ETH_TYPE_IP)
+        ip = ipv4.ipv4(4, 5, 1, 0, 3, 1, 4, 64, inet.IPPROTO_UDP, 0,
+                       self.src_ip, self.dst_ip)
+        u = udp.udp(0x190F, 0x1F90, 0, 0)
+
+        p = packet.Packet()
+        p.add_protocol(e)
+        p.add_protocol(ip)
+        p.add_protocol(u)
+        p.add_protocol(self.payload)
+        p.serialize()
+
+        # ethernet !6s6sH
+        e_buf = self.dst_mac \
+            + self.src_mac \
+            + '\x08\x00'
+
+        # ipv4 !BBHHHBBHII
+        ip_buf = '\x45' \
+            + '\x01' \
+            + '\x00\x3C' \
+            + '\x00\x03' \
+            + '\x20\x04' \
+            + '\x40' \
+            + '\x11' \
+            + '\x00\x00' \
+            + self.src_ip_bin \
+            + self.dst_ip_bin
+
+        # udp !HHHH
+        u_buf = '\x19\x0F' \
+            + '\x1F\x90' \
+            + '\x00\x28' \
+            + '\x00\x00'
+
+        buf = e_buf + ip_buf + u_buf + self.payload
+
+        # parse
+        pkt = packet.Packet(array.array('B', p.data))
+        protocols = self.get_protocols(pkt)
+        p_eth = protocols['ethernet']
+        p_ipv4 = protocols['ipv4']
+        p_udp = protocols['udp']
+
+        # ethernet
+        ok_(p_eth)
+        eq_(self.dst_mac, p_eth.dst)
+        eq_(self.src_mac, p_eth.src)
+        eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
+
+        # ipv4
+        ok_(p_ipv4)
+        eq_(4, p_ipv4.version)
+        eq_(5, p_ipv4.header_length)
+        eq_(1, p_ipv4.tos)
+        l = len(ip_buf) + len(u_buf) + len(self.payload)
+        eq_(l, p_ipv4.total_length)
+        eq_(3, p_ipv4.identification)
+        eq_(1, p_ipv4.flags)
+        eq_(64, p_ipv4.ttl)
+        eq_(inet.IPPROTO_UDP, p_ipv4.proto)
+        eq_(self.src_ip, p_ipv4.src)
+        eq_(self.dst_ip, p_ipv4.dst)
+        t = bytearray(ip_buf)
+        struct.pack_into('!H', t, 10, p_ipv4.csum)
+        eq_(packet_utils.checksum(t), 0)
+
+        # udp
+        ok_(p_udp)
+        eq_(0x190f, p_udp.src_port)
+        eq_(0x1F90, p_udp.dst_port)
+        eq_(len(u_buf) + len(self.payload), p_udp.total_length)
+        eq_(0x77b2, p_udp.csum)
+        t = bytearray(u_buf)
+        struct.pack_into('!H', t, 6, p_udp.csum)
+        ph = struct.pack('!IIBBH', self.src_ip, self.dst_ip, 0,
+                         17, len(u_buf) + len(self.payload))
+        t = ph + t + self.payload
+        eq_(packet_utils.checksum(t), 0)
+
+        # payload
+        ok_('payload' in protocols)
+        eq_(self.payload, protocols['payload'].tostring())
+
+    def test_ipv4_tcp(self):
+        # build packet
+        e = ethernet.ethernet(self.dst_mac, self.src_mac,
+                              ether.ETH_TYPE_IP)
+        ip = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, 64, inet.IPPROTO_TCP, 0,
+                       self.src_ip, self.dst_ip)
+        t = tcp.tcp(0x190F, 0x1F90, 0x123, 1, 6, 0b101010, 2048, 0, 0x6f,
+                    '\x01\x02')
+
+        p = packet.Packet()
+        p.add_protocol(e)
+        p.add_protocol(ip)
+        p.add_protocol(t)
+        p.add_protocol(self.payload)
+        p.serialize()
+
+        # ethernet !6s6sH
+        e_buf = self.dst_mac \
+            + self.src_mac \
+            + '\x08\x00'
+
+        # ipv4 !BBHHHBBHII
+        ip_buf = '\x45' \
+            + '\x00' \
+            + '\x00\x4C' \
+            + '\x00\x00' \
+            + '\x00\x00' \
+            + '\x40' \
+            + '\x06' \
+            + '\x00\x00' \
+            + self.src_ip_bin \
+            + self.dst_ip_bin
+
+        # tcp !HHIIBBHHH + option
+        t_buf = '\x19\x0F' \
+            + '\x1F\x90' \
+            + '\x00\x00\x01\x23' \
+            + '\x00\x00\x00\x01' \
+            + '\x60' \
+            + '\x2A' \
+            + '\x08\x00' \
+            + '\x00\x00' \
+            + '\x00\x6F' \
+            + '\x01\x02\x00\x00'
+
+        buf = e_buf + ip_buf + t_buf + self.payload
+
+        # parse
+        pkt = packet.Packet(array.array('B', p.data))
+        protocols = self.get_protocols(pkt)
+        p_eth = protocols['ethernet']
+        p_ipv4 = protocols['ipv4']
+        p_tcp = protocols['tcp']
+
+        # ethernet
+        ok_(p_eth)
+        eq_(self.dst_mac, p_eth.dst)
+        eq_(self.src_mac, p_eth.src)
+        eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
+
+        # ipv4
+        ok_(p_ipv4)
+        eq_(4, p_ipv4.version)
+        eq_(5, p_ipv4.header_length)
+        eq_(0, p_ipv4.tos)
+        l = len(ip_buf) + len(t_buf) + len(self.payload)
+        eq_(l, p_ipv4.total_length)
+        eq_(0, p_ipv4.identification)
+        eq_(0, p_ipv4.flags)
+        eq_(64, p_ipv4.ttl)
+        eq_(inet.IPPROTO_TCP, p_ipv4.proto)
+        eq_(self.src_ip, p_ipv4.src)
+        eq_(self.dst_ip, p_ipv4.dst)
+        t = bytearray(ip_buf)
+        struct.pack_into('!H', t, 10, p_ipv4.csum)
+        eq_(packet_utils.checksum(t), 0)
+
+        # tcp
+        ok_(p_tcp)
+        eq_(0x190f, p_tcp.src_port)
+        eq_(0x1F90, p_tcp.dst_port)
+        eq_(0x123, p_tcp.seq)
+        eq_(1, p_tcp.ack)
+        eq_(6, p_tcp.offset)
+        eq_(0b101010, p_tcp.bits)
+        eq_(2048, p_tcp.window_size)
+        eq_(0x6f, p_tcp.urgent)
+        eq_(len(t_buf), p_tcp.length)
+        t = bytearray(t_buf)
+        struct.pack_into('!H', t, 16, p_tcp.csum)
+        ph = struct.pack('!IIBBH', self.src_ip, self.dst_ip, 0,
+                         6, len(t_buf) + len(self.payload))
+        t = ph + t + self.payload
+        eq_(packet_utils.checksum(t), 0)
+
+        # payload
+        ok_('payload' in protocols)
+        eq_(self.payload, protocols['payload'].tostring())
diff --git a/ryu/tests/unit/packet/test_tcp.py b/ryu/tests/unit/packet/test_tcp.py
new file mode 100644
index 0000000..978cb34
--- /dev/null
+++ b/ryu/tests/unit/packet/test_tcp.py
@@ -0,0 +1,140 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import unittest
+import logging
+import struct
+import netaddr
+from struct import *
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.ofproto import ether, inet
+from ryu.lib import mac
+from ryu.lib.packet.ethernet import ethernet
+from ryu.lib.packet.packet import Packet
+from ryu.lib.packet.tcp import tcp
+from ryu.lib.packet.ipv4 import ipv4
+from ryu.lib.packet import packet_utils
+
+
+LOG = logging.getLogger('test_tcp')
+
+
+class Test_tcp(unittest.TestCase):
+    """ Test case for tcp
+    """
+    src_port = 6431
+    dst_port = 8080
+    seq = 5
+    ack = 1
+    offset = 6
+    bits = 0b101010
+    window_size = 2048
+    csum = 12345
+    urgent = 128
+    option = '\x01\x02\x03\x04'
+
+    t = tcp(src_port, dst_port, seq, ack, offset, bits,
+            window_size, csum, urgent, option)
+
+    buf = pack(tcp._PACK_STR, src_port, dst_port, seq, ack,
+               offset << 4, bits, window_size, csum, urgent)
+    buf += option
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    def test_init(self):
+        eq_(self.src_port, self.t.src_port)
+        eq_(self.dst_port, self.t.dst_port)
+        eq_(self.seq, self.t.seq)
+        eq_(self.ack, self.t.ack)
+        eq_(self.offset, self.t.offset)
+        eq_(self.bits, self.t.bits)
+        eq_(self.window_size, self.t.window_size)
+        eq_(self.csum, self.t.csum)
+        eq_(self.urgent, self.t.urgent)
+        eq_(self.option, self.t.option)
+
+    def test_parser(self):
+        r1, r2 = self.t.parser(self.buf)
+
+        eq_(self.src_port, r1.src_port)
+        eq_(self.dst_port, r1.dst_port)
+        eq_(self.seq, r1.seq)
+        eq_(self.ack, r1.ack)
+        eq_(self.offset, r1.offset)
+        eq_(self.bits, r1.bits)
+        eq_(self.window_size, r1.window_size)
+        eq_(self.csum, r1.csum)
+        eq_(self.urgent, r1.urgent)
+        eq_(self.option, r1.option)
+        eq_(None, r2)
+
+    def test_serialize(self):
+        offset = 5
+        csum = 0
+
+        src_ip = int(netaddr.IPAddress('192.168.10.1'))
+        dst_ip = int(netaddr.IPAddress('192.168.100.1'))
+        prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
+                    inet.IPPROTO_UDP, 0, src_ip, dst_ip)
+
+        t = tcp(self.src_port, self.dst_port, self.seq, self.ack,
+                offset, self.bits, self.window_size, csum, self.urgent)
+        buf = t.serialize(bytearray(), prev)
+        res = struct.unpack(tcp._PACK_STR, str(buf))
+
+        eq_(res[0], self.src_port)
+        eq_(res[1], self.dst_port)
+        eq_(res[2], self.seq)
+        eq_(res[3], self.ack)
+        eq_(res[4], offset << 4)
+        eq_(res[5], self.bits)
+        eq_(res[6], self.window_size)
+        eq_(res[8], self.urgent)
+
+        # checksum
+        ph = struct.pack('!IIBBH', src_ip, dst_ip, 0, 6, offset * 4)
+        d = ph + buf + bytearray()
+        s = packet_utils.checksum(d)
+        eq_(0, s)
+
+    def test_serialize_option(self):
+        offset = 6
+        csum = 0
+        option = '\x01\x02'
+
+        src_ip = int(netaddr.IPAddress('192.168.10.1'))
+        dst_ip = int(netaddr.IPAddress('192.168.100.1'))
+        prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
+                    inet.IPPROTO_UDP, 0, src_ip, dst_ip)
+
+        t = tcp(self.src_port, self.dst_port, self.seq, self.ack,
+                offset, self.bits, self.window_size, csum, self.urgent,
+                option)
+        buf = t.serialize(bytearray(), prev)
+        r_option = buf[tcp._MIN_LEN:tcp._MIN_LEN + len(option)]
+        eq_(option, r_option)
+
+    @raises(Exception)
+    def test_malformed_tcp(self):
+        m_short_buf = self.buf[1:tcp._MIN_LEN]
+        tcp.parser(m_short_buf)
diff --git a/ryu/tests/unit/packet/test_udp.py b/ryu/tests/unit/packet/test_udp.py
new file mode 100644
index 0000000..e35d56c
--- /dev/null
+++ b/ryu/tests/unit/packet/test_udp.py
@@ -0,0 +1,96 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import unittest
+import logging
+import struct
+import netaddr
+from struct import *
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.ofproto import ether, inet
+from ryu.lib import mac
+from ryu.lib.packet.ethernet import ethernet
+from ryu.lib.packet.packet import Packet
+from ryu.lib.packet.udp import udp
+from ryu.lib.packet.ipv4 import ipv4
+from ryu.lib.packet import packet_utils
+
+
+LOG = logging.getLogger('test_udp')
+
+
+class Test_udp(unittest.TestCase):
+    """ Test case for udp
+    """
+    src_port = 6431
+    dst_port = 8080
+    total_length = 65507
+    csum = 12345
+    u = udp(src_port, dst_port, total_length, csum)
+    buf = pack(udp._PACK_STR, src_port, dst_port, total_length, csum)
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    def test_init(self):
+        eq_(self.src_port, self.u.src_port)
+        eq_(self.dst_port, self.u.dst_port)
+        eq_(self.total_length, self.u.total_length)
+        eq_(self.csum, self.u.csum)
+
+    def test_parser(self):
+        r1, r2 = self.u.parser(self.buf)
+
+        eq_(self.src_port, r1.src_port)
+        eq_(self.dst_port, r1.dst_port)
+        eq_(self.total_length, r1.total_length)
+        eq_(self.csum, r1.csum)
+        eq_(None, r2)
+
+    def test_serialize(self):
+        src_port = 6431
+        dst_port = 8080
+        total_length = 0
+        csum = 0
+
+        src_ip = int(netaddr.IPAddress('192.168.10.1'))
+        dst_ip = int(netaddr.IPAddress('192.168.100.1'))
+        prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
+                    inet.IPPROTO_UDP, 0, src_ip, dst_ip)
+
+        u = udp(src_port, dst_port, total_length, csum)
+        buf = u.serialize(bytearray(), prev)
+        res = struct.unpack(udp._PACK_STR, buf)
+
+        eq_(res[0], src_port)
+        eq_(res[1], dst_port)
+        eq_(res[2], struct.calcsize(udp._PACK_STR))
+
+        # checksum
+        ph = struct.pack('!IIBBH', src_ip, dst_ip, 0, 17, res[2])
+        d = ph + buf + bytearray()
+        s = packet_utils.checksum(d)
+        eq_(0, s)
+
+    @raises(Exception)
+    def test_malformed_udp(self):
+        m_short_buf = self.buf[1:udp._MIN_LEN]
+        udp.parser(m_short_buf)
diff --git a/ryu/tests/unit/packet/test_vlan.py b/ryu/tests/unit/packet/test_vlan.py
index cf5ae42..aaf7422 100644
--- a/ryu/tests/unit/packet/test_vlan.py
+++ b/ryu/tests/unit/packet/test_vlan.py
@@ -137,3 +137,8 @@ class Test_vlan(unittest.TestCase):
         eq_(v.vid, self.vid)
         eq_(v.ethertype, self.ethertype)
         eq_(v.length, self.length)
+
+    @raises(Exception)
+    def test_malformed_vlan(self):
+        m_short_buf = self.buf[1:vlan._MIN_LEN]
+        vlan.parser(m_short_buf)
-- 
1.7.9.5



------------------------------------------------------------------------------
Everyone hates slow websites. So do we.
Make your web apps faster with AppDynamics
Download AppDynamics Lite for free today:
http://p.sf.net/sfu/appdyn_sfd2d_oct
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to