Signed-off-by: WATANABE Fumitaka <[email protected]> --- ryu/tests/unit/packet/test_dhcp.py | 111 +++++++++++++++ ryu/tests/unit/packet/test_icmp.py | 67 +++++++++ ryu/tests/unit/packet/test_icmpv6.py | 75 ++++++++++ ryu/tests/unit/packet/test_ipv6.py | 65 +++++++++ ryu/tests/unit/packet/test_lldp.py | 212 ++++++++++++++++++++++++++++ ryu/tests/unit/packet/test_mpls.py | 54 ++++++++ ryu/tests/unit/packet/test_packet.py | 250 ++++++++++++++++++++++++++++++++++ ryu/tests/unit/packet/test_vrrp.py | 61 +++++++++ 8 files changed, 895 insertions(+) create mode 100644 ryu/tests/unit/packet/test_dhcp.py create mode 100644 ryu/tests/unit/packet/test_icmp.py create mode 100644 ryu/tests/unit/packet/test_ipv6.py create mode 100644 ryu/tests/unit/packet/test_mpls.py
diff --git a/ryu/tests/unit/packet/test_dhcp.py b/ryu/tests/unit/packet/test_dhcp.py new file mode 100644 index 0000000..768b233 --- /dev/null +++ b/ryu/tests/unit/packet/test_dhcp.py @@ -0,0 +1,111 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import inspect +import logging +import socket +import unittest +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib import ip +from ryu.lib import mac +from ryu.lib.packet import dhcp + + +LOG = logging.getLogger(__name__) + + +class Test_dhcp_offer(unittest.TestCase): + + op = dhcp.DHCP_BOOT_REPLY + chaddr = 'AA:AA:AA:AA:AA:AA' + htype = 1 + hlen = 6 + hops = 0 + xid = 1 + secs = 0 + flags = 1 + ciaddr = '192.168.10.10' + yiaddr = '192.168.20.20' + siaddr = '192.168.30.30' + giaddr = '192.168.40.40' + sname = 'abc' + boot_file = '' + + option_list = [dhcp.option('35', '02'), + dhcp.option('01', 'ffffff00'), + dhcp.option('03', 'c0a80a09'), + dhcp.option('06', 'c0a80a09'), + dhcp.option('33', '0003f480'), + dhcp.option('3a', '0001fa40'), + dhcp.option('3b', '000375f0'), + dhcp.option('36', 'c0a80a09')] + magic_cookie = socket.inet_aton('99.130.83.99') + options = dhcp.options(option_list=option_list, + magic_cookie=magic_cookie) + + dh = dhcp.dhcp(op, chaddr, options, htype=htype, hlen=hlen, + hops=hops, xid=xid, secs=secs, flags=flags, + ciaddr=ciaddr, yiaddr=yiaddr, siaddr=siaddr, + giaddr=giaddr, sname=sname, boot_file=boot_file) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + option_values = ['tag', 'length', 'value'] + opt_str_list = [] + for option in self.option_list: + _opt_str = ','.join(['%s=%s' % (k, repr(getattr(option, k))) + for k, v in inspect.getmembers(option) + if k in option_values]) + opt_str = '%s(%s)' % (dhcp.option.__name__, _opt_str) + opt_str_list.append(opt_str) + option_str = '[%s]' % ', '.join(opt_str_list) + + opts_vals = {'magic_cookie': repr(self.magic_cookie), + 'option_list': option_str, + 'options_len': repr(self.options.options_len)} + _options_str = ','.join(['%s=%s' % (k, opts_vals[k]) + for k, v in inspect.getmembers(self.options) + if k in opts_vals]) + options_str = '%s(%s)' % (dhcp.options.__name__, _options_str) + + dhcp_values = {'op': repr(self.op), + 'htype': repr(self.htype), + 'hlen': repr(self.hlen), + 'hops': repr(self.hops), + 'xid': repr(self.xid), + 'secs': repr(self.secs), + 'flags': repr(self.flags), + 'ciaddr': repr(self.ciaddr), + 'yiaddr': repr(self.yiaddr), + 'siaddr': repr(self.siaddr), + 'giaddr': repr(self.giaddr), + 'chaddr': repr(self.chaddr), + 'sname': repr(self.sname), + 'boot_file': repr(self.boot_file), + 'options': options_str} + _dh_str = ','.join(['%s=%s' % (k, dhcp_values[k]) + for k, v in inspect.getmembers(self.dh) + if k in dhcp_values]) + dh_str = '%s(%s)' % (dhcp.dhcp.__name__, _dh_str) + + eq_(str(self.dh), dh_str) + eq_(repr(self.dh), dh_str) diff --git a/ryu/tests/unit/packet/test_icmp.py b/ryu/tests/unit/packet/test_icmp.py new file mode 100644 index 0000000..75d9702 --- /dev/null +++ b/ryu/tests/unit/packet/test_icmp.py @@ -0,0 +1,67 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import inspect +import logging + +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib.packet import icmp + + +LOG = logging.getLogger(__name__) + + +class Test_icmp_dest_unreach(unittest.TestCase): + + type_ = icmp.ICMP_DEST_UNREACH + code = icmp.ICMP_HOST_UNREACH_CODE + csum = 0 + + mtu = 10 + data = 'abc' + data_len = len(data) + dst_unreach = icmp.dest_unreach(data_len=data_len, mtu=mtu, data=data) + + ic = icmp.icmp(type_, code, csum, data=dst_unreach) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + data_values = {'data': self.data, + 'data_len': self.data_len, + 'mtu': self.mtu} + _data_str = ','.join(['%s=%s' % (k, repr(data_values[k])) + for k, v in inspect.getmembers(self.dst_unreach) + if k in data_values]) + data_str = '%s(%s)' % (icmp.dest_unreach.__name__, _data_str) + + icmp_values = {'type': repr(self.type_), + 'code': repr(self.code), + 'csum': repr(self.csum), + 'data': data_str} + _ic_str = ','.join(['%s=%s' % (k, icmp_values[k]) + for k, v in inspect.getmembers(self.ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmp.icmp.__name__, _ic_str) + + eq_(str(self.ic), ic_str) + eq_(repr(self.ic), ic_str) diff --git a/ryu/tests/unit/packet/test_icmpv6.py b/ryu/tests/unit/packet/test_icmpv6.py index 87e1696..848ecf9 100644 --- a/ryu/tests/unit/packet/test_icmpv6.py +++ b/ryu/tests/unit/packet/test_icmpv6.py @@ -18,6 +18,7 @@ import unittest import logging import struct +import inspect from nose.tools import ok_, eq_, nottest, raises from nose.plugins.skip import Skip, SkipTest @@ -157,6 +158,30 @@ class Test_icmpv6_echo_request(unittest.TestCase): def test_serialize_with_data(self): self._test_serialize(self.data) + def test_to_string(self): + ec = icmpv6.echo(self.id_, self.seq, self.data) + ic = icmpv6.icmpv6(self.type_, self.code, self.csum, ec) + + echo_values = {'id': self.id_, + 'seq': self.seq, + 'data': self.data} + _echo_str = ','.join(['%s=%s' % (k, repr(echo_values[k])) + for k, v in inspect.getmembers(ec) + if k in echo_values]) + echo_str = '%s(%s)' % (icmpv6.echo.__name__, _echo_str) + + icmp_values = {'type_': repr(self.type_), + 'code': repr(self.code), + 'csum': repr(self.csum), + 'data': echo_str} + _ic_str = ','.join(['%s=%s' % (k, icmp_values[k]) + for k, v in inspect.getmembers(ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str) + + eq_(str(ic), ic_str) + eq_(repr(ic), ic_str) + class Test_icmpv6_echo_reply(Test_icmpv6_echo_request): def setUp(self): @@ -263,6 +288,41 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase): eq_(nd_length, self.nd_length) eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src)) + def test_to_string(self): + nd_opt = icmpv6.nd_option_la(self.nd_hw_src) + nd = icmpv6.nd_neighbor( + self.res, self.dst, self.nd_type, self.nd_length, nd_opt) + ic = icmpv6.icmpv6(self.type_, self.code, self.csum, nd) + + nd_opt_values = {'hw_src': self.nd_hw_src, + 'data': None} + _nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k])) + for k, v in inspect.getmembers(nd_opt) + if k in nd_opt_values]) + nd_opt_str = '%s(%s)' % (icmpv6.nd_option_la.__name__, _nd_opt_str) + + nd_values = {'res': repr(nd.res), + 'dst': repr(self.dst), + 'type_': repr(self.nd_type), + 'length': repr(self.nd_length), + 'data': nd_opt_str} + _nd_str = ','.join(['%s=%s' % (k, nd_values[k]) + for k, v in inspect.getmembers(nd) + if k in nd_values]) + nd_str = '%s(%s)' % (icmpv6.nd_neighbor.__name__, _nd_str) + + icmp_values = {'type_': repr(self.type_), + 'code': repr(self.code), + 'csum': repr(self.csum), + 'data': nd_str} + _ic_str = ','.join(['%s=%s' % (k, icmp_values[k]) + for k, v in inspect.getmembers(ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str) + + eq_(str(ic), ic_str) + eq_(repr(ic), ic_str) + class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solict): def setUp(self): @@ -339,3 +399,18 @@ class Test_icmpv6_router_solict(unittest.TestCase): def test_serialize_with_data(self): self._test_serialize(self.data) + + def test_to_string(self): + ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.data) + + icmp_values = {'type_': self.type_, + 'code': self.code, + 'csum': self.csum, + 'data': self.data} + _ic_str = ','.join(['%s=%s' % (k, repr(icmp_values[k])) + for k, v in inspect.getmembers(ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str) + + eq_(str(ic), ic_str) + eq_(repr(ic), ic_str) diff --git a/ryu/tests/unit/packet/test_ipv6.py b/ryu/tests/unit/packet/test_ipv6.py new file mode 100644 index 0000000..5454fbf --- /dev/null +++ b/ryu/tests/unit/packet/test_ipv6.py @@ -0,0 +1,65 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import logging +import inspect + +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib import ip +from ryu.lib.packet import ipv6 + + +LOG = logging.getLogger(__name__) + + +class Test_ipv6(unittest.TestCase): + + version = 6 + traffic_class = 0 + flow_label = 0 + payload_length = 817 + nxt = 6 + hop_limit = 128 + src = ip.ipv6_to_bin('2002:4637:d5d3::4637:d5d3') + dst = ip.ipv6_to_bin('2001:4860:0:2001::68') + + ip = ipv6.ipv6(version, traffic_class, flow_label, payload_length, + nxt, hop_limit, src, dst) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + ipv6_values = {'version': self.version, + 'traffic_class': self.traffic_class, + 'flow_label': self.flow_label, + 'payload_length': self.payload_length, + 'nxt': self.nxt, + 'hop_limit': self.hop_limit, + 'src': self.src, + 'dst': self.dst} + _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k])) + for k, v in inspect.getmembers(self.ip) + if k in ipv6_values]) + ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str) + + eq_(str(self.ip), ipv6_str) + eq_(repr(self.ip), ipv6_str) diff --git a/ryu/tests/unit/packet/test_lldp.py b/ryu/tests/unit/packet/test_lldp.py index 7f95fb4..60e93b0 100644 --- a/ryu/tests/unit/packet/test_lldp.py +++ b/ryu/tests/unit/packet/test_lldp.py @@ -18,6 +18,7 @@ import unittest import logging import struct +import inspect from nose.tools import ok_, eq_, nottest from ryu.ofproto import ether @@ -120,6 +121,62 @@ class TestLLDPMandatoryTLV(unittest.TestCase): pkt.serialize() eq_(pkt.data, self.data) + def test_to_string(self): + chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS, + chassis_id='\x00\x04\x96\x1f\xa7\x26') + port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME, + port_id='1/3') + ttl = lldp.TTL(ttl=120) + end = lldp.End() + tlvs = (chassis_id, port_id, ttl, end) + lldp_pkt = lldp.lldp(tlvs) + + chassis_id_values = {'subtype': lldp.ChassisID.SUB_MAC_ADDRESS, + 'chassis_id': '\x00\x04\x96\x1f\xa7\x26', + 'len': chassis_id.len, + 'typelen': chassis_id.typelen} + _ch_id_str = ','.join(['%s=%s' % (k, repr(chassis_id_values[k])) + for k, v in inspect.getmembers(chassis_id) + if k in chassis_id_values]) + tlv_chassis_id_str = '%s(%s)' % (lldp.ChassisID.__name__, _ch_id_str) + + port_id_values = {'subtype': port_id.subtype, + 'port_id': port_id.port_id, + 'len': port_id.len, + 'typelen': port_id.typelen} + _port_id_str = ','.join(['%s=%s' % (k, repr(port_id_values[k])) + for k, v in inspect.getmembers(port_id) + if k in port_id_values]) + tlv_port_id_str = '%s(%s)' % (lldp.PortID.__name__, _port_id_str) + + ttl_values = {'ttl': ttl.ttl, + 'len': ttl.len, + 'typelen': ttl.typelen} + _ttl_str = ','.join(['%s=%s' % (k, repr(ttl_values[k])) + for k, v in inspect.getmembers(ttl) + if k in ttl_values]) + tlv_ttl_str = '%s(%s)' % (lldp.TTL.__name__, _ttl_str) + + end_values = {'len': end.len, + 'typelen': end.typelen} + _end_str = ','.join(['%s=%s' % (k, repr(end_values[k])) + for k, v in inspect.getmembers(end) + if k in end_values]) + tlv_end_str = '%s(%s)' % (lldp.End.__name__, _end_str) + + _tlvs_str = '(%s, %s, %s, %s)' + tlvs_str = _tlvs_str % (tlv_chassis_id_str, + tlv_port_id_str, + tlv_ttl_str, + tlv_end_str) + + _lldp_str = '%s(tlvs=%s)' + lldp_str = _lldp_str % (lldp.lldp.__name__, + tlvs_str) + + eq_(str(lldp_pkt), lldp_str) + eq_(repr(lldp_pkt), lldp_str) + class TestLLDPOptionalTLV(unittest.TestCase): def setUp(self): @@ -260,3 +317,158 @@ class TestLLDPOptionalTLV(unittest.TestCase): # self.data has many organizationally specific TLVs data = str(pkt.data[:-2]) eq_(data, self.data[:len(data)]) + + def test_to_string(self): + chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS, + chassis_id='\x00\x01\x30\xf9\xad\xa0') + port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME, + port_id='1/1') + ttl = lldp.TTL(ttl=120) + port_desc = lldp.PortDescription( + port_description='Summit300-48-Port 1001\x00') + sys_name = lldp.SystemName(system_name='Summit300-48\x00') + sys_desc = lldp.SystemDescription( + system_description='Summit300-48 - Version 7.4e.1 (Build 5) ' + + 'by Release_Master 05/27/05 04:53:11\x00') + sys_cap = lldp.SystemCapabilities( + subtype=lldp.ChassisID.SUB_CHASSIS_COMPONENT, + system_cap=0x14, + enabled_cap=0x14) + man_addr = lldp.ManagementAddress( + addr_subtype=0x06, addr='\x00\x01\x30\xf9\xad\xa0', + intf_subtype=0x02, intf_num=1001, + oid='') + org_spec = lldp.OrganizationallySpecific( + oui='\x00\x12\x0f', subtype=0x02, info='\x07\x01\x00') + end = lldp.End() + tlvs = (chassis_id, port_id, ttl, port_desc, sys_name, + sys_desc, sys_cap, man_addr, org_spec, end) + lldp_pkt = lldp.lldp(tlvs) + + # ChassisID string + chassis_id_values = {'subtype': lldp.ChassisID.SUB_MAC_ADDRESS, + 'chassis_id': '\x00\x01\x30\xf9\xad\xa0', + 'len': chassis_id.len, + 'typelen': chassis_id.typelen} + _ch_id_str = ','.join(['%s=%s' % (k, repr(chassis_id_values[k])) + for k, v in inspect.getmembers(chassis_id) + if k in chassis_id_values]) + tlv_chassis_id_str = '%s(%s)' % (lldp.ChassisID.__name__, _ch_id_str) + + # PortID string + port_id_values = {'subtype': port_id.subtype, + 'port_id': port_id.port_id, + 'len': port_id.len, + 'typelen': port_id.typelen} + _port_id_str = ','.join(['%s=%s' % (k, repr(port_id_values[k])) + for k, v in inspect.getmembers(port_id) + if k in port_id_values]) + tlv_port_id_str = '%s(%s)' % (lldp.PortID.__name__, _port_id_str) + + # TTL string + ttl_values = {'ttl': ttl.ttl, + 'len': ttl.len, + 'typelen': ttl.typelen} + _ttl_str = ','.join(['%s=%s' % (k, repr(ttl_values[k])) + for k, v in inspect.getmembers(ttl) + if k in ttl_values]) + tlv_ttl_str = '%s(%s)' % (lldp.TTL.__name__, _ttl_str) + + # PortDescription string + port_desc_values = {'tlv_info': port_desc.tlv_info, + 'len': port_desc.len, + 'typelen': port_desc.typelen} + _port_desc_str = ','.join(['%s=%s' % (k, repr(port_desc_values[k])) + for k, v in inspect.getmembers(port_desc) + if k in port_desc_values]) + tlv_port_desc_str = '%s(%s)' % (lldp.PortDescription.__name__, + _port_desc_str) + + # SystemName string + sys_name_values = {'tlv_info': sys_name.tlv_info, + 'len': sys_name.len, + 'typelen': sys_name.typelen} + _system_name_str = ','.join(['%s=%s' % (k, repr(sys_name_values[k])) + for k, v in inspect.getmembers(sys_name) + if k in sys_name_values]) + tlv_system_name_str = '%s(%s)' % (lldp.SystemName.__name__, + _system_name_str) + + # SystemDescription string + sys_desc_values = {'tlv_info': sys_desc.tlv_info, + 'len': sys_desc.len, + 'typelen': sys_desc.typelen} + _sys_desc_str = ','.join(['%s=%s' % (k, repr(sys_desc_values[k])) + for k, v in inspect.getmembers(sys_desc) + if k in sys_desc_values]) + tlv_sys_desc_str = '%s(%s)' % (lldp.SystemDescription.__name__, + _sys_desc_str) + + # SystemCapabilities string + sys_cap_values = {'subtype': lldp.ChassisID.SUB_CHASSIS_COMPONENT, + 'system_cap': 0x14, + 'enabled_cap': 0x14, + 'len': sys_cap.len, + 'typelen': sys_cap.typelen} + _sys_cap_str = ','.join(['%s=%s' % (k, repr(sys_cap_values[k])) + for k, v in inspect.getmembers(sys_cap) + if k in sys_cap_values]) + tlv_sys_cap_str = '%s(%s)' % (lldp.SystemCapabilities.__name__, + _sys_cap_str) + + # ManagementAddress string + man_addr_values = {'addr_subtype': 0x06, + 'addr': '\x00\x01\x30\xf9\xad\xa0', + 'addr_len': man_addr.addr_len, + 'intf_subtype': 0x02, + 'intf_num': 1001, + 'oid': '', + 'oid_len': man_addr.oid_len, + 'len': man_addr.len, + 'typelen': man_addr.typelen} + _man_addr_str = ','.join(['%s=%s' % (k, repr(man_addr_values[k])) + for k, v in inspect.getmembers(man_addr) + if k in man_addr_values]) + tlv_man_addr_str = '%s(%s)' % (lldp.ManagementAddress.__name__, + _man_addr_str) + + # OrganizationallySpecific string + org_spec_values = {'oui': '\x00\x12\x0f', + 'subtype': 0x02, + 'info': '\x07\x01\x00', + 'len': org_spec.len, + 'typelen': org_spec.typelen} + _org_spec_str = ','.join(['%s=%s' % (k, repr(org_spec_values[k])) + for k, v in inspect.getmembers(org_spec) + if k in org_spec_values]) + tlv_org_spec_str = '%s(%s)' % (lldp.OrganizationallySpecific.__name__, + _org_spec_str) + + # End string + end_values = {'len': end.len, + 'typelen': end.typelen} + _end_str = ','.join(['%s=%s' % (k, repr(end_values[k])) + for k, v in inspect.getmembers(end) + if k in end_values]) + tlv_end_str = '%s(%s)' % (lldp.End.__name__, _end_str) + + # tlvs string + _tlvs_str = '(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)' + tlvs_str = _tlvs_str % (tlv_chassis_id_str, + tlv_port_id_str, + tlv_ttl_str, + tlv_port_desc_str, + tlv_system_name_str, + tlv_sys_desc_str, + tlv_sys_cap_str, + tlv_man_addr_str, + tlv_org_spec_str, + tlv_end_str) + + # lldp string + _lldp_str = '%s(tlvs=%s)' + lldp_str = _lldp_str % (lldp.lldp.__name__, + tlvs_str) + + eq_(str(lldp_pkt), lldp_str) + eq_(repr(lldp_pkt), lldp_str) diff --git a/ryu/tests/unit/packet/test_mpls.py b/ryu/tests/unit/packet/test_mpls.py new file mode 100644 index 0000000..abde030 --- /dev/null +++ b/ryu/tests/unit/packet/test_mpls.py @@ -0,0 +1,54 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import logging +import inspect + +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib.packet import mpls + + +LOG = logging.getLogger(__name__) + + +class Test_mpls(unittest.TestCase): + + label = 29 + exp = 6 + bsb = 1 + ttl = 64 + mp = mpls.mpls(label, exp, bsb, ttl) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + mpls_values = {'label': self.label, + 'exp': self.exp, + 'bsb': self.bsb, + 'ttl': self.ttl} + _mpls_str = ','.join(['%s=%s' % (k, repr(mpls_values[k])) + for k, v in inspect.getmembers(self.mp) + if k in mpls_values]) + mpls_str = '%s(%s)' % (mpls.mpls.__name__, _mpls_str) + + eq_(str(self.mp), mpls_str) + eq_(repr(self.mp), mpls_str) diff --git a/ryu/tests/unit/packet/test_packet.py b/ryu/tests/unit/packet/test_packet.py index 45f27fd..d8ea0d4 100644 --- a/ryu/tests/unit/packet/test_packet.py +++ b/ryu/tests/unit/packet/test_packet.py @@ -19,6 +19,7 @@ import unittest import logging import struct import array +import inspect from nose.tools import * from nose.plugins.skip import Skip, SkipTest from ryu.ofproto import ether, inet @@ -116,6 +117,40 @@ class TestPacket(unittest.TestCase): eq_(self.dst_mac, p_arp.dst_mac) eq_(self.dst_ip, p_arp.dst_ip) + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_ARP} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + arp_values = {'hwtype': 1, + 'proto': ether.ETH_TYPE_IP, + 'hlen': 6, + 'plen': 4, + 'opcode': 2, + 'src_mac': self.src_mac, + 'dst_mac': self.dst_mac, + 'src_ip': self.src_ip, + 'dst_ip': self.dst_ip} + _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k])) + for k, v in inspect.getmembers(p_arp) + if k in arp_values]) + arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str) + + pkt_str = '%s, %s' % (eth_str, arp_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(arp_str, str(p_arp)) + eq_(arp_str, repr(p_arp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_vlan_arp(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -185,6 +220,52 @@ class TestPacket(unittest.TestCase): eq_(self.dst_mac, p_arp.dst_mac) eq_(self.dst_ip, p_arp.dst_ip) + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_8021Q} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + vlan_values = {'pcp': 0b111, + 'cfi': 0b1, + 'vid': 3, + 'ethertype': ether.ETH_TYPE_ARP} + _vlan_str = ','.join(['%s=%s' % (k, repr(vlan_values[k])) + for k, v in inspect.getmembers(p_vlan) + if k in vlan_values]) + vlan_str = '%s(%s)' % (vlan.vlan.__name__, _vlan_str) + + arp_values = {'hwtype': 1, + 'proto': ether.ETH_TYPE_IP, + 'hlen': 6, + 'plen': 4, + 'opcode': 2, + 'src_mac': self.src_mac, + 'dst_mac': self.dst_mac, + 'src_ip': self.src_ip, + 'dst_ip': self.dst_ip} + _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k])) + for k, v in inspect.getmembers(p_arp) + if k in arp_values]) + arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str) + + pkt_str = '%s, %s, %s' % (eth_str, vlan_str, arp_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(vlan_str, str(p_vlan)) + eq_(vlan_str, repr(p_vlan)) + + eq_(arp_str, str(p_arp)) + eq_(arp_str, repr(p_arp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_ipv4_udp(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -272,6 +353,57 @@ class TestPacket(unittest.TestCase): ok_('payload' in protocols) eq_(self.payload, protocols['payload'].tostring()) + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_IP} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ipv4_values = {'version': 4, + 'header_length': 5, + 'tos': 1, + 'total_length': l, + 'identification': 3, + 'flags': 1, + 'offset': p_ipv4.offset, + 'ttl': 64, + 'proto': inet.IPPROTO_UDP, + 'csum': p_ipv4.csum, + 'src': self.src_ip, + 'dst': self.dst_ip, + 'option': None} + _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k])) + for k, v in inspect.getmembers(p_ipv4) + if k in ipv4_values]) + ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str) + + udp_values = {'src_port': 0x190f, + 'dst_port': 0x1F90, + 'total_length': len(u_buf) + len(self.payload), + 'csum': 0x77b2} + _udp_str = ','.join(['%s=%s' % (k, repr(udp_values[k])) + for k, v in inspect.getmembers(p_udp) + if k in udp_values]) + udp_str = '%s(%s)' % (udp.udp.__name__, _udp_str) + + pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, udp_str, + repr(protocols['payload'])) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv4_str, str(p_ipv4)) + eq_(ipv4_str, repr(p_ipv4)) + + eq_(udp_str, str(p_udp)) + eq_(udp_str, repr(p_udp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_ipv4_tcp(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -371,6 +503,63 @@ class TestPacket(unittest.TestCase): ok_('payload' in protocols) eq_(self.payload, protocols['payload'].tostring()) + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_IP} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ipv4_values = {'version': 4, + 'header_length': 5, + 'tos': 0, + 'total_length': l, + 'identification': 0, + 'flags': 0, + 'offset': p_ipv4.offset, + 'ttl': 64, + 'proto': inet.IPPROTO_TCP, + 'csum': p_ipv4.csum, + 'src': self.src_ip, + 'dst': self.dst_ip, + 'option': None} + _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k])) + for k, v in inspect.getmembers(p_ipv4) + if k in ipv4_values]) + ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str) + + tcp_values = {'src_port': 0x190f, + 'dst_port': 0x1F90, + 'seq': 0x123, + 'ack': 1, + 'offset': 6, + 'bits': 0b101010, + 'window_size': 2048, + 'csum': p_tcp.csum, + 'urgent': 0x6f, + 'option': p_tcp.option} + _tcp_str = ','.join(['%s=%s' % (k, repr(tcp_values[k])) + for k, v in inspect.getmembers(p_tcp) + if k in tcp_values]) + tcp_str = '%s(%s)' % (tcp.tcp.__name__, _tcp_str) + + pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, tcp_str, + repr(protocols['payload'])) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv4_str, str(p_ipv4)) + eq_(ipv4_str, repr(p_ipv4)) + + eq_(tcp_str, str(p_tcp)) + eq_(tcp_str, repr(p_tcp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_llc_bpdu(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -462,3 +651,64 @@ class TestPacket(unittest.TestCase): eq_(20, p_bpdu.max_age) eq_(2, p_bpdu.hello_time) eq_(15, p_bpdu.forward_delay) + + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_IEEE802_3} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ctrl_values = {'modifier_function1': 0, + 'pf_bit': 0, + 'modifier_function2': 0} + _ctrl_str = ','.join(['%s=%s' % (k, repr(ctrl_values[k])) + for k, v in inspect.getmembers(p_llc.control) + if k in ctrl_values]) + ctrl_str = '%s(%s)' % (llc.ControlFormatU.__name__, _ctrl_str) + + llc_values = {'dsap_addr': repr(llc.SAP_BDPU), + 'ssap_addr': repr(llc.SAP_BDPU), + 'control': ctrl_str} + _llc_str = ','.join(['%s=%s' % (k, llc_values[k]) + for k, v in inspect.getmembers(p_llc) + if k in llc_values]) + llc_str = '%s(%s)' % (llc.llc.__name__, _llc_str) + + bpdu_values = {'protocol_id': bpdu.PROTOCOL_IDENTIFIER, + 'version_id': bpdu.PROTOCOLVERSION_ID_BPDU, + 'bpdu_type': bpdu.TYPE_CONFIG_BPDU, + 'flags': 0, + 'root_priority': long(32768), + 'root_system_id_extension': long(0), + 'root_mac_address': self.src_mac, + 'root_path_cost': 0, + 'bridge_priority': long(32768), + 'bridge_system_id_extension': long(0), + 'bridge_mac_address': self.dst_mac, + 'port_priority': 128, + 'port_number': 4, + 'message_age': float(1), + 'max_age': float(20), + 'hello_time': float(2), + 'forward_delay': float(15)} + _bpdu_str = ','.join(['%s=%s' % (k, repr(bpdu_values[k])) + for k, v in inspect.getmembers(p_bpdu) + if k in bpdu_values]) + bpdu_str = '%s(%s)' % (bpdu.ConfigurationBPDUs.__name__, _bpdu_str) + + pkt_str = '%s, %s, %s' % (eth_str, llc_str, bpdu_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(llc_str, str(p_llc)) + eq_(llc_str, repr(p_llc)) + + eq_(bpdu_str, str(p_bpdu)) + eq_(bpdu_str, repr(p_bpdu)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) diff --git a/ryu/tests/unit/packet/test_vrrp.py b/ryu/tests/unit/packet/test_vrrp.py index 13b19ca..a26bc90 100644 --- a/ryu/tests/unit/packet/test_vrrp.py +++ b/ryu/tests/unit/packet/test_vrrp.py @@ -19,6 +19,7 @@ import unittest import logging import struct +import inspect from nose.tools import eq_, ok_ from nose.tools import raises @@ -183,6 +184,26 @@ class Test_vrrpv2(unittest.TestCase): max_adver_int = vrrp.VRRP_V2_MAX_ADVER_INT_MAX + 1 ok_(not self._test_is_valid(max_adver_int=max_adver_int)) + def test_to_string(self): + vrrpv2_values = {'version': self.version, + 'type': self.type_, + 'vrid': self.vrid, + 'priority': self.priority, + 'count_ip': self.count_ip, + 'max_adver_int': self.max_adver_int, + 'checksum': self.vrrpv2.checksum, + 'ip_addresses': [self.ip_address], + 'auth_type': self.auth_type, + 'auth_data': self.auth_data, + 'identification': self.vrrpv2.identification} + _vrrpv2_str = ','.join(['%s=%s' % (k, repr(vrrpv2_values[k])) + for k, v in inspect.getmembers(self.vrrpv2) + if k in vrrpv2_values]) + vrrpv2_str = '%s(%s)' % (vrrp.vrrpv2.__name__, _vrrpv2_str) + + eq_(str(self.vrrpv2), vrrpv2_str) + eq_(repr(self.vrrpv2), vrrpv2_str) + class Test_vrrpv3_ipv4(unittest.TestCase): """ Test case for vrrp v3 IPv4 @@ -328,6 +349,26 @@ class Test_vrrpv3_ipv4(unittest.TestCase): max_adver_int = vrrp.VRRP_V3_MAX_ADVER_INT_MAX + 1 ok_(not self._test_is_valid(max_adver_int=max_adver_int)) + def test_to_string(self): + vrrpv3_values = {'version': self.version, + 'type': self.type_, + 'vrid': self.vrid, + 'priority': self.priority, + 'count_ip': self.count_ip, + 'max_adver_int': self.max_adver_int, + 'checksum': self.vrrpv3.checksum, + 'ip_addresses': [self.ip_address], + 'auth_type': None, + 'auth_data': None, + 'identification': self.vrrpv3.identification} + _vrrpv3_str = ','.join(['%s=%s' % (k, repr(vrrpv3_values[k])) + for k, v in inspect.getmembers(self.vrrpv3) + if k in vrrpv3_values]) + vrrpv3_str = '%s(%s)' % (vrrp.vrrpv3.__name__, _vrrpv3_str) + + eq_(str(self.vrrpv3), vrrpv3_str) + eq_(repr(self.vrrpv3), vrrpv3_str) + class Test_vrrpv3_ipv6(unittest.TestCase): """ Test case for vrrp v3 IPv6 @@ -430,3 +471,23 @@ class Test_vrrpv3_ipv6(unittest.TestCase): print(len(p0.data), p0.data) print(len(p1.data), p1.data) eq_(p0.data, p1.data) + + def test_to_string(self): + vrrpv3_values = {'version': self.version, + 'type': self.type_, + 'vrid': self.vrid, + 'priority': self.priority, + 'count_ip': self.count_ip, + 'max_adver_int': self.max_adver_int, + 'checksum': self.vrrpv3.checksum, + 'ip_addresses': [self.ip_address], + 'auth_type': None, + 'auth_data': None, + 'identification': self.vrrpv3.identification} + _vrrpv3_str = ','.join(['%s=%s' % (k, repr(vrrpv3_values[k])) + for k, v in inspect.getmembers(self.vrrpv3) + if k in vrrpv3_values]) + vrrpv3_str = '%s(%s)' % (vrrp.vrrpv3.__name__, _vrrpv3_str) + + eq_(str(self.vrrpv3), vrrpv3_str) + eq_(repr(self.vrrpv3), vrrpv3_str) -- 1.7.10.4 ------------------------------------------------------------------------------ Get your SQL database under version control now! Version control is standard for application code, but databases havent caught up. So what steps can you take to put your SQL databases under version control? Why should you start doing it? Read more to find out. http://pubads.g.doubleclick.net/gampad/clk?id=49501711&iu=/4140/ostg.clktrk _______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
