Changes v1 -> v2: - move "lacplib" to ryu.lib. - change the process of PacketIn. - add more samples of the configuration.
this application provides the simple example of link aggregation using LACP. the module "lacplib" controls exchange of LACP packets and watches the status of the slave i/fs. the status changes if the i/fs went into a LAG or timeout to exchange LACP occurred. the module sends a "EventSlaveStateChanged" event when the status changed. the module "simple_switch_lacp" is a variation of "simple_switch". when the module received "EventSlaveStateChanged" event, the module resets flow entries. to run: ryu-manager ryu/app/simple_switch_lacp.py Signed-off-by: Yuichi Ito <[email protected]> --- ryu/app/simple_switch_lacp.py | 148 ++++++++++++++++++++ ryu/lib/lacplib.py | 310 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 458 insertions(+) create mode 100644 ryu/app/simple_switch_lacp.py create mode 100644 ryu/lib/lacplib.py diff --git a/ryu/app/simple_switch_lacp.py b/ryu/app/simple_switch_lacp.py new file mode 100644 index 0000000..f8ee5ec --- /dev/null +++ b/ryu/app/simple_switch_lacp.py @@ -0,0 +1,148 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import struct + +from ryu.base import app_manager +from ryu.controller import ofp_event +from ryu.controller.handler import MAIN_DISPATCHER +from ryu.controller.handler import set_ev_cls +from ryu.ofproto import ofproto_v1_0 +from ryu.lib import lacplib +from ryu.lib.dpid import str_to_dpid +from ryu.lib.mac import haddr_to_str +from ryu.lib.packet import packet +from ryu.lib.packet import slow + + +class SimpleSwitchLacp(app_manager.RyuApp): + OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION] + _CONTEXTS = {'lacplib': lacplib.LacpLib} + + def __init__(self, *args, **kwargs): + super(SimpleSwitchLacp, self).__init__(*args, **kwargs) + self.mac_to_port = {} + self._lacp = kwargs['lacplib'] + # in this sample application, bonding i/fs of the switchs + # shall be set up as follows: + # - the port 1 and 2 of the datapath 1 face the slave i/fs. + # - the port 3, 4 and 5 of the datapath 1 face the others. + # - the port 1 and 2 of the datapath 2 face the others. + self._lacp.add( + dpid=str_to_dpid('0000000000000001'), ports=[1, 2]) + self._lacp.add( + dpid=str_to_dpid('0000000000000001'), ports=[3, 4, 5]) + self._lacp.add( + dpid=str_to_dpid('0000000000000002'), ports=[1, 2]) + + def add_flow(self, datapath, in_port, dst, actions): + ofproto = datapath.ofproto + + wildcards = ofproto_v1_0.OFPFW_ALL + wildcards &= ~ofproto_v1_0.OFPFW_IN_PORT + wildcards &= ~ofproto_v1_0.OFPFW_DL_DST + + match = datapath.ofproto_parser.OFPMatch( + wildcards, in_port, 0, dst, + 0, 0, 0, 0, 0, 0, 0, 0, 0) + + mod = datapath.ofproto_parser.OFPFlowMod( + datapath=datapath, match=match, cookie=0, + command=ofproto.OFPFC_ADD, idle_timeout=0, hard_timeout=0, + priority=ofproto.OFP_DEFAULT_PRIORITY, actions=actions) + datapath.send_msg(mod) + + def del_flow(self, datapath, dst): + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + wildcards = ofproto.OFPFW_ALL + wildcards &= ~ofproto.OFPFW_DL_DST + + match = datapath.ofproto_parser.OFPMatch( + wildcards=wildcards, dl_dst=dst) + mod = parser.OFPFlowMod( + datapath=datapath, match=match, cookie=0, + command=ofproto.OFPFC_DELETE) + datapath.send_msg(mod) + + @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) + def _packet_in_handler(self, ev): + msg = ev.msg + datapath = msg.datapath + ofproto = datapath.ofproto + + req_pkt = packet.Packet(msg.data) + if slow.lacp in req_pkt: + # do nothing + return + + dst, src, _eth_type = struct.unpack_from('!6s6sH', buffer(msg.data), 0) + + dpid = datapath.id + self.mac_to_port.setdefault(dpid, {}) + + self.logger.info("packet in %s %s %s %s", + dpid, haddr_to_str(src), haddr_to_str(dst), + msg.in_port) + + # learn a mac address to avoid FLOOD next time. + self.mac_to_port[dpid][src] = msg.in_port + + if dst in self.mac_to_port[dpid]: + out_port = self.mac_to_port[dpid][dst] + else: + out_port = ofproto.OFPP_FLOOD + + actions = [datapath.ofproto_parser.OFPActionOutput(out_port)] + + # install a flow to avoid packet_in next time + if out_port != ofproto.OFPP_FLOOD: + self.add_flow(datapath, msg.in_port, dst, actions) + + out = datapath.ofproto_parser.OFPPacketOut( + datapath=datapath, buffer_id=msg.buffer_id, in_port=msg.in_port, + actions=actions) + datapath.send_msg(out) + + @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER) + def _port_status_handler(self, ev): + msg = ev.msg + reason = msg.reason + port_no = msg.desc.port_no + + ofproto = msg.datapath.ofproto + if reason == ofproto.OFPPR_ADD: + self.logger.info("port added %s", port_no) + elif reason == ofproto.OFPPR_DELETE: + self.logger.info("port deleted %s", port_no) + elif reason == ofproto.OFPPR_MODIFY: + self.logger.info("port modified %s", port_no) + else: + self.logger.info("Illeagal port state %s %s", port_no, reason) + + @set_ev_cls(lacplib.EventSlaveStateChanged, lacplib.LAG_EV_DISPATCHER) + def _slave_state_changed_handler(self, ev): + datapath = ev.datapath + dpid = datapath.id + port_no = ev.port + enabled = ev.enabled + self.logger.info("slave state changed port: %d enabled: %s", + port_no, enabled) + if dpid in self.mac_to_port: + for mac in self.mac_to_port[dpid]: + self.del_flow(datapath, mac) + del self.mac_to_port[dpid] + self.mac_to_port.setdefault(dpid, {}) diff --git a/ryu/lib/lacplib.py b/ryu/lib/lacplib.py new file mode 100644 index 0000000..2e2c345 --- /dev/null +++ b/ryu/lib/lacplib.py @@ -0,0 +1,310 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from ryu.base import app_manager +from ryu.controller import event +from ryu.controller import ofp_event +from ryu.controller.handler import CONFIG_DISPATCHER +from ryu.controller.handler import MAIN_DISPATCHER +from ryu.controller.handler import set_ev_cls +from ryu.ofproto import ether +from ryu.ofproto import ofproto_v1_0 +from ryu.lib import addrconv +from ryu.lib.dpid import dpid_to_str +from ryu.lib.packet import packet +from ryu.lib.packet import ethernet +from ryu.lib.packet import slow + + +LAG_EV_DISPATCHER = "lacplib" + + +class EventSlaveStateChanged(event.EventBase): + def __init__(self, datapath, port, enabled): + """initialization.""" + super(EventSlaveStateChanged, self).__init__() + self.datapath = datapath + self.port = port + self.enabled = enabled + + +class LacpLib(app_manager.RyuApp): + + #------------------------------------------------------------------- + # PUBLIC METHODS + #------------------------------------------------------------------- + def __init__(self): + """initialization.""" + super(LacpLib, self).__init__() + self.name = 'lacplib' + self.src_port_to_mac = {} + self.bonds = [] + self.__set_logger() + + def add(self, dpid, ports): + """add a setting of a bonding i/f. + 'add' method takes the correspondig args in this order. + + ========= ===================================================== + Attribute Description + ========= ===================================================== + dpid an integer value that means datapath id. + + ports a list of integer values that means the ports face + with the slave i/fs. + ========= ===================================================== + + if you want to use multi LAG, call 'add' method more than once. + """ + assert isinstance(ports, list) + assert 2 <= len(ports) + ifs = {} + for port in ports: + ifs[port] = {'enabled': False, 'timeout': 0} + bond = {} + bond[dpid] = ifs + self.bonds.append(bond) + + #------------------------------------------------------------------- + # PUBLIC METHODS ( EVENT HANDLERS ) + #------------------------------------------------------------------- + @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) + def packet_in_handler(self, ev): + """PacketIn event handler. when the received packet was LACP, + proceed it. otherwise, do nothing.""" + req_pkt = packet.Packet(ev.msg.data) + if slow.lacp in req_pkt: + (req_lacp, ) = req_pkt.get_protocols(slow.lacp) + (req_eth, ) = req_pkt.get_protocols(ethernet.ethernet) + self.__do_lacp(req_lacp, req_eth.src, ev.msg) + else: + # do nothing + pass + + @set_ev_cls(ofp_event.EventOFPSwitchFeatures, + [CONFIG_DISPATCHER, MAIN_DISPATCHER]) + def switch_features_handler(self, ev): + """FeaturesReply event handler. the controller will learn + the MAC addresses of the ports of the switches. these addresses + are used to send a LACP response.""" + msg = ev.msg + datapath = msg.datapath + dpid = datapath.id + self.src_port_to_mac.setdefault(dpid, {}) + for port_no, port in msg.ports.items(): + self.src_port_to_mac[dpid][port_no] = port.hw_addr + + @set_ev_cls(ofp_event.EventOFPFlowRemoved, MAIN_DISPATCHER) + def flow_removed_handler(self, ev): + """FlowRemoved event handler. when the removed flow entry was + for LACP, set the status of the slave i/f to enabled, and + send a event that LACP exchange timeout has occurred.""" + msg = ev.msg + datapath = msg.datapath + dpid = datapath.id + match = msg.match + port = match.in_port + if ether.ETH_TYPE_SLOW != match.dl_type: + return + self.logger.info( + "SW=%s PORT=%d LACP exchange timeout has occurred.", + dpid_to_str(dpid), port) + self.__set_slave_enabled(dpid, port, False) + self.__set_slave_timeout(dpid, port, 0) + self.send_event_to_observers( + EventSlaveStateChanged(datapath, port, False)) + + #------------------------------------------------------------------- + # PRIVATE METHODS ( RELATED TO LACP ) + #------------------------------------------------------------------- + def __do_lacp(self, req_lacp, src, msg): + """packet-in process when the received packet is LACP.""" + datapath = msg.datapath + dpid = datapath.id + port = msg.in_port + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + self.logger.info("SW=%s PORT=%d LACP received.", + dpid_to_str(dpid), port) + self.logger.debug(str(req_lacp)) + + # when LACP arrived at disabled port, update the status of + # the slave i/f and reset all flow entries except for LACP. + if not self.__get_slave_enabled(dpid, port): + self.logger.info( + "SW=%s PORT=%d the slave i/f has just been up.", + dpid_to_str(dpid), port) + self.__set_slave_enabled(dpid, port, True) + self.send_event_to_observers( + EventSlaveStateChanged(datapath, port, True)) + + # set the idle_timeout time using the actor state of the + # received packet. + if req_lacp.LACP_STATE_SHORT_TIMEOUT == \ + req_lacp.actor_state_timeout: + idle_timeout = req_lacp.SHORT_TIMEOUT_TIME + else: + idle_timeout = req_lacp.LONG_TIMEOUT_TIME + + # when the timeout time has changed, update the timeout time of + # the slave i/f and re-enter a flow entry for the packet from + # the slave i/f with idle_timeout. + if idle_timeout != self.__get_slave_timeout(dpid, port): + self.logger.info( + "SW=%s PORT=%d the timeout time has changed.", + dpid_to_str(dpid), port) + self.__set_slave_timeout(dpid, port, idle_timeout) + self.__set_flow_entry_packet_in(src, port, idle_timeout, + msg) + + # create a response packet. + res_pkt = self.__create_response(dpid, port, req_lacp) + + # packet-out the response packet. + out_port = ofproto.OFPP_IN_PORT + actions = [parser.OFPActionOutput(out_port)] + out = datapath.ofproto_parser.OFPPacketOut( + datapath=datapath, buffer_id=ofproto.OFP_NO_BUFFER, + data=res_pkt.data, in_port=port, actions=actions) + datapath.send_msg(out) + + def __create_response(self, dpid, port, req): + """create a packet including LACP.""" + src = self.src_port_to_mac[dpid][port] + res_ether = ethernet.ethernet( + slow.SLOW_PROTOCOL_MULTICAST, + addrconv.mac.bin_to_text(src), ether.ETH_TYPE_SLOW) + res_lacp = self.__create_lacp(dpid, port, req) + res_pkt = packet.Packet() + res_pkt.add_protocol(res_ether) + res_pkt.add_protocol(res_lacp) + res_pkt.serialize() + return res_pkt + + def __create_lacp(self, dpid, port, req): + """create a LACP packet.""" + actor_system = addrconv.mac.bin_to_text( + self.src_port_to_mac[dpid][ofproto_v1_0.OFPP_LOCAL]) + res = slow.lacp( + actor_system_priority=0xffff, + actor_system=actor_system, + actor_key=req.actor_key, + actor_port_priority=0xff, + actor_port=port, + actor_state_activity=req.LACP_STATE_PASSIVE, + actor_state_timeout=req.actor_state_timeout, + actor_state_aggregation=req.actor_state_aggregation, + actor_state_synchronization=req.actor_state_synchronization, + actor_state_collecting=req.actor_state_collecting, + actor_state_distributing=req.actor_state_distributing, + actor_state_defaulted=req.LACP_STATE_OPERATIONAL_PARTNER, + actor_state_expired=req.LACP_STATE_NOT_EXPIRED, + partner_system_priority=req.actor_system_priority, + partner_system=req.actor_system, + partner_key=req.actor_key, + partner_port_priority=req.actor_port_priority, + partner_port=req.actor_port, + partner_state_activity=req.actor_state_activity, + partner_state_timeout=req.actor_state_timeout, + partner_state_aggregation=req.actor_state_aggregation, + partner_state_synchronization=req.actor_state_synchronization, + partner_state_collecting=req.actor_state_collecting, + partner_state_distributing=req.actor_state_distributing, + partner_state_defaulted=req.actor_state_defaulted, + partner_state_expired=req.actor_state_expired, + collector_max_delay=0) + self.logger.info("SW=%s PORT=%d LACP sent.", dpid_to_str(dpid), + port) + self.logger.debug(str(res)) + return res + + def __get_slave_enabled(self, dpid, port): + """get whether a slave i/f at some port of some datapath is + enable or not.""" + slave = self.__get_slave(dpid, port) + if slave: + return slave['enabled'] + else: + return False + + def __set_slave_enabled(self, dpid, port, enabled): + """set whether a slave i/f at some port of some datapath is + enable or not.""" + slave = self.__get_slave(dpid, port) + if slave: + slave['enabled'] = enabled + + def __get_slave_timeout(self, dpid, port): + """get the timeout time at some port of some datapath.""" + slave = self.__get_slave(dpid, port) + if slave: + return slave['timeout'] + else: + return 0 + + def __set_slave_timeout(self, dpid, port, timeout): + """set the timeout time at some port of some datapath.""" + slave = self.__get_slave(dpid, port) + if slave: + slave['timeout'] = timeout + + def __get_slave(self, dpid, port): + """get slave i/f at some port of some datapath.""" + result = None + for bond in self.bonds: + if dpid in bond: + if port in bond[dpid]: + result = bond[dpid][port] + break + return result + + #------------------------------------------------------------------- + # PRIVATE METHODS ( RELATED TO OPEN FLOW PROTOCOL ) + #------------------------------------------------------------------- + def __set_flow_entry_packet_in(self, src, port, timeout, msg): + """enter a flow entry for the packet from the slave i/f + with idle_timeout.""" + datapath = msg.datapath + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + wildcards = ofproto.OFPFW_ALL + wildcards &= ~ofproto.OFPFW_IN_PORT + wildcards &= ~ofproto.OFPFW_DL_SRC + wildcards &= ~ofproto.OFPFW_DL_TYPE + match = parser.OFPMatch(wildcards=wildcards, in_port=port, + dl_src=addrconv.mac.text_to_bin(src), + dl_type=ether.ETH_TYPE_SLOW) + actions = [parser.OFPActionOutput( + ofproto.OFPP_CONTROLLER, ofproto.OFP_MSG_SIZE_MAX)] + mod = parser.OFPFlowMod( + datapath=datapath, match=match, cookie=0, + command=ofproto.OFPFC_ADD, idle_timeout=timeout, + flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions) + datapath.send_msg(mod) + + #------------------------------------------------------------------- + # PRIVATE METHODS ( OTHERS ) + #------------------------------------------------------------------- + def __set_logger(self): + """change log format.""" + self.logger.propagate = False + hdl = logging.StreamHandler() + fmt_str = '[LACP][%(levelname)s] %(message)s' + hdl.setFormatter(logging.Formatter(fmt_str)) + self.logger.addHandler(hdl) -- 1.7.10.4 ------------------------------------------------------------------------------ Introducing Performance Central, a new site from SourceForge and AppDynamics. Performance Central is your source for news, insights, analysis and resources for efficient Application Performance Management. Visit us today! http://pubads.g.doubleclick.net/gampad/clk?id=48897511&iu=/4140/ostg.clktrk _______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
