Sorry for not responding in a timely manner. On Tue, 27 Aug 2013 09:53:19 +0900 Yuichi Ito <[email protected]> wrote:
> Changes v2 -> v3: > > - use datapath.ports instead of LacpLib.src_port_to_mac > - with the above, remove LacpLib.switch_features_handler > > > > this application provides the simple example of link aggregation using LACP. > > the module "lacplib" controls exchange of LACP packets and watches the status > of the slave i/fs. > the status changes if the i/fs went into a LAG or timeout to exchange LACP > occurred. > the module sends a "EventSlaveStateChanged" event when the status changed. > > the module "simple_switch_lacp" is a variation of "simple_switch". > when the module received "EventSlaveStateChanged" event, the module resets > flow entries. > > to run: > ryu-manager ryu/app/simple_switch_lacp.py > > Signed-off-by: Yuichi Ito <[email protected]> > --- > ryu/app/simple_switch_lacp.py | 145 ++++++++++++++++++++ > ryu/lib/lacplib.py | 292 > +++++++++++++++++++++++++++++++++++++++++ > 2 files changed, 437 insertions(+) > create mode 100644 ryu/app/simple_switch_lacp.py > create mode 100644 ryu/lib/lacplib.py > > diff --git a/ryu/app/simple_switch_lacp.py b/ryu/app/simple_switch_lacp.py > new file mode 100644 > index 0000000..d70bba1 > --- /dev/null > +++ b/ryu/app/simple_switch_lacp.py > @@ -0,0 +1,145 @@ > +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. > +# > +# Licensed under the Apache License, Version 2.0 (the "License"); > +# you may not use this file except in compliance with the License. > +# You may obtain a copy of the License at > +# > +# http://www.apache.org/licenses/LICENSE-2.0 > +# > +# Unless required by applicable law or agreed to in writing, software > +# distributed under the License is distributed on an "AS IS" BASIS, > +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > +# implied. > +# See the License for the specific language governing permissions and > +# limitations under the License. 
> + > +from ryu.base import app_manager > +from ryu.controller import ofp_event > +from ryu.controller.handler import MAIN_DISPATCHER > +from ryu.controller.handler import set_ev_cls > +from ryu.ofproto import ofproto_v1_0 > +from ryu.lib import addrconv > +from ryu.lib import lacplib > +from ryu.lib.dpid import str_to_dpid > +from ryu.lib.packet import ethernet > +from ryu.lib.packet import packet > +from ryu.lib.packet import slow > + > + > +class SimpleSwitchLacp(app_manager.RyuApp): > + OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION] > + _CONTEXTS = {'lacplib': lacplib.LacpLib} > + > + def __init__(self, *args, **kwargs): > + super(SimpleSwitchLacp, self).__init__(*args, **kwargs) > + self.mac_to_port = {} > + self._lacp = kwargs['lacplib'] > + # in this sample application, bonding i/fs of the switchs > + # shall be set up as follows: > + # - the port 1 and 2 of the datapath 1 face the slave i/fs. > + # - the port 3, 4 and 5 of the datapath 1 face the others. > + # - the port 1 and 2 of the datapath 2 face the others. 
> + self._lacp.add( > + dpid=str_to_dpid('0000000000000001'), ports=[1, 2]) > + self._lacp.add( > + dpid=str_to_dpid('0000000000000001'), ports=[3, 4, 5]) > + self._lacp.add( > + dpid=str_to_dpid('0000000000000002'), ports=[1, 2]) > + > + def add_flow(self, datapath, in_port, dst, actions): > + ofproto = datapath.ofproto > + > + wildcards = ofproto_v1_0.OFPFW_ALL > + wildcards &= ~ofproto_v1_0.OFPFW_IN_PORT > + wildcards &= ~ofproto_v1_0.OFPFW_DL_DST > + > + match = datapath.ofproto_parser.OFPMatch( > + wildcards, in_port, 0, addrconv.mac.text_to_bin(dst), > + 0, 0, 0, 0, 0, 0, 0, 0, 0) > + > + mod = datapath.ofproto_parser.OFPFlowMod( > + datapath=datapath, match=match, cookie=0, > + command=ofproto.OFPFC_ADD, idle_timeout=0, hard_timeout=0, > + priority=ofproto.OFP_DEFAULT_PRIORITY, actions=actions) > + datapath.send_msg(mod) > + > + def del_flow(self, datapath, dst): > + ofproto = datapath.ofproto > + parser = datapath.ofproto_parser > + > + match = parser.OFPMatch(dl_dst=addrconv.mac.text_to_bin(dst)) > + mod = parser.OFPFlowMod( > + datapath=datapath, match=match, cookie=0, > + command=ofproto.OFPFC_DELETE) > + datapath.send_msg(mod) > + > + @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) > + def _packet_in_handler(self, ev): > + msg = ev.msg > + datapath = msg.datapath > + ofproto = datapath.ofproto > + > + pkt = packet.Packet(msg.data) > + if slow.lacp in pkt: > + # do nothing > + return In my review of v1, I was simply asking about your intention regarding the EventOFPPacketIn handling. I guess that you were trying to hide LACP from applications. That's fine by me. If you prefer that approach, please revert the code. Sorry that I was not clear enough. > + eth = pkt.get_protocols(ethernet.ethernet)[0] > + > + dst = eth.dst > + src = eth.src > + > + dpid = datapath.id > + self.mac_to_port.setdefault(dpid, {}) > + > + self.logger.info("packet in %s %s %s %s", > + dpid, src, dst, msg.in_port) > + > + # learn a mac address to avoid FLOOD next time. 
> + self.mac_to_port[dpid][src] = msg.in_port > + > + if dst in self.mac_to_port[dpid]: > + out_port = self.mac_to_port[dpid][dst] > + else: > + out_port = ofproto.OFPP_FLOOD > + > + actions = [datapath.ofproto_parser.OFPActionOutput(out_port)] > + > + # install a flow to avoid packet_in next time > + if out_port != ofproto.OFPP_FLOOD: > + self.add_flow(datapath, msg.in_port, dst, actions) > + > + out = datapath.ofproto_parser.OFPPacketOut( > + datapath=datapath, buffer_id=msg.buffer_id, in_port=msg.in_port, > + actions=actions) > + datapath.send_msg(out) > + > + @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER) > + def _port_status_handler(self, ev): > + msg = ev.msg > + reason = msg.reason > + port_no = msg.desc.port_no > + > + ofproto = msg.datapath.ofproto > + if reason == ofproto.OFPPR_ADD: > + self.logger.info("port added %s", port_no) > + elif reason == ofproto.OFPPR_DELETE: > + self.logger.info("port deleted %s", port_no) > + elif reason == ofproto.OFPPR_MODIFY: > + self.logger.info("port modified %s", port_no) > + else: > + self.logger.info("Illeagal port state %s %s", port_no, reason) Is this handler necessary? I know that the original simple_switch.py has it to show how to handle PortStatus, but I guess that we don't need it in this code. 
> + @set_ev_cls(lacplib.EventSlaveStateChanged, lacplib.LAG_EV_DISPATCHER) > + def _slave_state_changed_handler(self, ev): > + datapath = ev.datapath > + dpid = datapath.id > + port_no = ev.port > + enabled = ev.enabled > + self.logger.info("slave state changed port: %d enabled: %s", > + port_no, enabled) > + if dpid in self.mac_to_port: > + for mac in self.mac_to_port[dpid]: > + self.del_flow(datapath, mac) > + del self.mac_to_port[dpid] > + self.mac_to_port.setdefault(dpid, {}) > diff --git a/ryu/lib/lacplib.py b/ryu/lib/lacplib.py > new file mode 100644 > index 0000000..0ec68ed > --- /dev/null > +++ b/ryu/lib/lacplib.py > @@ -0,0 +1,292 @@ > +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. > +# > +# Licensed under the Apache License, Version 2.0 (the "License"); > +# you may not use this file except in compliance with the License. > +# You may obtain a copy of the License at > +# > +# http://www.apache.org/licenses/LICENSE-2.0 > +# > +# Unless required by applicable law or agreed to in writing, software > +# distributed under the License is distributed on an "AS IS" BASIS, > +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > +# implied. > +# See the License for the specific language governing permissions and > +# limitations under the License. 
> + > +import logging > + > +from ryu.base import app_manager > +from ryu.controller import event > +from ryu.controller import ofp_event > +from ryu.controller.handler import MAIN_DISPATCHER > +from ryu.controller.handler import set_ev_cls > +from ryu.ofproto import ether > +from ryu.lib import addrconv > +from ryu.lib.dpid import dpid_to_str > +from ryu.lib.packet import packet > +from ryu.lib.packet import ethernet > +from ryu.lib.packet import slow > + > + > +LAG_EV_DISPATCHER = "lacplib" > + > + > +class EventSlaveStateChanged(event.EventBase): > + def __init__(self, datapath, port, enabled): > + """initialization.""" > + super(EventSlaveStateChanged, self).__init__() > + self.datapath = datapath > + self.port = port > + self.enabled = enabled > + > + > +class LacpLib(app_manager.RyuApp): > + > + #------------------------------------------------------------------- > + # PUBLIC METHODS > + #------------------------------------------------------------------- > + def __init__(self): > + """initialization.""" > + super(LacpLib, self).__init__() > + self.name = 'lacplib' > + self.bonds = [] > + self.__set_logger() > + > + def add(self, dpid, ports): > + """add a setting of a bonding i/f. > + 'add' method takes the correspondig args in this order. > + > + ========= ===================================================== > + Attribute Description > + ========= ===================================================== > + dpid an integer value that means datapath id. > + > + ports a list of integer values that means the ports face > + with the slave i/fs. > + ========= ===================================================== > + > + if you want to use multi LAG, call 'add' method more than once. 
> + """ > + assert isinstance(ports, list) > + assert 2 <= len(ports) > + ifs = {} > + for port in ports: > + ifs[port] = {'enabled': False, 'timeout': 0} > + bond = {} > + bond[dpid] = ifs > + self.bonds.append(bond) > + > + #------------------------------------------------------------------- > + # PUBLIC METHODS ( EVENT HANDLERS ) > + #------------------------------------------------------------------- > + @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) > + def packet_in_handler(self, evt): > + """PacketIn event handler. when the received packet was LACP, > + proceed it. otherwise, do nothing.""" > + req_pkt = packet.Packet(evt.msg.data) > + if slow.lacp in req_pkt: > + (req_lacp, ) = req_pkt.get_protocols(slow.lacp) > + (req_eth, ) = req_pkt.get_protocols(ethernet.ethernet) > + self.__do_lacp(req_lacp, req_eth.src, evt.msg) > + else: > + # do nothing > + pass > + > + @set_ev_cls(ofp_event.EventOFPFlowRemoved, MAIN_DISPATCHER) > + def flow_removed_handler(self, evt): > + """FlowRemoved event handler. when the removed flow entry was > + for LACP, set the status of the slave i/f to enabled, and > + send a event that LACP exchange timeout has occurred.""" > + msg = evt.msg > + datapath = msg.datapath > + dpid = datapath.id > + match = msg.match > + port = match.in_port > + if ether.ETH_TYPE_SLOW != match.dl_type: > + return > + self.logger.info( > + "SW=%s PORT=%d LACP exchange timeout has occurred.", > + dpid_to_str(dpid), port) > + self.__set_slave_enabled(dpid, port, False) > + self.__set_slave_timeout(dpid, port, 0) > + self.send_event_to_observers( > + EventSlaveStateChanged(datapath, port, False)) > + > + #------------------------------------------------------------------- > + # PRIVATE METHODS ( RELATED TO LACP ) > + #------------------------------------------------------------------- Doesn't all the other code in Ryu use a single underscore for private methods? If so, I would prefer to do the same in this code for consistency. 
> + def __do_lacp(self, req_lacp, src, msg): > + """packet-in process when the received packet is LACP.""" > + datapath = msg.datapath > + dpid = datapath.id > + port = msg.in_port > + ofproto = datapath.ofproto > + parser = datapath.ofproto_parser > + > + self.logger.info("SW=%s PORT=%d LACP received.", > + dpid_to_str(dpid), port) > + self.logger.debug(str(req_lacp)) > + > + # when LACP arrived at disabled port, update the status of > + # the slave i/f and reset all flow entries except for LACP. > + if not self.__get_slave_enabled(dpid, port): > + self.logger.info( > + "SW=%s PORT=%d the slave i/f has just been up.", > + dpid_to_str(dpid), port) > + self.__set_slave_enabled(dpid, port, True) > + self.send_event_to_observers( > + EventSlaveStateChanged(datapath, port, True)) > + > + # set the idle_timeout time using the actor state of the > + # received packet. > + if req_lacp.LACP_STATE_SHORT_TIMEOUT == \ > + req_lacp.actor_state_timeout: > + idle_timeout = req_lacp.SHORT_TIMEOUT_TIME > + else: > + idle_timeout = req_lacp.LONG_TIMEOUT_TIME > + > + # when the timeout time has changed, update the timeout time of > + # the slave i/f and re-enter a flow entry for the packet from > + # the slave i/f with idle_timeout. > + if idle_timeout != self.__get_slave_timeout(dpid, port): > + self.logger.info( > + "SW=%s PORT=%d the timeout time has changed.", > + dpid_to_str(dpid), port) > + self.__set_slave_timeout(dpid, port, idle_timeout) > + self.__set_flow_entry_packet_in(src, port, idle_timeout, > + msg) > + > + # create a response packet. > + res_pkt = self.__create_response(datapath, port, req_lacp) > + > + # packet-out the response packet. 
> + out_port = ofproto.OFPP_IN_PORT > + actions = [parser.OFPActionOutput(out_port)] > + out = datapath.ofproto_parser.OFPPacketOut( > + datapath=datapath, buffer_id=ofproto.OFP_NO_BUFFER, > + data=res_pkt.data, in_port=port, actions=actions) > + datapath.send_msg(out) > + > + def __create_response(self, datapath, port, req): > + """create a packet including LACP.""" > + src = datapath.ports[port].hw_addr > + res_ether = ethernet.ethernet( > + slow.SLOW_PROTOCOL_MULTICAST, src, ether.ETH_TYPE_SLOW) > + res_lacp = self.__create_lacp(datapath, port, req) > + res_pkt = packet.Packet() > + res_pkt.add_protocol(res_ether) > + res_pkt.add_protocol(res_lacp) > + res_pkt.serialize() > + return res_pkt > + > + def __create_lacp(self, datapath, port, req): > + """create a LACP packet.""" > + actor_system = datapath.ports[datapath.ofproto.OFPP_LOCAL].hw_addr > + res = slow.lacp( > + actor_system_priority=0xffff, > + actor_system=actor_system, > + actor_key=req.actor_key, > + actor_port_priority=0xff, > + actor_port=port, > + actor_state_activity=req.LACP_STATE_PASSIVE, > + actor_state_timeout=req.actor_state_timeout, > + actor_state_aggregation=req.actor_state_aggregation, > + actor_state_synchronization=req.actor_state_synchronization, > + actor_state_collecting=req.actor_state_collecting, > + actor_state_distributing=req.actor_state_distributing, > + actor_state_defaulted=req.LACP_STATE_OPERATIONAL_PARTNER, > + actor_state_expired=req.LACP_STATE_NOT_EXPIRED, > + partner_system_priority=req.actor_system_priority, > + partner_system=req.actor_system, > + partner_key=req.actor_key, > + partner_port_priority=req.actor_port_priority, > + partner_port=req.actor_port, > + partner_state_activity=req.actor_state_activity, > + partner_state_timeout=req.actor_state_timeout, > + partner_state_aggregation=req.actor_state_aggregation, > + partner_state_synchronization=req.actor_state_synchronization, > + partner_state_collecting=req.actor_state_collecting, > + 
partner_state_distributing=req.actor_state_distributing, > + partner_state_defaulted=req.actor_state_defaulted, > + partner_state_expired=req.actor_state_expired, > + collector_max_delay=0) > + self.logger.info("SW=%s PORT=%d LACP sent.", > + dpid_to_str(datapath.id), port) > + self.logger.debug(str(res)) > + return res > + > + def __get_slave_enabled(self, dpid, port): > + """get whether a slave i/f at some port of some datapath is > + enable or not.""" > + slave = self.__get_slave(dpid, port) > + if slave: > + return slave['enabled'] > + else: > + return False > + > + def __set_slave_enabled(self, dpid, port, enabled): > + """set whether a slave i/f at some port of some datapath is > + enable or not.""" > + slave = self.__get_slave(dpid, port) > + if slave: > + slave['enabled'] = enabled > + > + def __get_slave_timeout(self, dpid, port): > + """get the timeout time at some port of some datapath.""" > + slave = self.__get_slave(dpid, port) > + if slave: > + return slave['timeout'] > + else: > + return 0 > + > + def __set_slave_timeout(self, dpid, port, timeout): > + """set the timeout time at some port of some datapath.""" > + slave = self.__get_slave(dpid, port) > + if slave: > + slave['timeout'] = timeout > + > + def __get_slave(self, dpid, port): > + """get slave i/f at some port of some datapath.""" > + result = None > + for bond in self.bonds: > + if dpid in bond: > + if port in bond[dpid]: > + result = bond[dpid][port] > + break > + return result > + > + #------------------------------------------------------------------- > + # PRIVATE METHODS ( RELATED TO OPEN FLOW PROTOCOL ) > + #------------------------------------------------------------------- > + def __set_flow_entry_packet_in(self, src, port, timeout, msg): > + """enter a flow entry for the packet from the slave i/f > + with idle_timeout.""" > + datapath = msg.datapath > + ofproto = datapath.ofproto > + parser = datapath.ofproto_parser > + > + wildcards = ofproto.OFPFW_ALL > + wildcards &= 
~ofproto.OFPFW_IN_PORT > + wildcards &= ~ofproto.OFPFW_DL_SRC > + wildcards &= ~ofproto.OFPFW_DL_TYPE > + match = parser.OFPMatch(wildcards=wildcards, in_port=port, > + dl_src=addrconv.mac.text_to_bin(src), > + dl_type=ether.ETH_TYPE_SLOW) > + actions = [parser.OFPActionOutput( > + ofproto.OFPP_CONTROLLER, ofproto.OFP_MSG_SIZE_MAX)] > + mod = parser.OFPFlowMod( > + datapath=datapath, match=match, cookie=0, > + command=ofproto.OFPFC_ADD, idle_timeout=timeout, > + flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions) > + datapath.send_msg(mod) > + > + #------------------------------------------------------------------- > + # PRIVATE METHODS ( OTHERS ) > + #------------------------------------------------------------------- > + def __set_logger(self): > + """change log format.""" > + self.logger.propagate = False > + hdl = logging.StreamHandler() > + fmt_str = '[LACP][%(levelname)s] %(message)s' > + hdl.setFormatter(logging.Formatter(fmt_str)) > + self.logger.addHandler(hdl) > -- > 1.7.10.4 > > > ------------------------------------------------------------------------------ > Introducing Performance Central, a new site from SourceForge and > AppDynamics. Performance Central is your source for news, insights, > analysis and resources for efficient Application Performance Management. > Visit us today! > http://pubads.g.doubleclick.net/gampad/clk?id=48897511&iu=/4140/ostg.clktrk > _______________________________________________ > Ryu-devel mailing list > [email protected] > https://lists.sourceforge.net/lists/listinfo/ryu-devel ------------------------------------------------------------------------------ Introducing Performance Central, a new site from SourceForge and AppDynamics. Performance Central is your source for news, insights, analysis and resources for efficient Application Performance Management. Visit us today! 
http://pubads.g.doubleclick.net/gampad/clk?id=48897511&iu=/4140/ostg.clktrk _______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
