Signed-off-by: WATANABE Fumitaka <[email protected]> --- ryu/tests/unit/packet/test_dhcp.py | 111 ++++++++++++++++++ ryu/tests/unit/packet/test_icmp.py | 67 +++++++++++ ryu/tests/unit/packet/test_icmpv6.py | 77 ++++++++++++ ryu/tests/unit/packet/test_ipv6.py | 65 +++++++++++ ryu/tests/unit/packet/test_lldp.py | 212 ++++++++++++++++++++++++++++++++++ ryu/tests/unit/packet/test_mpls.py | 54 +++++++++ ryu/tests/unit/packet/test_packet.py | 190 ++++++++++++++++++++++++++++++ ryu/tests/unit/packet/test_vrrp.py | 64 ++++++++++ 8 files changed, 840 insertions(+) create mode 100644 ryu/tests/unit/packet/test_dhcp.py create mode 100644 ryu/tests/unit/packet/test_icmp.py create mode 100644 ryu/tests/unit/packet/test_ipv6.py create mode 100644 ryu/tests/unit/packet/test_mpls.py
diff --git a/ryu/tests/unit/packet/test_dhcp.py b/ryu/tests/unit/packet/test_dhcp.py new file mode 100644 index 0000000..bd746d3 --- /dev/null +++ b/ryu/tests/unit/packet/test_dhcp.py @@ -0,0 +1,111 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import inspect +import logging +import socket +import unittest +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib import ip +from ryu.lib import mac +from ryu.lib.packet import dhcp + + +LOG = logging.getLogger(__name__) + + +class Test_dhcp_offer(unittest.TestCase): + + op = dhcp.DHCP_BOOT_REPLY + chaddr = mac.haddr_to_bin('AA:AA:AA:AA:AA:AA') + htype = 1 + hlen = 6 + hops = 0 + xid = 1 + secs = 0 + flags = 1 + ciaddr = ip.ipv4_to_bin('192.168.10.10') + yiaddr = ip.ipv4_to_bin('192.168.20.20') + siaddr = ip.ipv4_to_bin('192.168.30.30') + giaddr = ip.ipv4_to_bin('192.168.40.40') + sname = 'abc' + boot_file = '' + + option_list = [dhcp.option("35", "02"), + dhcp.option("01", "ffffff00"), + dhcp.option("03", "c0a80a09"), + dhcp.option("06", "c0a80a09"), + dhcp.option("33", "0003f480"), + dhcp.option("3a", "0001fa40"), + dhcp.option("3b", "000375f0"), + dhcp.option("36", "c0a80a09")] + magic_cookie = socket.inet_aton("99.130.83.99") + options = dhcp.options(option_list=option_list, + magic_cookie=magic_cookie) + + dh = dhcp.dhcp(op, chaddr, options, htype=htype, hlen=hlen, + hops=hops, xid=xid, secs=secs, flags=flags, + ciaddr=ciaddr, yiaddr=yiaddr, siaddr=siaddr, + giaddr=giaddr, sname=sname, boot_file=boot_file) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + option_values = ['tag', 'length', 'value'] + opt_str_list = [] + for option in self.option_list: + _opt_str = ','.join(['%s=%s' % (k, repr(getattr(option, k))) + for k, v in inspect.getmembers(option) + if k in option_values]) + opt_str = '%s(%s)' % (dhcp.option.__name__, _opt_str) + opt_str_list.append(opt_str) + option_str = '[%s]' % ', '.join(opt_str_list) + + opts_vals = {'magic_cookie': repr(socket.inet_ntoa(self.magic_cookie)), + 'option_list': option_str, + 'options_len': repr(self.options.options_len)} + _options_str = ','.join(['%s=%s' % (k, opts_vals[k]) + for k, v in inspect.getmembers(self.options) + if k in opts_vals]) + options_str = '%s(%s)' % (dhcp.options.__name__, _options_str) + + dhcp_values = {'op': repr(self.op), + 'htype': repr(self.htype), + 'hlen': repr(self.hlen), + 'hops': repr(self.hops), + 'xid': repr('0x%x' % self.xid), + 'secs': repr(self.secs), + 'flags': repr(self.flags), + 'ciaddr': repr(ip.ipv4_to_str(self.ciaddr)), + 'yiaddr': repr(ip.ipv4_to_str(self.yiaddr)), + 'siaddr': repr(ip.ipv4_to_str(self.siaddr)), + 'giaddr': repr(ip.ipv4_to_str(self.giaddr)), + 'chaddr': repr(mac.haddr_to_str(self.chaddr)), + 'sname': repr(self.sname), + 'boot_file': repr(self.boot_file), + 'options': options_str} + _dh_str = ','.join(['%s=%s' % (k, dhcp_values[k]) + for k, v in inspect.getmembers(self.dh) + if k in dhcp_values]) + dh_str = '%s(%s)' % (dhcp.dhcp.__name__, _dh_str) + + eq_(str(self.dh), dh_str) + eq_(repr(self.dh), dh_str) diff --git a/ryu/tests/unit/packet/test_icmp.py b/ryu/tests/unit/packet/test_icmp.py new file mode 100644 index 0000000..98d1061 --- /dev/null +++ b/ryu/tests/unit/packet/test_icmp.py @@ -0,0 +1,67 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import inspect +import logging + +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib.packet import icmp + + +LOG = logging.getLogger(__name__) + + +class Test_icmp_dest_unreach(unittest.TestCase): + + type_ = icmp.ICMP_DEST_UNREACH + code = icmp.ICMP_HOST_UNREACH_CODE + csum = 0 + + mtu = 10 + data = 'abc' + data_len = len(data) + dst_unreach = icmp.dest_unreach(data_len=data_len, mtu=mtu, data=data) + + ic = icmp.icmp(type_, code, csum, data=dst_unreach) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + data_values = {'data': self.data, + 'data_len': self.data_len, + 'mtu': self.mtu} + _data_str = ','.join(['%s=%s' % (k, repr(data_values[k])) + for k, v in inspect.getmembers(self.dst_unreach) + if k in data_values]) + data_str = '%s(%s)' % (icmp.dest_unreach.__name__, _data_str) + + icmp_values = {'type': repr(self.type_), + 'code': repr(self.code), + 'csum': repr('0x%x' % self.csum), + 'data': data_str} + _ic_str = ','.join(['%s=%s' % (k, icmp_values[k]) + for k, v in inspect.getmembers(self.ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmp.icmp.__name__, _ic_str) + + eq_(str(self.ic), ic_str) + eq_(repr(self.ic), ic_str) diff --git a/ryu/tests/unit/packet/test_icmpv6.py b/ryu/tests/unit/packet/test_icmpv6.py index 0a665c7..d313fd4 100644 --- a/ryu/tests/unit/packet/test_icmpv6.py +++ b/ryu/tests/unit/packet/test_icmpv6.py @@ -19,10 +19,13 @@ import unittest import logging import struct import netaddr +import inspect from nose.tools import ok_, eq_, nottest, raises from nose.plugins.skip import Skip, SkipTest from ryu.ofproto import ether, inet +from ryu.lib.ip import ipv6_to_str +from ryu.lib.mac import haddr_to_str from ryu.lib.packet.ethernet import ethernet from ryu.lib.packet.packet import Packet from ryu.lib.packet import icmpv6 @@ -155,6 +158,30 @@ class Test_icmpv6_echo_request(unittest.TestCase): def test_serialize_with_data(self): self._test_serialize(self.data) + def test_to_string(self): + ec = icmpv6.echo(self.id_, self.seq, self.data) + ic = icmpv6.icmpv6(self.type_, self.code, self.csum, ec) + + echo_values = {'id': self.id_, + 'seq': self.seq, + 'data': self.data} + _echo_str = ','.join(['%s=%s' % (k, repr(echo_values[k])) + for k, v in inspect.getmembers(ec) + if k in echo_values]) + echo_str = '%s(%s)' % (icmpv6.echo.__name__, _echo_str) + + icmp_values = {'type_': repr(self.type_), + 'code': repr(self.code), + 'csum': repr('0x%x' % self.csum), + 'data': echo_str} + _ic_str = ','.join(['%s=%s' % (k, icmp_values[k]) + for k, v in inspect.getmembers(ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str) + + eq_(str(ic), ic_str) + eq_(repr(ic), ic_str) + class Test_icmpv6_echo_reply(Test_icmpv6_echo_request): def setUp(self): @@ -260,6 +287,41 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase): eq_(nd_length, self.nd_length) eq_(nd_hw_src, self.nd_hw_src) + def test_to_string(self): + nd_opt = icmpv6.nd_option_la(self.nd_hw_src) + nd = icmpv6.nd_neighbor( + self.res, self.dst, self.nd_type, self.nd_length, nd_opt) + ic = icmpv6.icmpv6(self.type_, self.code, self.csum, nd) + + nd_opt_values = {'hw_src': haddr_to_str(self.nd_hw_src), + 'data': None} + _nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k])) + for k, v in inspect.getmembers(nd_opt) + if k in nd_opt_values]) + nd_opt_str = '%s(%s)' % (icmpv6.nd_option_la.__name__, _nd_opt_str) + + nd_values = {'res': repr(nd.res), + 'dst': repr(ipv6_to_str(self.dst)), + 'type_': repr(self.nd_type), + 'length': repr(self.nd_length), + 'data': nd_opt_str} + _nd_str = ','.join(['%s=%s' % (k, nd_values[k]) + for k, v in inspect.getmembers(nd) + if k in nd_values]) + nd_str = '%s(%s)' % (icmpv6.nd_neighbor.__name__, _nd_str) + + icmp_values = {'type_': repr(self.type_), + 'code': repr(self.code), + 'csum': repr('0x%x' % self.csum), + 'data': nd_str} + _ic_str = ','.join(['%s=%s' % (k, icmp_values[k]) + for k, v in inspect.getmembers(ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str) + + eq_(str(ic), ic_str) + eq_(repr(ic), ic_str) + class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solict): def setUp(self): @@ -336,3 +398,18 @@ class Test_icmpv6_router_solict(unittest.TestCase): def test_serialize_with_data(self): self._test_serialize(self.data) + + def test_to_string(self): + ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.data) + + icmp_values = {'type_': self.type_, + 'code': self.code, + 'csum': '0x%x' % self.csum, + 'data': self.data} + _ic_str = ','.join(['%s=%s' % (k, repr(icmp_values[k])) + for k, v in inspect.getmembers(ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str) + + eq_(str(ic), ic_str) + eq_(repr(ic), ic_str) diff --git a/ryu/tests/unit/packet/test_ipv6.py b/ryu/tests/unit/packet/test_ipv6.py new file mode 100644 index 0000000..9d86087 --- /dev/null +++ b/ryu/tests/unit/packet/test_ipv6.py @@ -0,0 +1,65 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import logging +import inspect + +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib import ip +from ryu.lib.packet import ipv6 + + +LOG = logging.getLogger(__name__) + + +class Test_ipv6(unittest.TestCase): + + version = 6 + traffic_class = 0 + flow_label = 0 + payload_length = 817 + nxt = 6 + hop_limit = 128 + src = ip.ipv6_to_bin('2002:4637:d5d3::4637:d5d3') + dst = ip.ipv6_to_bin('2001:4860:0:2001::68') + + ip = ipv6.ipv6(version, traffic_class, flow_label, payload_length, + nxt, hop_limit, src, dst) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + ipv6_values = {'version': self.version, + 'traffic_class': self.traffic_class, + 'flow_label': self.flow_label, + 'payload_length': self.payload_length, + 'nxt': self.nxt, + 'hop_limit': self.hop_limit, + 'src': ip.ipv6_to_str(self.src), + 'dst': ip.ipv6_to_str(self.dst)} + _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k])) + for k, v in inspect.getmembers(self.ip) + if k in ipv6_values]) + ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str) + + eq_(str(self.ip), ipv6_str) + eq_(repr(self.ip), ipv6_str) diff --git a/ryu/tests/unit/packet/test_lldp.py b/ryu/tests/unit/packet/test_lldp.py index 0d22458..d0ca426 100644 --- a/ryu/tests/unit/packet/test_lldp.py +++ b/ryu/tests/unit/packet/test_lldp.py @@ -18,6 +18,7 @@ import unittest import logging import struct +import inspect from nose.tools import ok_, eq_, nottest from ryu.ofproto import ether @@ -118,6 +119,62 @@ class TestLLDPMandatoryTLV(unittest.TestCase): pkt.serialize() eq_(pkt.data, self.data) + def test_to_string(self): + chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS, + chassis_id='\x00\x04\x96\x1f\xa7\x26') + port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME, + port_id='1/3') + ttl = lldp.TTL(ttl=120) + end = lldp.End() + tlvs = (chassis_id, port_id, ttl, end) + lldp_pkt = lldp.lldp(tlvs) + + chassis_id_values = {'subtype': lldp.ChassisID.SUB_MAC_ADDRESS, + 'chassis_id': '\x00\x04\x96\x1f\xa7\x26', + 'len': chassis_id.len, + 'typelen': chassis_id.typelen} + _ch_id_str = ','.join(['%s=%s' % (k, repr(chassis_id_values[k])) + for k, v in inspect.getmembers(chassis_id) + if k in chassis_id_values]) + tlv_chassis_id_str = '%s(%s)' % (lldp.ChassisID.__name__, _ch_id_str) + + port_id_values = {'subtype': port_id.subtype, + 'port_id': port_id.port_id, + 'len': port_id.len, + 'typelen': port_id.typelen} + _port_id_str = ','.join(['%s=%s' % (k, repr(port_id_values[k])) + for k, v in inspect.getmembers(port_id) + if k in port_id_values]) + tlv_port_id_str = '%s(%s)' % (lldp.PortID.__name__, _port_id_str) + + ttl_values = {'ttl': ttl.ttl, + 'len': ttl.len, + 'typelen': ttl.typelen} + _ttl_str = ','.join(['%s=%s' % (k, repr(ttl_values[k])) + for k, v in inspect.getmembers(ttl) + if k in ttl_values]) + tlv_ttl_str = '%s(%s)' % (lldp.TTL.__name__, _ttl_str) + + end_values = {'len': end.len, + 'typelen': end.typelen} + _end_str = ','.join(['%s=%s' % (k, repr(end_values[k])) + for k, v in inspect.getmembers(end) + if k in end_values]) + tlv_end_str = '%s(%s)' % (lldp.End.__name__, _end_str) + + _tlvs_str = '(%s, %s, %s, %s)' + tlvs_str = _tlvs_str % (tlv_chassis_id_str, + tlv_port_id_str, + tlv_ttl_str, + tlv_end_str) + + _lldp_str = '%s(tlvs=%s)' + lldp_str = _lldp_str % (lldp.lldp.__name__, + tlvs_str) + + eq_(str(lldp_pkt), lldp_str) + eq_(repr(lldp_pkt), lldp_str) + class TestLLDPOptionalTLV(unittest.TestCase): def setUp(self): @@ -257,3 +314,158 @@ class TestLLDPOptionalTLV(unittest.TestCase): # self.data has many organizationally specific TLVs data = str(pkt.data[:-2]) eq_(data, self.data[:len(data)]) + + def test_to_string(self): + chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS, + chassis_id='\x00\x01\x30\xf9\xad\xa0') + port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME, + port_id='1/1') + ttl = lldp.TTL(ttl=120) + port_desc = lldp.PortDescription( + port_description='Summit300-48-Port 1001\x00') + sys_name = lldp.SystemName(system_name='Summit300-48\x00') + sys_desc = lldp.SystemDescription( + system_description='Summit300-48 - Version 7.4e.1 (Build 5) ' + + 'by Release_Master 05/27/05 04:53:11\x00') + sys_cap = lldp.SystemCapabilities( + subtype=lldp.ChassisID.SUB_CHASSIS_COMPONENT, + system_cap=0x14, + enabled_cap=0x14) + man_addr = lldp.ManagementAddress( + addr_subtype=0x06, addr='\x00\x01\x30\xf9\xad\xa0', + intf_subtype=0x02, intf_num=1001, + oid='') + org_spec = lldp.OrganizationallySpecific( + oui='\x00\x12\x0f', subtype=0x02, info='\x07\x01\x00') + end = lldp.End() + tlvs = (chassis_id, port_id, ttl, port_desc, sys_name, + sys_desc, sys_cap, man_addr, org_spec, end) + lldp_pkt = lldp.lldp(tlvs) + + # ChassisID string + chassis_id_values = {'subtype': lldp.ChassisID.SUB_MAC_ADDRESS, + 'chassis_id': '\x00\x01\x30\xf9\xad\xa0', + 'len': chassis_id.len, + 'typelen': chassis_id.typelen} + _ch_id_str = ','.join(['%s=%s' % (k, repr(chassis_id_values[k])) + for k, v in inspect.getmembers(chassis_id) + if k in chassis_id_values]) + tlv_chassis_id_str = '%s(%s)' % (lldp.ChassisID.__name__, _ch_id_str) + + # PortID string + port_id_values = {'subtype': port_id.subtype, + 'port_id': port_id.port_id, + 'len': port_id.len, + 'typelen': port_id.typelen} + _port_id_str = ','.join(['%s=%s' % (k, repr(port_id_values[k])) + for k, v in inspect.getmembers(port_id) + if k in port_id_values]) + tlv_port_id_str = '%s(%s)' % (lldp.PortID.__name__, _port_id_str) + + # TTL string + ttl_values = {'ttl': ttl.ttl, + 'len': ttl.len, + 'typelen': ttl.typelen} + _ttl_str = ','.join(['%s=%s' % (k, repr(ttl_values[k])) + for k, v in inspect.getmembers(ttl) + if k in ttl_values]) + tlv_ttl_str = '%s(%s)' % (lldp.TTL.__name__, _ttl_str) + + # PortDescription string + port_desc_values = {'tlv_info': port_desc.tlv_info, + 'len': port_desc.len, + 'typelen': port_desc.typelen} + _port_desc_str = ','.join(['%s=%s' % (k, repr(port_desc_values[k])) + for k, v in inspect.getmembers(port_desc) + if k in port_desc_values]) + tlv_port_desc_str = '%s(%s)' % (lldp.PortDescription.__name__, + _port_desc_str) + + # SystemName string + sys_name_values = {'tlv_info': sys_name.tlv_info, + 'len': sys_name.len, + 'typelen': sys_name.typelen} + _system_name_str = ','.join(['%s=%s' % (k, repr(sys_name_values[k])) + for k, v in inspect.getmembers(sys_name) + if k in sys_name_values]) + tlv_system_name_str = '%s(%s)' % (lldp.SystemName.__name__, + _system_name_str) + + # SystemDescription string + sys_desc_values = {'tlv_info': sys_desc.tlv_info, + 'len': sys_desc.len, + 'typelen': sys_desc.typelen} + _sys_desc_str = ','.join(['%s=%s' % (k, repr(sys_desc_values[k])) + for k, v in inspect.getmembers(sys_desc) + if k in sys_desc_values]) + tlv_sys_desc_str = '%s(%s)' % (lldp.SystemDescription.__name__, + _sys_desc_str) + + # SystemCapabilities string + sys_cap_values = {'subtype': lldp.ChassisID.SUB_CHASSIS_COMPONENT, + 'system_cap': 0x14, + 'enabled_cap': 0x14, + 'len': sys_cap.len, + 'typelen': sys_cap.typelen} + _sys_cap_str = ','.join(['%s=%s' % (k, repr(sys_cap_values[k])) + for k, v in inspect.getmembers(sys_cap) + if k in sys_cap_values]) + tlv_sys_cap_str = '%s(%s)' % (lldp.SystemCapabilities.__name__, + _sys_cap_str) + + # ManagementAddress string + man_addr_values = {'addr_subtype': 0x06, + 'addr': '\x00\x01\x30\xf9\xad\xa0', + 'addr_len': man_addr.addr_len, + 'intf_subtype': 0x02, + 'intf_num': 1001, + 'oid': '', + 'oid_len': man_addr.oid_len, + 'len': man_addr.len, + 'typelen': man_addr.typelen} + _man_addr_str = ','.join(['%s=%s' % (k, repr(man_addr_values[k])) + for k, v in inspect.getmembers(man_addr) + if k in man_addr_values]) + tlv_man_addr_str = '%s(%s)' % (lldp.ManagementAddress.__name__, + _man_addr_str) + + # OrganizationallySpecific string + org_spec_values = {'oui': '\x00\x12\x0f', + 'subtype': 0x02, + 'info': '\x07\x01\x00', + 'len': org_spec.len, + 'typelen': org_spec.typelen} + _org_spec_str = ','.join(['%s=%s' % (k, repr(org_spec_values[k])) + for k, v in inspect.getmembers(org_spec) + if k in org_spec_values]) + tlv_org_spec_str = '%s(%s)' % (lldp.OrganizationallySpecific.__name__, + _org_spec_str) + + # End string + end_values = {'len': end.len, + 'typelen': end.typelen} + _end_str = ','.join(['%s=%s' % (k, repr(end_values[k])) + for k, v in inspect.getmembers(end) + if k in end_values]) + tlv_end_str = '%s(%s)' % (lldp.End.__name__, _end_str) + + # tlvs string + _tlvs_str = '(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)' + tlvs_str = _tlvs_str % (tlv_chassis_id_str, + tlv_port_id_str, + tlv_ttl_str, + tlv_port_desc_str, + tlv_system_name_str, + tlv_sys_desc_str, + tlv_sys_cap_str, + tlv_man_addr_str, + tlv_org_spec_str, + tlv_end_str) + + # lldp string + _lldp_str = '%s(tlvs=%s)' + lldp_str = _lldp_str % (lldp.lldp.__name__, + tlvs_str) + + eq_(str(lldp_pkt), lldp_str) + eq_(repr(lldp_pkt), lldp_str) diff --git a/ryu/tests/unit/packet/test_mpls.py b/ryu/tests/unit/packet/test_mpls.py new file mode 100644 index 0000000..abde030 --- /dev/null +++ b/ryu/tests/unit/packet/test_mpls.py @@ -0,0 +1,54 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import logging +import inspect + +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib.packet import mpls + + +LOG = logging.getLogger(__name__) + + +class Test_mpls(unittest.TestCase): + + label = 29 + exp = 6 + bsb = 1 + ttl = 64 + mp = mpls.mpls(label, exp, bsb, ttl) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + mpls_values = {'label': self.label, + 'exp': self.exp, + 'bsb': self.bsb, + 'ttl': self.ttl} + _mpls_str = ','.join(['%s=%s' % (k, repr(mpls_values[k])) + for k, v in inspect.getmembers(self.mp) + if k in mpls_values]) + mpls_str = '%s(%s)' % (mpls.mpls.__name__, _mpls_str) + + eq_(str(self.mp), mpls_str) + eq_(repr(self.mp), mpls_str) diff --git a/ryu/tests/unit/packet/test_packet.py b/ryu/tests/unit/packet/test_packet.py index 8594002..fcaf099 100644 --- a/ryu/tests/unit/packet/test_packet.py +++ b/ryu/tests/unit/packet/test_packet.py @@ -20,10 +20,12 @@ import logging import struct import netaddr import array +import inspect from nose.tools import * from nose.plugins.skip import Skip, SkipTest from ryu.ofproto import ether, inet from ryu.lib import mac +from ryu.lib.ip import ipv4_to_str from ryu.lib.packet import * @@ -113,6 +115,40 @@ class TestPacket(unittest.TestCase): eq_(self.dst_mac, p_arp.dst_mac) eq_(self.dst_ip, p_arp.dst_ip) + # to string + eth_values = {'dst': mac.haddr_to_str(self.dst_mac), + 'src': mac.haddr_to_str(self.src_mac), + 'ethertype': '0x%04x' % ether.ETH_TYPE_ARP} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + arp_values = {'hwtype': 1, + 'proto': '0x%04x' % ether.ETH_TYPE_IP, + 'hlen': 6, + 'plen': 4, + 'opcode': 2, + 'src_mac': mac.haddr_to_str(self.src_mac), + 'dst_mac': mac.haddr_to_str(self.dst_mac), + 'src_ip': ipv4_to_str(self.src_ip), + 'dst_ip': ipv4_to_str(self.dst_ip)} + _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k])) + for k, v in inspect.getmembers(p_arp) + if k in arp_values]) + arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str) + + pkt_str = '%s, %s' % (eth_str, arp_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(arp_str, str(p_arp)) + eq_(arp_str, repr(p_arp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_vlan_arp(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -182,6 +218,52 @@ class TestPacket(unittest.TestCase): eq_(self.dst_mac, p_arp.dst_mac) eq_(self.dst_ip, p_arp.dst_ip) + # to string + eth_values = {'dst': mac.haddr_to_str(self.dst_mac), + 'src': mac.haddr_to_str(self.src_mac), + 'ethertype': '0x%04x' % ether.ETH_TYPE_8021Q} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + vlan_values = {'pcp': 0b111, + 'cfi': 0b1, + 'vid': 3, + 'ethertype': '0x%04x' % ether.ETH_TYPE_ARP} + _vlan_str = ','.join(['%s=%s' % (k, repr(vlan_values[k])) + for k, v in inspect.getmembers(p_vlan) + if k in vlan_values]) + vlan_str = '%s(%s)' % (vlan.vlan.__name__, _vlan_str) + + arp_values = {'hwtype': 1, + 'proto': '0x%04x' % ether.ETH_TYPE_IP, + 'hlen': 6, + 'plen': 4, + 'opcode': 2, + 'src_mac': mac.haddr_to_str(self.src_mac), + 'dst_mac': mac.haddr_to_str(self.dst_mac), + 'src_ip': ipv4_to_str(self.src_ip), + 'dst_ip': ipv4_to_str(self.dst_ip)} + _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k])) + for k, v in inspect.getmembers(p_arp) + if k in arp_values]) + arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str) + + pkt_str = '%s, %s, %s' % (eth_str, vlan_str, arp_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(vlan_str, str(p_vlan)) + eq_(vlan_str, repr(p_vlan)) + + eq_(arp_str, str(p_arp)) + eq_(arp_str, repr(p_arp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_ipv4_udp(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -269,6 +351,57 @@ class TestPacket(unittest.TestCase): ok_('payload' in protocols) eq_(self.payload, protocols['payload'].tostring()) + # to string + eth_values = {'dst': mac.haddr_to_str(self.dst_mac), + 'src': mac.haddr_to_str(self.src_mac), + 'ethertype': '0x%04x' % ether.ETH_TYPE_IP} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ipv4_values = {'version': 4, + 'header_length': 5, + 'tos': 1, + 'total_length': l, + 'identification': '0x%x' % 3, + 'flags': '0x%02x' % 1, + 'offset': p_ipv4.offset, + 'ttl': 64, + 'proto': inet.IPPROTO_UDP, + 'csum': '0x%x' % p_ipv4.csum, + 'src': ipv4_to_str(self.src_ip), + 'dst': ipv4_to_str(self.dst_ip), + 'option': None} + _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k])) + for k, v in inspect.getmembers(p_ipv4) + if k in ipv4_values]) + ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str) + + udp_values = {'src_port': 0x190f, + 'dst_port': 0x1F90, + 'total_length': len(u_buf) + len(self.payload), + 'csum': '0x%x' % 0x77b2} + _udp_str = ','.join(['%s=%s' % (k, repr(udp_values[k])) + for k, v in inspect.getmembers(p_udp) + if k in udp_values]) + udp_str = '%s(%s)' % (udp.udp.__name__, _udp_str) + + pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, udp_str, + repr(protocols['payload'])) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv4_str, str(p_ipv4)) + eq_(ipv4_str, repr(p_ipv4)) + + eq_(udp_str, str(p_udp)) + eq_(udp_str, repr(p_udp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_ipv4_tcp(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -367,3 +500,60 @@ class TestPacket(unittest.TestCase): # payload ok_('payload' in protocols) eq_(self.payload, protocols['payload'].tostring()) + + # to string + eth_values = {'dst': mac.haddr_to_str(self.dst_mac), + 'src': mac.haddr_to_str(self.src_mac), + 'ethertype': '0x%04x' % ether.ETH_TYPE_IP} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ipv4_values = {'version': 4, + 'header_length': 5, + 'tos': 0, + 'total_length': l, + 'identification': '0x%x' % 0, + 'flags': '0x%02x' % 0, + 'offset': p_ipv4.offset, + 'ttl': 64, + 'proto': inet.IPPROTO_TCP, + 'csum': '0x%x' % p_ipv4.csum, + 'src': ipv4_to_str(self.src_ip), + 'dst': ipv4_to_str(self.dst_ip), + 'option': None} + _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k])) + for k, v in inspect.getmembers(p_ipv4) + if k in ipv4_values]) + ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str) + + tcp_values = {'src_port': 0x190f, + 'dst_port': 0x1F90, + 'seq': 0x123, + 'ack': 1, + 'offset': 6, + 'bits': format(0b101010, '09b'), + 'window_size': 2048, + 'csum': '0x%x' % p_tcp.csum, + 'urgent': 0x6f, + 'option': p_tcp.option} + _tcp_str = ','.join(['%s=%s' % (k, repr(tcp_values[k])) + for k, v in inspect.getmembers(p_tcp) + if k in tcp_values]) + tcp_str = '%s(%s)' % (tcp.tcp.__name__, _tcp_str) + + pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, tcp_str, + repr(protocols['payload'])) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv4_str, str(p_ipv4)) + eq_(ipv4_str, repr(p_ipv4)) + + eq_(tcp_str, str(p_tcp)) + eq_(tcp_str, repr(p_tcp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) diff --git a/ryu/tests/unit/packet/test_vrrp.py b/ryu/tests/unit/packet/test_vrrp.py index 56bb092..83490a3 100644 --- a/ryu/tests/unit/packet/test_vrrp.py +++ b/ryu/tests/unit/packet/test_vrrp.py @@ -20,6 +20,7 @@ import unittest import logging import struct import netaddr +import inspect from nose.tools import eq_, ok_ from nose.tools import raises @@ -182,6 +183,27 @@ class Test_vrrpv2(unittest.TestCase): max_adver_int = vrrp.VRRP_V2_MAX_ADVER_INT_MAX + 1 ok_(not self._test_is_valid(max_adver_int=max_adver_int)) + def test_to_string(self): + vrrpv2_values = {'version': self.version, + 'type': self.type_, + 'vrid': self.vrid, + 'priority': self.priority, + 'count_ip': self.count_ip, + 'max_adver_int': self.max_adver_int, + 'checksum': '0x%x' % self.vrrpv2.checksum, + 'ip_addresses': vrrp.ip_addresses_to_str( + [self.ip_address]), + 'auth_type': self.auth_type, + 'auth_data': self.auth_data, + 'identification': self.vrrpv2.identification} + _vrrpv2_str = ','.join(['%s=%s' % (k, repr(vrrpv2_values[k])) + for k, v in inspect.getmembers(self.vrrpv2) + if k in vrrpv2_values]) + vrrpv2_str = '%s(%s)' % (vrrp.vrrpv2.__name__, _vrrpv2_str) + + eq_(str(self.vrrpv2), vrrpv2_str) + eq_(repr(self.vrrpv2), vrrpv2_str) + class Test_vrrpv3_ipv4(unittest.TestCase): """ Test case for vrrp v3 IPv4 @@ -324,6 +346,27 @@ class Test_vrrpv3_ipv4(unittest.TestCase): max_adver_int = vrrp.VRRP_V3_MAX_ADVER_INT_MAX + 1 ok_(not self._test_is_valid(max_adver_int=max_adver_int)) + def test_to_string(self): + vrrpv3_values = {'version': self.version, + 'type': self.type_, + 'vrid': self.vrid, + 'priority': self.priority, + 'count_ip': self.count_ip, + 'max_adver_int': self.max_adver_int, + 'checksum': '0x%x' % self.vrrpv3.checksum, + 'ip_addresses': vrrp.ip_addresses_to_str( + [self.ip_address]), + 'auth_type': None, + 'auth_data': None, + 'identification': self.vrrpv3.identification} + _vrrpv3_str = ','.join(['%s=%s' % (k, repr(vrrpv3_values[k])) + for k, v in inspect.getmembers(self.vrrpv3) + if k in vrrpv3_values]) + vrrpv3_str = '%s(%s)' % (vrrp.vrrpv3.__name__, _vrrpv3_str) + + eq_(str(self.vrrpv3), vrrpv3_str) + eq_(repr(self.vrrpv3), vrrpv3_str) + class Test_vrrpv3_ipv6(unittest.TestCase): """ Test case for vrrp v3 IPv6 @@ -423,3 +466,24 @@ class Test_vrrpv3_ipv6(unittest.TestCase): print(len(p0.data), p0.data) print(len(p1.data), p1.data) eq_(p0.data, p1.data) + + def test_to_string(self): + vrrpv3_values = {'version': self.version, + 'type': self.type_, + 'vrid': self.vrid, + 'priority': self.priority, + 'count_ip': self.count_ip, + 'max_adver_int': self.max_adver_int, + 'checksum': '0x%x' % self.vrrpv3.checksum, + 'ip_addresses': vrrp.ip_addresses_to_str( + [self.ip_address]), + 'auth_type': None, + 'auth_data': None, + 'identification': self.vrrpv3.identification} + _vrrpv3_str = ','.join(['%s=%s' % (k, repr(vrrpv3_values[k])) + for k, v in inspect.getmembers(self.vrrpv3) + if k in vrrpv3_values]) + vrrpv3_str = '%s(%s)' % (vrrp.vrrpv3.__name__, _vrrpv3_str) + + eq_(str(self.vrrpv3), vrrpv3_str) + eq_(repr(self.vrrpv3), vrrpv3_str) -- 1.7.10.4 ------------------------------------------------------------------------------ See everything from the browser to the database with AppDynamics Get end-to-end visibility with application monitoring from AppDynamics Isolate bottlenecks and diagnose root cause in seconds. Start your free trial of AppDynamics Pro today! http://pubads.g.doubleclick.net/gampad/clk?id=48808831&iu=/4140/ostg.clktrk _______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
