Signed-off-by: WATANABE Fumitaka <[email protected]> --- ryu/tests/unit/packet/test_dhcp.py | 107 ++++++++++++++++++++ ryu/tests/unit/packet/test_icmp.py | 63 ++++++++++++ ryu/tests/unit/packet/test_icmpv6.py | 64 ++++++++++++ ryu/tests/unit/packet/test_ipv6.py | 64 ++++++++++++ ryu/tests/unit/packet/test_lldp.py | 174 +++++++++++++++++++++++++++++++++ ryu/tests/unit/packet/test_mpls.py | 52 ++++++++++ ryu/tests/unit/packet/test_packet.py | 179 ++++++++++++++++++++++++++++++++++ ryu/tests/unit/packet/test_vrrp.py | 66 +++++++++++++ 8 files changed, 769 insertions(+) create mode 100644 ryu/tests/unit/packet/test_dhcp.py create mode 100644 ryu/tests/unit/packet/test_icmp.py create mode 100644 ryu/tests/unit/packet/test_ipv6.py create mode 100644 ryu/tests/unit/packet/test_mpls.py
diff --git a/ryu/tests/unit/packet/test_dhcp.py b/ryu/tests/unit/packet/test_dhcp.py new file mode 100644 index 0000000..d5be755 --- /dev/null +++ b/ryu/tests/unit/packet/test_dhcp.py @@ -0,0 +1,107 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import logging +import socket +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib import ip +from ryu.lib import mac +from ryu.lib.packet import dhcp + + +LOG = logging.getLogger(__name__) + + +class Test_dhcp_offer(unittest.TestCase): + + op = dhcp.DHCP_BOOT_REPLY + chaddr = mac.haddr_to_bin('AA:AA:AA:AA:AA:AA') + htype = 1 + hlen = 6 + hops = 0 + xid = 1 + secs = 0 + flags = 1 + ciaddr = ip.ipv4_to_bin('192.168.10.10') + yiaddr = ip.ipv4_to_bin('192.168.20.20') + siaddr = ip.ipv4_to_bin('192.168.30.30') + giaddr = ip.ipv4_to_bin('192.168.40.40') + sname = 'abc' + boot_file = '' + + option_list = [dhcp.option("35", "02"), + dhcp.option("01", "ffffff00"), + dhcp.option("03", "c0a80a09"), + dhcp.option("06", "c0a80a09"), + dhcp.option("33", "0003f480"), + dhcp.option("3a", "0001fa40"), + dhcp.option("3b", "000375f0"), + dhcp.option("36", "c0a80a09")] + magic_cookie = socket.inet_aton("99.130.83.99") + options = dhcp.options(option_list=option_list, + magic_cookie=magic_cookie) + + dh = dhcp.dhcp(op, chaddr, options, htype=htype, hlen=hlen, + hops=hops, xid=xid, secs=secs, flags=flags, + ciaddr=ciaddr, yiaddr=yiaddr, siaddr=siaddr, + giaddr=giaddr, sname=sname, boot_file=boot_file) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + _option_str = '%s(length=%d,tag=\'%s\',value=\'%s\')' + opt_str = ', '.join([_option_str % (dhcp.option.__name__, + option.length, + option.tag, + option.value) + for option in self.option_list]) + option_str = '[%s]' % opt_str + + _options_str = '%s(magic_cookie=\'%s\',option_list=%s,options_len=%d)' + options_str = _options_str % (dhcp.options.__name__, + socket.inet_ntoa(self.magic_cookie), + option_str, + self.options.options_len) + + _dh_str = ('%s(boot_file=\'%s\',chaddr=\'%s\',ciaddr=\'%s\',' + 'flags=%d,giaddr=\'%s\',hlen=%d,hops=%d,htype=%d,' + 'op=%d,options=%s,secs=%d,siaddr=\'%s\',' + 'sname=\'%s\',xid=\'%s\',yiaddr=\'%s\')') + dh_str = _dh_str % (dhcp.dhcp.__name__, + self.boot_file, + mac.haddr_to_str(self.chaddr), + ip.ipv4_to_str(self.ciaddr), + self.flags, + ip.ipv4_to_str(self.giaddr), + self.hlen, + self.hops, + self.htype, + self.op, + options_str, + self.secs, + ip.ipv4_to_str(self.siaddr), + self.sname, + hex(self.xid), + ip.ipv4_to_str(self.yiaddr)) + + eq_(str(self.dh), dh_str) + eq_(repr(self.dh), dh_str) diff --git a/ryu/tests/unit/packet/test_icmp.py b/ryu/tests/unit/packet/test_icmp.py new file mode 100644 index 0000000..2a0a856 --- /dev/null +++ b/ryu/tests/unit/packet/test_icmp.py @@ -0,0 +1,63 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import logging + +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib.packet import icmp + + +LOG = logging.getLogger(__name__) + + +class Test_icmp_dest_unreach(unittest.TestCase): + + type_ = icmp.ICMP_DEST_UNREACH + code = icmp.ICMP_HOST_UNREACH_CODE + csum = 0 + + mtu = 10 + data = 'abc' + data_len = len(data) + dst_unreach = icmp.dest_unreach(data_len=data_len, mtu=mtu, data=data) + + ic = icmp.icmp(type_, code, csum, data=dst_unreach) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + + _data_str = '%s(data=\'%s\',data_len=%d,mtu=%d)' + data_str = _data_str % (icmp.dest_unreach.__name__, + self.data, + self.data_len, + self.mtu) + + _ic_str = ('%s(code=%d,csum=\'%s\',data=%s,type=%d)') + ic_str = _ic_str % (icmp.icmp.__name__, + self.code, + hex(self.csum), + data_str, + self.type_) + + eq_(str(self.ic), ic_str) + eq_(repr(self.ic), ic_str) diff --git a/ryu/tests/unit/packet/test_icmpv6.py b/ryu/tests/unit/packet/test_icmpv6.py index 0a665c7..8768064 100644 --- a/ryu/tests/unit/packet/test_icmpv6.py +++ b/ryu/tests/unit/packet/test_icmpv6.py @@ -23,6 +23,8 @@ import netaddr from nose.tools import ok_, eq_, nottest, raises from nose.plugins.skip import Skip, SkipTest from ryu.ofproto import ether, inet +from ryu.lib.ip import ipv6_to_str +from ryu.lib.mac import haddr_to_str from ryu.lib.packet.ethernet import ethernet from ryu.lib.packet.packet import Packet from ryu.lib.packet import icmpv6 @@ -155,6 +157,26 @@ class Test_icmpv6_echo_request(unittest.TestCase): def test_serialize_with_data(self): self._test_serialize(self.data) + def test_to_string(self): + echo = icmpv6.echo(self.id_, self.seq, self.data) + icmp = icmpv6.icmpv6(self.type_, self.code, 0, echo) + + _echo_str = '%s(data=%s,id=%d,seq=%d)' + echo_str = _echo_str % (icmpv6.echo.__name__, + repr(self.data), + self.id_, + self.seq) + + _icmp_str = '%s(code=%d,csum=\'%s\',data=%s,type_=%d)' + icmp_str = _icmp_str % (icmpv6.icmpv6.__name__, + self.code, + hex(icmp.csum), + echo_str, + self.type_) + + eq_(str(icmp), icmp_str) + eq_(repr(icmp), icmp_str) + class Test_icmpv6_echo_reply(Test_icmpv6_echo_request): def setUp(self): @@ -260,6 +282,35 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase): eq_(nd_length, self.nd_length) eq_(nd_hw_src, self.nd_hw_src) + def test_to_string(self): + nd_opt = icmpv6.nd_option_la(self.nd_hw_src) + nd = icmpv6.nd_neighbor( + self.res, self.dst, self.nd_type, self.nd_length, nd_opt) + icmp = icmpv6.icmpv6(self.type_, self.code, 0, nd) + + _nd_opt_str = '%s(data=%s,hw_src=\'%s\')' + nd_opt_str = _nd_opt_str % (icmpv6.nd_option_la.__name__, + None, + haddr_to_str(self.nd_hw_src)) + + _nd_str = '%s(data=%s,dst=\'%s\',length=%d,res=%d,type_=%d)' + nd_str = _nd_str % (icmpv6.nd_neighbor.__name__, + nd_opt_str, + ipv6_to_str(self.dst), + self.nd_length, + nd.res, + self.nd_type) + + _icmp_str = '%s(code=%d,csum=\'%s\',data=%s,type_=%d)' + icmp_str = _icmp_str % (icmpv6.icmpv6.__name__, + self.code, + hex(icmp.csum), + nd_str, + self.type_) + + eq_(str(icmp), icmp_str) + eq_(repr(icmp), icmp_str) + class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solict): def setUp(self): @@ -336,3 +387,16 @@ class Test_icmpv6_router_solict(unittest.TestCase): def test_serialize_with_data(self): self._test_serialize(self.data) + + def test_to_string(self): + icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.data) + + _icmp_str = '%s(code=%d,csum=\'%s\',data=%s,type_=%d)' + icmp_str = _icmp_str % (icmpv6.icmpv6.__name__, + self.code, + hex(icmp.csum), + repr(self.data), + self.type_) + + eq_(str(icmp), icmp_str) + eq_(repr(icmp), icmp_str) diff --git a/ryu/tests/unit/packet/test_ipv6.py b/ryu/tests/unit/packet/test_ipv6.py new file mode 100644 index 0000000..48dc3c9 --- /dev/null +++ b/ryu/tests/unit/packet/test_ipv6.py @@ -0,0 +1,64 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import logging + +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib import ip +from ryu.lib.packet import ipv6 + + +LOG = logging.getLogger(__name__) + + +class Test_ipv6(unittest.TestCase): + + version = 6 + traffic_class = 0 + flow_label = 0 + payload_length = 817 + nxt = 6 + hop_limit = 128 + src = ip.ipv6_to_bin('2002:4637:d5d3::4637:d5d3') + dst = ip.ipv6_to_bin('2001:4860:0:2001::68') + + ip = ipv6.ipv6(version, traffic_class, flow_label, payload_length, + nxt, hop_limit, src, dst) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + _ipv6_str = ('%s(dst=\'%s\',flow_label=%d,hop_limit=%d,nxt=%d,' + 'payload_length=%d,src=\'%s\',traffic_class=%d,' + 'version=%d)') + ipv6_str = _ipv6_str % (ipv6.ipv6.__name__, + ip.ipv6_to_str(self.dst), + self.flow_label, + self.hop_limit, + self.nxt, + self.payload_length, + ip.ipv6_to_str(self.src), + self.traffic_class, + self.version) + + eq_(str(self.ip), ipv6_str) + eq_(repr(self.ip), ipv6_str) diff --git a/ryu/tests/unit/packet/test_lldp.py b/ryu/tests/unit/packet/test_lldp.py index 0d22458..d903194 100644 --- a/ryu/tests/unit/packet/test_lldp.py +++ b/ryu/tests/unit/packet/test_lldp.py @@ -118,6 +118,55 @@ class TestLLDPMandatoryTLV(unittest.TestCase): pkt.serialize() eq_(pkt.data, self.data) + def test_to_string(self): + src = '\x00\x04\x96\x1f\xa7\x26' + tlv_chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS, + chassis_id=src) + tlv_port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME, + port_id='1/3') + tlv_ttl = lldp.TTL(ttl=120) + tlv_end = lldp.End() + tlvs = (tlv_chassis_id, tlv_port_id, tlv_ttl, tlv_end) + lldp_pkt = lldp.lldp(tlvs) + + _chassis_id_str = '%s(chassis_id=%s,len=%d,subtype=%d,typelen=%d)' + tlv_chassis_id_str = _chassis_id_str % (lldp.ChassisID.__name__, + repr(src), + tlv_chassis_id.len, + lldp.ChassisID.SUB_MAC_ADDRESS, + tlv_chassis_id.typelen) + + _port_id_str = '%s(len=%d,port_id=\'%s\',subtype=%d,typelen=%d)' + tlv_port_id_str = _port_id_str % (lldp.PortID.__name__, + tlv_port_id.len, + tlv_port_id.port_id, + tlv_port_id.subtype, + tlv_port_id.typelen) + + _ttl_str = '%s(len=%d,ttl=%d,typelen=%d)' + tlv_ttl_str = _ttl_str % (lldp.TTL.__name__, + tlv_ttl.len, + tlv_ttl.ttl, + tlv_ttl.typelen) + + _end_str = '%s(len=%d,typelen=%d)' + tlv_end_str = _end_str % (lldp.End.__name__, + tlv_end.len, + tlv_end.typelen) + + _tlvs_str = '(%s, %s, %s, %s)' + tlvs_str = _tlvs_str % (tlv_chassis_id_str, + tlv_port_id_str, + tlv_ttl_str, + tlv_end_str) + + _lldp_str = '%s(tlvs=%s)' + lldp_str = _lldp_str % (lldp.lldp.__name__, + tlvs_str) + + eq_(str(lldp_pkt), lldp_str) + eq_(repr(lldp_pkt), lldp_str) + class TestLLDPOptionalTLV(unittest.TestCase): def setUp(self): @@ -257,3 +306,128 @@ class TestLLDPOptionalTLV(unittest.TestCase): # self.data has many organizationally specific TLVs data = str(pkt.data[:-2]) eq_(data, self.data[:len(data)]) + + def test_to_string(self): + src = '\x00\x01\x30\xf9\xad\xa0' + tlv_chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS, + chassis_id=src) + tlv_port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME, + port_id='1/1') + tlv_ttl = lldp.TTL(ttl=120) + tlv_port_desc = lldp.PortDescription( + port_description='Summit300-48-Port 1001\x00') + tlv_sys_name = lldp.SystemName(system_name='Summit300-48\x00') + tlv_sys_description = lldp.SystemDescription( + system_description='Summit300-48 - Version 7.4e.1 (Build 5) ' + + 'by Release_Master 05/27/05 04:53:11\x00') + tlv_system_capabilities = lldp.SystemCapabilities( + subtype=lldp.ChassisID.SUB_CHASSIS_COMPONENT, + system_cap=0x14, + enabled_cap=0x14) + tlv_management_address = lldp.ManagementAddress( + addr_subtype=0x06, addr='\x00\x01\x30\xf9\xad\xa0', + intf_subtype=0x02, intf_num=1001, + oid='') + tlv_organizationally_specific = lldp.OrganizationallySpecific( + oui='\x00\x12\x0f', subtype=0x02, info='\x07\x01\x00') + tlv_end = lldp.End() + tlvs = (tlv_chassis_id, tlv_port_id, tlv_ttl, tlv_port_desc, + tlv_sys_name, tlv_sys_description, + tlv_system_capabilities, tlv_management_address, + tlv_organizationally_specific, tlv_end) + lldp_pkt = lldp.lldp(tlvs) + + _chassis_id_str = '%s(chassis_id=%s,len=%d,subtype=%d,typelen=%d)' + tlv_chassis_id_str = _chassis_id_str % (lldp.ChassisID.__name__, + repr(src), + tlv_chassis_id.len, + lldp.ChassisID.SUB_MAC_ADDRESS, + tlv_chassis_id.typelen) + + _port_id_str = '%s(len=%d,port_id=\'%s\',subtype=%d,typelen=%d)' + tlv_port_id_str = _port_id_str % (lldp.PortID.__name__, + tlv_port_id.len, + tlv_port_id.port_id, + tlv_port_id.subtype, + tlv_port_id.typelen) + + _ttl_str = '%s(len=%d,ttl=%d,typelen=%d)' + tlv_ttl_str = _ttl_str % (lldp.TTL.__name__, + tlv_ttl.len, + tlv_ttl.ttl, + tlv_ttl.typelen) + + _port_desc_str = '%s(len=%d,tlv_info=%s,typelen=%d)' + tlv_port_desc_str = _port_desc_str % (lldp.PortDescription.__name__, + tlv_port_desc.len, + repr(tlv_port_desc.tlv_info), + tlv_port_desc.typelen) + + _system_name_str = '%s(len=%d,tlv_info=%s,typelen=%d)' + tlv_system_name_str = _system_name_str % (lldp.SystemName.__name__, + tlv_sys_name.len, + repr(tlv_sys_name.tlv_info), + tlv_sys_name.typelen) + + _sys_desc_str = '%s(len=%d,tlv_info=%s,typelen=%d)' + tlv_sys_desc_str = _sys_desc_str % (lldp.SystemDescription.__name__, + tlv_sys_description.len, + repr(tlv_sys_description.tlv_info), + tlv_sys_description.typelen) + + _sys_cap_str = ('%s(enabled_cap=%d,len=%d,subtype=%d,' + 'system_cap=%d,typelen=%d)') + tlv_sys_cap_str = _sys_cap_str % (lldp.SystemCapabilities.__name__, + 0x14, + tlv_system_capabilities.len, + lldp.ChassisID.SUB_CHASSIS_COMPONENT, + 0x14, + tlv_system_capabilities.typelen) + + _man_addr_str = ('%s(addr=%s,addr_len=%d,addr_subtype=%d,' + 'intf_num=%d,intf_subtype=%d,len=%d,oid=\'%s\',' + 'oid_len=%d,typelen=%d)') + tlv_man_addr_str = _man_addr_str % (lldp.ManagementAddress.__name__, + repr('\x00\x01\x30\xf9\xad\xa0'), + tlv_management_address.addr_len, + 0x06, + 1001, + 0x02, + tlv_management_address.len, + '', + tlv_management_address.oid_len, + tlv_management_address.typelen) + + _org_spec_str = ('%s(info=%s,len=%d,oui=%s,subtype=%d,' + 'typelen=%d)') + tlv_org_spec_str = _org_spec_str % ( + lldp.OrganizationallySpecific.__name__, + repr('\x07\x01\x00'), + tlv_organizationally_specific.len, + repr('\x00\x12\x0f'), + 0x02, + tlv_organizationally_specific.typelen) + + _end_str = '%s(len=%d,typelen=%d)' + tlv_end_str = _end_str % (lldp.End.__name__, + tlv_end.len, + tlv_end.typelen) + + _tlvs_str = '(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)' + tlvs_str = _tlvs_str % (tlv_chassis_id_str, + tlv_port_id_str, + tlv_ttl_str, + tlv_port_desc_str, + tlv_system_name_str, + tlv_sys_desc_str, + tlv_sys_cap_str, + tlv_man_addr_str, + tlv_org_spec_str, + tlv_end_str) + + _lldp_str = '%s(tlvs=%s)' + lldp_str = _lldp_str % (lldp.lldp.__name__, + tlvs_str) + + eq_(str(lldp_pkt), lldp_str) + eq_(repr(lldp_pkt), lldp_str) diff --git a/ryu/tests/unit/packet/test_mpls.py b/ryu/tests/unit/packet/test_mpls.py new file mode 100644 index 0000000..9235ce6 --- /dev/null +++ b/ryu/tests/unit/packet/test_mpls.py @@ -0,0 +1,52 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import logging + +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib.packet import mpls + + +LOG = logging.getLogger(__name__) + + +class Test_mpls(unittest.TestCase): + + label = 29 + exp = 6 + bsb = 1 + ttl = 64 + mp = mpls.mpls(label, exp, bsb, ttl) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + + _mpls_str = '%s(bsb=%d,exp=%d,label=%d,ttl=%d)' + mpls_str = _mpls_str % (mpls.mpls.__name__, + self.bsb, + self.exp, + self.label, + self.ttl) + + eq_(str(self.mp), mpls_str) + eq_(repr(self.mp), mpls_str) diff --git a/ryu/tests/unit/packet/test_packet.py b/ryu/tests/unit/packet/test_packet.py index 6ddb05d..4f3acab 100644 --- a/ryu/tests/unit/packet/test_packet.py +++ b/ryu/tests/unit/packet/test_packet.py @@ -24,6 +24,7 @@ from nose.tools import * from nose.plugins.skip import Skip, SkipTest from ryu.ofproto import ether, inet from ryu.lib import mac +from ryu.lib.ip import ipv4_to_str from ryu.lib.packet import * @@ -115,6 +116,38 @@ class TestPacket(unittest.TestCase): eq_(self.dst_mac, p_arp.dst_mac) eq_(self.dst_ip, p_arp.dst_ip) + # to string + _eth_str = '%s(dst=\'%s\',ethertype=\'%s\',src=\'%s\')' + eth_str = _eth_str % (ethernet.ethernet.__name__, + mac.haddr_to_str(self.dst_mac), + '0x%04x' % ether.ETH_TYPE_ARP, + mac.haddr_to_str(self.src_mac)) + + _arp_str = ('%s(dst_ip=\'%s\',dst_mac=\'%s\',hlen=%d,' + 'hwtype=%d,opcode=%d,plen=%d,proto=\'%s\',' + 'src_ip=\'%s\',src_mac=\'%s\')') + arp_str = _arp_str % (arp.arp.__name__, + ipv4_to_str(self.dst_ip), + mac.haddr_to_str(self.dst_mac), + 6, + 1, + 2, + 4, + '0x%04x' % ether.ETH_TYPE_IP, + ipv4_to_str(self.src_ip), + mac.haddr_to_str(self.src_mac)) + + pkt_str = '%s, %s' % (eth_str, arp_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(arp_str, str(p_arp)) + eq_(arp_str, repr(p_arp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_vlan_arp(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -184,6 +217,48 @@ class TestPacket(unittest.TestCase): eq_(self.dst_mac, p_arp.dst_mac) eq_(self.dst_ip, p_arp.dst_ip) + # to string + _eth_str = '%s(dst=\'%s\',ethertype=\'%s\',src=\'%s\')' + eth_str = _eth_str % (ethernet.ethernet.__name__, + mac.haddr_to_str(self.dst_mac), + '0x%04x' % ether.ETH_TYPE_8021Q, + mac.haddr_to_str(self.src_mac)) + + _vlan_str = '%s(cfi=%d,ethertype=\'%s\',pcp=%d,vid=%d)' + vlan_str = _vlan_str % (vlan.vlan.__name__, + 0b1, + '0x%04x' % ether.ETH_TYPE_ARP, + 0b111, + 3) + + _arp_str = ('%s(dst_ip=\'%s\',dst_mac=\'%s\',hlen=%d,' + 'hwtype=%d,opcode=%d,plen=%d,proto=\'%s\',' + 'src_ip=\'%s\',src_mac=\'%s\')') + arp_str = _arp_str % (arp.arp.__name__, + ipv4_to_str(self.dst_ip), + mac.haddr_to_str(self.dst_mac), + 6, + 1, + 2, + 4, + '0x%04x' % ether.ETH_TYPE_IP, + ipv4_to_str(self.src_ip), + mac.haddr_to_str(self.src_mac)) + + pkt_str = '%s, %s, %s' % (eth_str, vlan_str, arp_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(vlan_str, str(p_vlan)) + eq_(vlan_str, repr(p_vlan)) + + eq_(arp_str, str(p_arp)) + eq_(arp_str, repr(p_arp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_ipv4_udp(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -271,6 +346,54 @@ class TestPacket(unittest.TestCase): ok_('payload' in protocols) eq_(self.payload, protocols['payload'].tostring()) + # to string + _eth_str = '%s(dst=\'%s\',ethertype=\'%s\',src=\'%s\')' + eth_str = _eth_str % (ethernet.ethernet.__name__, + mac.haddr_to_str(self.dst_mac), + '0x%04x' % ether.ETH_TYPE_IP, + mac.haddr_to_str(self.src_mac)) + + _ipv4_str = ('%s(csum=\'%s\',dst=\'%s\',flags=\'%s\',' + 'header_length=%d,identification=\'%s\',' + 'offset=%d,option=%s,proto=%d,src=\'%s\',' + 'tos=%d,total_length=%d,ttl=%d,version=%d)') + ipv4_str = _ipv4_str % (ipv4.ipv4.__name__, + hex(p_ipv4.csum), + ipv4_to_str(self.dst_ip), + '0x%02x' % 1, + 5, + hex(3), + 4, + None, + inet.IPPROTO_UDP, + ipv4_to_str(self.src_ip), + 1, + l, + 64, + 4) + + _udp_str = '%s(csum=\'%s\',dst_port=%d,src_port=%d,total_length=%d)' + udp_str = _udp_str % (udp.udp.__name__, + hex(0x77b2), + 0x1F90, + 0x190f, + len(u_buf) + len(self.payload)) + + pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, udp_str, + repr(protocols['payload'])) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv4_str, str(p_ipv4)) + eq_(ipv4_str, repr(p_ipv4)) + + eq_(udp_str, str(p_udp)) + eq_(udp_str, repr(p_udp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_ipv4_tcp(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -369,3 +492,59 @@ class TestPacket(unittest.TestCase): # payload ok_('payload' in protocols) eq_(self.payload, protocols['payload'].tostring()) + + # to string + _eth_str = '%s(dst=\'%s\',ethertype=\'%s\',src=\'%s\')' + eth_str = _eth_str % (ethernet.ethernet.__name__, + mac.haddr_to_str(self.dst_mac), + '0x%04x' % ether.ETH_TYPE_IP, + mac.haddr_to_str(self.src_mac)) + + _ipv4_str = ('%s(csum=\'%s\',dst=\'%s\',flags=\'%s\',' + 'header_length=%d,identification=\'%s\',' + 'offset=%d,option=%s,proto=%d,src=\'%s\',' + 'tos=%d,total_length=%d,ttl=%d,version=%d)') + ipv4_str = _ipv4_str % (ipv4.ipv4.__name__, + hex(p_ipv4.csum), + ipv4_to_str(self.dst_ip), + '0x%02x' % 0, + 5, + hex(0), + 0, + None, + inet.IPPROTO_TCP, + ipv4_to_str(self.src_ip), + 0, + l, + 64, + 4) + + _tcp_str = ('%s(ack=%d,bits=\'%s\',csum=\'%s\',' + 'dst_port=%d,offset=%d,option=%s,' + 'seq=%d,src_port=%d,urgent=%d,window_size=%d)') + tcp_str = _tcp_str % (tcp.tcp.__name__, + 1, + format(0b101010, '09b'), + hex(p_tcp.csum), + 0x1F90, + 6, + p_tcp.option, + 0x123, + 0x190f, + 0x6f, + 2048) + + pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, tcp_str, + repr(protocols['payload'])) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv4_str, str(p_ipv4)) + eq_(ipv4_str, repr(p_ipv4)) + + eq_(tcp_str, str(p_tcp)) + eq_(tcp_str, repr(p_tcp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) diff --git a/ryu/tests/unit/packet/test_vrrp.py b/ryu/tests/unit/packet/test_vrrp.py index 08e90a2..05866a7 100644 --- a/ryu/tests/unit/packet/test_vrrp.py +++ b/ryu/tests/unit/packet/test_vrrp.py @@ -182,6 +182,28 @@ class Test_vrrpv2(unittest.TestCase): max_adver_int = vrrp.VRRP_V2_MAX_ADVER_INT_MAX + 1 ok_(not self._test_is_valid(max_adver_int=max_adver_int)) + def test_to_string(self): + _vrrpv2_str = ('%s(auth_data=%s,auth_type=%s,checksum=\'%s\',' + 'count_ip=%d,identification=%d,ip_addresses=\'%s\',' + 'max_adver_int=%d,priority=%d,type=%d,' + 'version=%d,vrid=%d)') + vrrpv2_str = _vrrpv2_str % (vrrp.vrrpv2.__name__, + repr(self.auth_data), + repr(self.auth_type), + hex(self.vrrpv2.checksum), + self.count_ip, + self.vrrpv2.identification, + self.vrrpv2._ip_addresses_to_str( + [self.ip_address]), + self.max_adver_int, + self.priority, + self.type_, + self.version, + self.vrid) + + eq_(str(self.vrrpv2), vrrpv2_str) + eq_(repr(self.vrrpv2), vrrpv2_str) + class Test_vrrpv3_ipv4(unittest.TestCase): """ Test case for vrrp v3 IPv4 @@ -323,6 +345,28 @@ class Test_vrrpv3_ipv4(unittest.TestCase): max_adver_int = vrrp.VRRP_V3_MAX_ADVER_INT_MAX + 1 ok_(not self._test_is_valid(max_adver_int=max_adver_int)) + def test_to_string(self): + _vrrpv3_str = ('%s(auth_data=%s,auth_type=%s,checksum=\'%s\',' + 'count_ip=%d,identification=%d,ip_addresses=\'%s\',' + 'max_adver_int=%d,priority=%d,type=%d,' + 'version=%d,vrid=%d)') + vrrpv3_str = _vrrpv3_str % (vrrp.vrrpv3.__name__, + None, + None, + hex(self.vrrpv3.checksum), + self.count_ip, + self.vrrpv3.identification, + self.vrrpv3._ip_addresses_to_str( + [self.ip_address]), + self.max_adver_int, + self.priority, + self.type_, + self.version, + self.vrid) + + eq_(str(self.vrrpv3), vrrpv3_str) + eq_(repr(self.vrrpv3), vrrpv3_str) + class Test_vrrpv3_ipv6(unittest.TestCase): """ Test case for vrrp v3 IPv6 @@ -422,3 +466,25 @@ class Test_vrrpv3_ipv6(unittest.TestCase): print(len(p0.data), p0.data) print(len(p1.data), p1.data) eq_(p0.data, p1.data) + + def test_to_string(self): + _vrrpv3_str = ('%s(auth_data=%s,auth_type=%s,checksum=\'%s\',' + 'count_ip=%d,identification=%d,ip_addresses=\'%s\',' + 'max_adver_int=%d,priority=%d,type=%d,' + 'version=%d,vrid=%d)') + vrrpv3_str = _vrrpv3_str % (vrrp.vrrpv3.__name__, + None, + None, + hex(self.vrrpv3.checksum), + self.count_ip, + self.vrrpv3.identification, + self.vrrpv3._ip_addresses_to_str( + [self.ip_address]), + self.max_adver_int, + self.priority, + self.type_, + self.version, + self.vrid) + + eq_(str(self.vrrpv3), vrrpv3_str) + eq_(repr(self.vrrpv3), vrrpv3_str) -- 1.7.10.4 ------------------------------------------------------------------------------ See everything from the browser to the database with AppDynamics Get end-to-end visibility with application monitoring from AppDynamics Isolate bottlenecks and diagnose root cause in seconds. Start your free trial of AppDynamics Pro today! http://pubads.g.doubleclick.net/gampad/clk?id=48808831&iu=/4140/ostg.clktrk _______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
