Signed-off-by: Yuichi Ito <[email protected]>
---
 ryu/tests/unit/packet/test_packet.py |  249 ++++++++++++++++++++++++++++++++++
 1 file changed, 249 insertions(+)

diff --git a/ryu/tests/unit/packet/test_packet.py 
b/ryu/tests/unit/packet/test_packet.py
index a97c6e8..d5c0375 100644
--- a/ryu/tests/unit/packet/test_packet.py
+++ b/ryu/tests/unit/packet/test_packet.py
@@ -719,6 +719,141 @@ class TestPacket(unittest.TestCase):
         eq_(pkt_str, str(pkt))
         eq_(pkt_str, repr(pkt))

+    def test_ipv4_icmp(self):
+        # buid packet
+        e = ethernet.ethernet(self.dst_mac, self.src_mac,
+                              ether.ETH_TYPE_IP)
+        ip = ipv4.ipv4(proto=inet.IPPROTO_ICMP)
+        ic = icmp.icmp()
+
+        p = e/ip/ic
+        p.serialize()
+
+        ipaddr = addrconv.ipv4.text_to_bin('0.0.0.0')
+
+        # ethernet !6s6sH
+        e_buf = self.dst_mac_bin \
+            + self.src_mac_bin \
+            + '\x08\x00'
+
+        # ipv4 !BBHHHBBHII
+        ip_buf = '\x45' \
+            + '\x00' \
+            + '\x00\x1c' \
+            + '\x00\x00' \
+            + '\x00\x00' \
+            + '\xff' \
+            + '\x01' \
+            + '\x00\x00' \
+            + ipaddr \
+            + ipaddr
+
+        # icmp !BBH + echo !HH
+        ic_buf = '\x08' \
+            + '\x00' \
+            + '\x00\x00' \
+            + '\x00\x00' \
+            + '\x00\x00'
+
+        buf = e_buf + ip_buf + ic_buf
+
+        # parse
+        pkt = packet.Packet(array.array('B', p.data))
+        protocols = self.get_protocols(pkt)
+        p_eth = protocols['ethernet']
+        p_ipv4 = protocols['ipv4']
+        p_icmp = protocols['icmp']
+
+        # ethernet
+        ok_(p_eth)
+        eq_(self.dst_mac, p_eth.dst)
+        eq_(self.src_mac, p_eth.src)
+        eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
+
+        # ipv4
+        ok_(p_ipv4)
+        eq_(4, p_ipv4.version)
+        eq_(5, p_ipv4.header_length)
+        eq_(0, p_ipv4.tos)
+        l = len(ip_buf) + len(ic_buf)
+        eq_(l, p_ipv4.total_length)
+        eq_(0, p_ipv4.identification)
+        eq_(0, p_ipv4.flags)
+        eq_(255, p_ipv4.ttl)
+        eq_(inet.IPPROTO_ICMP, p_ipv4.proto)
+        eq_('0.0.0.0', p_ipv4.src)
+        eq_('0.0.0.0', p_ipv4.dst)
+        t = bytearray(ip_buf)
+        struct.pack_into('!H', t, 10, p_ipv4.csum)
+        eq_(packet_utils.checksum(t), 0)
+
+        # icmp
+        ok_(p_icmp)
+        eq_(8, p_icmp.type)
+        eq_(0, p_icmp.code)
+        eq_(0, p_icmp.data.id)
+        eq_(0, p_icmp.data.seq)
+        eq_(len(ic_buf), len(p_icmp))
+        t = bytearray(ic_buf)
+        struct.pack_into('!H', t, 2, p_icmp.csum)
+        eq_(packet_utils.checksum(t), 0)
+
+        # to string
+        eth_values = {'dst': self.dst_mac,
+                      'src': self.src_mac,
+                      'ethertype': ether.ETH_TYPE_IP}
+        _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+                             for k, _ in inspect.getmembers(p_eth)
+                             if k in eth_values])
+        eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+        ipv4_values = {'version': 4,
+                       'header_length': 5,
+                       'tos': 0,
+                       'total_length': l,
+                       'identification': 0,
+                       'flags': 0,
+                       'offset': p_ipv4.offset,
+                       'ttl': 255,
+                       'proto': inet.IPPROTO_ICMP,
+                       'csum': p_ipv4.csum,
+                       'src': '0.0.0.0',
+                       'dst': '0.0.0.0',
+                       'option': None}
+        _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
+                              for k, _ in inspect.getmembers(p_ipv4)
+                              if k in ipv4_values])
+        ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
+
+        echo_values = {'id': 0,
+                       'seq': 0,
+                       'data': None}
+        _echo_str = ','.join(['%s=%s' % (k, repr(echo_values[k]))
+                              for k in sorted(echo_values.keys())])
+        echo_str = '%s(%s)' % (icmp.echo.__name__, _echo_str)
+        icmp_values = {'type': 8,
+                       'code': 0,
+                       'csum': p_icmp.csum,
+                       'data': echo_str}
+        _icmp_str = ','.join(['%s=%s' % (k, icmp_values[k])
+                              for k, _ in inspect.getmembers(p_icmp)
+                              if k in icmp_values])
+        icmp_str = '%s(%s)' % (icmp.icmp.__name__, _icmp_str)
+
+        pkt_str = '%s, %s, %s' % (eth_str, ipv4_str, icmp_str)
+
+        eq_(eth_str, str(p_eth))
+        eq_(eth_str, repr(p_eth))
+
+        eq_(ipv4_str, str(p_ipv4))
+        eq_(ipv4_str, repr(p_ipv4))
+
+        eq_(icmp_str, str(p_icmp))
+        eq_(icmp_str, repr(p_icmp))
+
+        eq_(pkt_str, str(pkt))
+        eq_(pkt_str, repr(pkt))
+
     def test_ipv6_udp(self):
         # build packet
         e = ethernet.ethernet(self.dst_mac, self.src_mac,
@@ -1124,6 +1259,120 @@ class TestPacket(unittest.TestCase):
         eq_(pkt_str, str(pkt))
         eq_(pkt_str, repr(pkt))

+    def test_ipv6_icmpv6(self):
+        # build packet
+        e = ethernet.ethernet(self.dst_mac, self.src_mac,
+                              ether.ETH_TYPE_IPV6)
+        ip = ipv6.ipv6(nxt=inet.IPPROTO_ICMPV6)
+        ic = icmpv6.icmpv6()
+
+        p = e/ip/ic
+        p.serialize()
+
+        ipaddr = addrconv.ipv6.text_to_bin('::')
+
+        # ethernet !6s6sH
+        e_buf = self.dst_mac_bin \
+            + self.src_mac_bin \
+            + '\x86\xdd'
+
+        # ipv6 !IHBB16s16s'
+        ip_buf = '\x60\x00\x00\x00' \
+            + '\x00\x00' \
+            + '\x3a' \
+            + '\x00' \
+            + '\x00\x00' \
+            + ipaddr \
+            + ipaddr
+
+        # icmpv6 !BBH
+        ic_buf = '\x00' \
+            + '\x00' \
+            + '\x00\x00'
+
+        buf = e_buf + ip_buf + ic_buf
+
+        # parse
+        pkt = packet.Packet(array.array('B', p.data))
+        protocols = self.get_protocols(pkt)
+        p_eth = protocols['ethernet']
+        p_ipv6 = protocols['ipv6']
+        p_icmpv6 = protocols['icmpv6']
+
+        # ethernet
+        ok_(p_eth)
+        eq_(self.dst_mac, p_eth.dst)
+        eq_(self.src_mac, p_eth.src)
+        eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
+
+        # ipv6
+        ok_(p_ipv6)
+        eq_(6, p_ipv6.version)
+        eq_(0, p_ipv6.traffic_class)
+        eq_(0, p_ipv6.flow_label)
+        eq_(len(ic_buf), p_ipv6.payload_length)
+        eq_(inet.IPPROTO_ICMPV6, p_ipv6.nxt)
+        eq_(0, p_ipv6.hop_limit)
+        eq_('::', p_ipv6.src)
+        eq_('::', p_ipv6.dst)
+
+        # icmpv6
+        ok_(p_icmpv6)
+        eq_(0, p_icmpv6.type_)
+        eq_(0, p_icmpv6.code)
+        eq_(len(ic_buf), len(p_icmpv6))
+        t = bytearray(ic_buf)
+        struct.pack_into('!H', t, 2, p_icmpv6.csum)
+        ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr, len(ic_buf), 58)
+        t = ph + t
+        eq_(packet_utils.checksum(t), 0)
+
+        # to string
+        eth_values = {'dst': self.dst_mac,
+                      'src': self.src_mac,
+                      'ethertype': ether.ETH_TYPE_IPV6}
+        _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+                             for k, _ in inspect.getmembers(p_eth)
+                             if k in eth_values])
+        eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+        ipv6_values = {'version': 6,
+                       'traffic_class': 0,
+                       'flow_label': 0,
+                       'payload_length': len(ic_buf),
+                       'nxt': inet.IPPROTO_ICMPV6,
+                       'hop_limit': 0,
+                       'src': '::',
+                       'dst': '::',
+                       'ext_hdrs': []}
+        _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
+                              for k, _ in inspect.getmembers(p_ipv6)
+                              if k in ipv6_values])
+        ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+        icmpv6_values = {'type_': 0,
+                         'code': 0,
+                         'csum': p_icmpv6.csum,
+                         'data': None}
+        _icmpv6_str = ','.join(['%s=%s' % (k, repr(icmpv6_values[k]))
+                                for k, _ in inspect.getmembers(p_icmpv6)
+                                if k in icmpv6_values])
+        icmpv6_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _icmpv6_str)
+
+        pkt_str = '%s, %s, %s' % (eth_str, ipv6_str, icmpv6_str)
+
+        eq_(eth_str, str(p_eth))
+        eq_(eth_str, repr(p_eth))
+
+        eq_(ipv6_str, str(p_ipv6))
+        eq_(ipv6_str, repr(p_ipv6))
+
+        eq_(icmpv6_str, str(p_icmpv6))
+        eq_(icmpv6_str, repr(p_icmpv6))
+
+        eq_(pkt_str, str(pkt))
+        eq_(pkt_str, repr(pkt))
+
     def test_llc_bpdu(self):
         # buid packet
         e = ethernet.ethernet(self.dst_mac, self.src_mac,
-- 
1.7.10.4


------------------------------------------------------------------------------
November Webinars for C, C++, Fortran Developers
Accelerate application performance with scalable programming models. Explore
techniques for threading, error checking, porting, and tuning. Get the most 
from the latest Intel processors and coprocessors. See abstracts and register
http://pubads.g.doubleclick.net/gampad/clk?id=60136231&iu=/4140/ostg.clktrk
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to