Changes v2 -> v3:

- use datapath.ports instead of LacpLib.src_port_to_mac
- with the above, remove LacpLib.switch_features_handler



this application provides a simple example of link aggregation using LACP.

the module "lacplib" controls the exchange of LACP packets and watches the
status of the slave i/fs.
the status changes when a slave i/f joins a LAG or when the LACP exchange
times out.
the module sends an "EventSlaveStateChanged" event when the status changes.

the module "simple_switch_lacp" is a variation of "simple_switch".
when the module receives an "EventSlaveStateChanged" event, it resets its flow
entries.

to run:
ryu-manager ryu/app/simple_switch_lacp.py

Signed-off-by: Yuichi Ito <[email protected]>
---
 ryu/app/simple_switch_lacp.py |  145 ++++++++++++++++++++
 ryu/lib/lacplib.py            |  292 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 437 insertions(+)
 create mode 100644 ryu/app/simple_switch_lacp.py
 create mode 100644 ryu/lib/lacplib.py

diff --git a/ryu/app/simple_switch_lacp.py b/ryu/app/simple_switch_lacp.py
new file mode 100644
index 0000000..d70bba1
--- /dev/null
+++ b/ryu/app/simple_switch_lacp.py
@@ -0,0 +1,145 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ryu.base import app_manager
+from ryu.controller import ofp_event
+from ryu.controller.handler import MAIN_DISPATCHER
+from ryu.controller.handler import set_ev_cls
+from ryu.ofproto import ofproto_v1_0
+from ryu.lib import addrconv
+from ryu.lib import lacplib
+from ryu.lib.dpid import str_to_dpid
+from ryu.lib.packet import ethernet
+from ryu.lib.packet import packet
+from ryu.lib.packet import slow
+
+
+class SimpleSwitchLacp(app_manager.RyuApp):
+    OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]
+    _CONTEXTS = {'lacplib': lacplib.LacpLib}
+
+    def __init__(self, *args, **kwargs):
+        super(SimpleSwitchLacp, self).__init__(*args, **kwargs)
+        self.mac_to_port = {}
+        self._lacp = kwargs['lacplib']
+        # in this sample application, the bonding i/fs facing the
+        # switches shall be set up as three LAGs, one per add() call:
+        # - ports 1 and 2 of datapath 1 face the slave i/fs of one bond.
+        # - ports 3, 4 and 5 of datapath 1 face those of another bond.
+        # - ports 1 and 2 of datapath 2 face those of a third bond.
+        self._lacp.add(
+            dpid=str_to_dpid('0000000000000001'), ports=[1, 2])
+        self._lacp.add(
+            dpid=str_to_dpid('0000000000000001'), ports=[3, 4, 5])
+        self._lacp.add(
+            dpid=str_to_dpid('0000000000000002'), ports=[1, 2])
+
+    def add_flow(self, datapath, in_port, dst, actions):
+        """add a flow entry that matches on in_port and dl_dst only."""
+        ofproto = datapath.ofproto
+        wildcards = ofproto_v1_0.OFPFW_ALL
+        wildcards &= ~ofproto_v1_0.OFPFW_IN_PORT
+        wildcards &= ~ofproto_v1_0.OFPFW_DL_DST
+
+        match = datapath.ofproto_parser.OFPMatch(
+            wildcards, in_port, 0, addrconv.mac.text_to_bin(dst),
+            0, 0, 0, 0, 0, 0, 0, 0, 0)
+
+        mod = datapath.ofproto_parser.OFPFlowMod(
+            datapath=datapath, match=match, cookie=0,
+            command=ofproto.OFPFC_ADD, idle_timeout=0, hard_timeout=0,
+            priority=ofproto.OFP_DEFAULT_PRIORITY, actions=actions)
+        datapath.send_msg(mod)
+
+    def del_flow(self, datapath, dst):
+        """send an OFPFC_DELETE flow-mod whose match is dl_dst=dst."""
+        ofproto = datapath.ofproto
+        parser = datapath.ofproto_parser
+        match = parser.OFPMatch(dl_dst=addrconv.mac.text_to_bin(dst))
+        mod = parser.OFPFlowMod(
+            datapath=datapath, match=match, cookie=0,
+            command=ofproto.OFPFC_DELETE)
+        datapath.send_msg(mod)
+
+    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
+    def _packet_in_handler(self, ev):
+        """learning-switch PacketIn handler. LACP frames are skipped here
+        (lacplib registers its own PacketIn handler for them)."""
+        msg = ev.msg
+        datapath = msg.datapath
+        ofproto = datapath.ofproto
+
+        pkt = packet.Packet(msg.data)
+        if slow.lacp in pkt:
+            return
+
+        eth = pkt.get_protocols(ethernet.ethernet)[0]
+        dst = eth.dst
+        src = eth.src
+
+        dpid = datapath.id
+        self.mac_to_port.setdefault(dpid, {})
+        self.logger.info("packet in %s %s %s %s",
+                         dpid, src, dst, msg.in_port)
+
+        # learn a mac address to avoid FLOOD next time.
+        self.mac_to_port[dpid][src] = msg.in_port
+
+        if dst in self.mac_to_port[dpid]:
+            out_port = self.mac_to_port[dpid][dst]
+        else:
+            out_port = ofproto.OFPP_FLOOD
+        actions = [datapath.ofproto_parser.OFPActionOutput(out_port)]
+
+        # install a flow to avoid packet_in next time
+        if out_port != ofproto.OFPP_FLOOD:
+            self.add_flow(datapath, msg.in_port, dst, actions)
+
+        # an unbuffered packet must carry its payload in the PacketOut.
+        data = msg.data if msg.buffer_id == ofproto.OFP_NO_BUFFER else None
+        out = datapath.ofproto_parser.OFPPacketOut(
+            datapath=datapath, buffer_id=msg.buffer_id, in_port=msg.in_port,
+            actions=actions, data=data)
+        datapath.send_msg(out)
+
+    @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
+    def _port_status_handler(self, ev):
+        """log the port number and the reason of a PortStatus message."""
+        msg = ev.msg
+        reason = msg.reason
+        port_no = msg.desc.port_no
+        ofproto = msg.datapath.ofproto
+        if reason == ofproto.OFPPR_ADD:
+            self.logger.info("port added %s", port_no)
+        elif reason == ofproto.OFPPR_DELETE:
+            self.logger.info("port deleted %s", port_no)
+        elif reason == ofproto.OFPPR_MODIFY:
+            self.logger.info("port modified %s", port_no)
+        else:
+            self.logger.info("Illeagal port state %s %s", port_no, reason)
+
+    @set_ev_cls(lacplib.EventSlaveStateChanged, lacplib.LAG_EV_DISPATCHER)
+    def _slave_state_changed_handler(self, ev):
+        """drop the flows and the learned macs of the datapath whose
+        slave i/f changed state, leaving an empty mac table for it."""
+        datapath = ev.datapath
+        dpid = datapath.id
+        port_no, enabled = ev.port, ev.enabled
+        self.logger.info("slave state changed port: %d enabled: %s",
+                         port_no, enabled)
+        # delete one flow per learned mac, then start from an empty table.
+        for mac in self.mac_to_port.get(dpid, {}):
+            self.del_flow(datapath, mac)
+        self.mac_to_port[dpid] = {}
diff --git a/ryu/lib/lacplib.py b/ryu/lib/lacplib.py
new file mode 100644
index 0000000..0ec68ed
--- /dev/null
+++ b/ryu/lib/lacplib.py
@@ -0,0 +1,292 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from ryu.base import app_manager
+from ryu.controller import event
+from ryu.controller import ofp_event
+from ryu.controller.handler import MAIN_DISPATCHER
+from ryu.controller.handler import set_ev_cls
+from ryu.ofproto import ether
+from ryu.lib import addrconv
+from ryu.lib.dpid import dpid_to_str
+from ryu.lib.packet import packet
+from ryu.lib.packet import ethernet
+from ryu.lib.packet import slow
+
+
+LAG_EV_DISPATCHER = "lacplib"
+
+
+class EventSlaveStateChanged(event.EventBase):
+    def __init__(self, datapath, port, enabled):
+        """datapath/port identify the slave i/f; enabled is its new state."""
+        super(EventSlaveStateChanged, self).__init__()
+        self.datapath = datapath
+        self.port = port
+        self.enabled = enabled
+
+
+class LacpLib(app_manager.RyuApp):
+
+    #-------------------------------------------------------------------
+    # PUBLIC METHODS
+    #-------------------------------------------------------------------
+    def __init__(self):
+        """name the app, start with no bonds and set up the log format."""
+        super(LacpLib, self).__init__()
+        self.name = 'lacplib'
+        self.bonds = []
+        self.__set_logger()
+
+    def add(self, dpid, ports):
+        """add a setting of a bonding i/f.
+        'add' method takes the corresponding args in this order.
+
+        ========= =====================================================
+        Attribute Description
+        ========= =====================================================
+        dpid      an integer value that means datapath id.
+
+        ports     a list of integer values that means the ports face
+                  with the slave i/fs. it holds at least two ports.
+        ========= =====================================================
+
+        if you want to use multi LAG, call 'add' method more than once.
+        """
+        assert isinstance(ports, list)
+        assert 2 <= len(ports)
+        # every slave i/f starts out disabled, with no timeout set yet.
+        ifs = dict(
+            (port, {'enabled': False, 'timeout': 0}) for port in ports)
+        # one bond maps its datapath id to the states of its slave i/fs.
+        bond = {dpid: ifs}
+        self.bonds.append(bond)
+
+    #-------------------------------------------------------------------
+    # PUBLIC METHODS ( EVENT HANDLERS )
+    #-------------------------------------------------------------------
+    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
+    def packet_in_handler(self, evt):
+        """PacketIn event handler. when the received packet is LACP,
+        process it. otherwise, do nothing."""
+        req_pkt = packet.Packet(evt.msg.data)
+        if slow.lacp in req_pkt:
+            (req_lacp, ) = req_pkt.get_protocols(slow.lacp)
+            (req_eth, ) = req_pkt.get_protocols(ethernet.ethernet)
+            self.__do_lacp(req_lacp, req_eth.src, evt.msg)
+        else:
+            # do nothing
+            pass
+
+    @set_ev_cls(ofp_event.EventOFPFlowRemoved, MAIN_DISPATCHER)
+    def flow_removed_handler(self, evt):
+        """FlowRemoved event handler. when the removed flow entry was
+        for LACP, set the status of the slave i/f to disabled, and
+        send an event that LACP exchange timeout has occurred."""
+        msg = evt.msg
+        datapath = msg.datapath
+        dpid = datapath.id
+        match = msg.match
+        port = match.in_port
+        if ether.ETH_TYPE_SLOW != match.dl_type:
+            return
+        self.logger.info(
+            "SW=%s PORT=%d LACP exchange timeout has occurred.",
+            dpid_to_str(dpid), port)
+        self.__set_slave_enabled(dpid, port, False)
+        self.__set_slave_timeout(dpid, port, 0)
+        self.send_event_to_observers(
+            EventSlaveStateChanged(datapath, port, False))
+
+    #-------------------------------------------------------------------
+    # PRIVATE METHODS ( RELATED TO LACP )
+    #-------------------------------------------------------------------
+    def __do_lacp(self, req_lacp, src, msg):
+        """handle a received LACP packet: update the slave state and reply."""
+        datapath = msg.datapath
+        dpid = datapath.id
+        port = msg.in_port
+        ofproto = datapath.ofproto
+        parser = datapath.ofproto_parser
+
+        self.logger.info("SW=%s PORT=%d LACP received.",
+                         dpid_to_str(dpid), port)
+        self.logger.debug(str(req_lacp))
+
+        # when LACP arrived at disabled port, update the status of
+        # the slave i/f to enabled, and notify the observers.
+        if not self.__get_slave_enabled(dpid, port):
+            self.logger.info(
+                "SW=%s PORT=%d the slave i/f has just been up.",
+                dpid_to_str(dpid), port)
+            self.__set_slave_enabled(dpid, port, True)
+            self.send_event_to_observers(
+                EventSlaveStateChanged(datapath, port, True))
+
+        # set the idle_timeout time using the actor state of the
+        # received packet.
+        if req_lacp.LACP_STATE_SHORT_TIMEOUT == \
+           req_lacp.actor_state_timeout:
+            idle_timeout = req_lacp.SHORT_TIMEOUT_TIME
+        else:
+            idle_timeout = req_lacp.LONG_TIMEOUT_TIME
+
+        # when the timeout time has changed, update the timeout time of
+        # the slave i/f and re-enter a flow entry for the packet from
+        # the slave i/f with idle_timeout.
+        if idle_timeout != self.__get_slave_timeout(dpid, port):
+            self.logger.info(
+                "SW=%s PORT=%d the timeout time has changed.",
+                dpid_to_str(dpid), port)
+            self.__set_slave_timeout(dpid, port, idle_timeout)
+            self.__set_flow_entry_packet_in(src, port, idle_timeout,
+                                            msg)
+
+        # create a response packet.
+        res_pkt = self.__create_response(datapath, port, req_lacp)
+
+        # send the response back out of the port the request came in on.
+        out_port = ofproto.OFPP_IN_PORT
+        actions = [parser.OFPActionOutput(out_port)]
+        out = datapath.ofproto_parser.OFPPacketOut(
+            datapath=datapath, buffer_id=ofproto.OFP_NO_BUFFER,
+            data=res_pkt.data, in_port=port, actions=actions)
+        datapath.send_msg(out)
+
+    def __create_response(self, datapath, port, req):
+        """create the serialized ethernet frame carrying the LACP reply."""
+        src = datapath.ports[port].hw_addr
+        res_ether = ethernet.ethernet(
+            slow.SLOW_PROTOCOL_MULTICAST, src, ether.ETH_TYPE_SLOW)
+        res_lacp = self.__create_lacp(datapath, port, req)
+        res_pkt = packet.Packet()
+        res_pkt.add_protocol(res_ether)
+        res_pkt.add_protocol(res_lacp)
+        res_pkt.serialize()
+        return res_pkt
+
+    def __create_lacp(self, datapath, port, req):
+        """create the LACP response, echoing the requester as partner."""
+        actor_system = datapath.ports[datapath.ofproto.OFPP_LOCAL].hw_addr
+        res = slow.lacp(
+            actor_system_priority=0xffff,
+            actor_system=actor_system,
+            actor_key=req.actor_key,
+            actor_port_priority=0xff,
+            actor_port=port,
+            actor_state_activity=req.LACP_STATE_PASSIVE,
+            actor_state_timeout=req.actor_state_timeout,
+            actor_state_aggregation=req.actor_state_aggregation,
+            actor_state_synchronization=req.actor_state_synchronization,
+            actor_state_collecting=req.actor_state_collecting,
+            actor_state_distributing=req.actor_state_distributing,
+            actor_state_defaulted=req.LACP_STATE_OPERATIONAL_PARTNER,
+            actor_state_expired=req.LACP_STATE_NOT_EXPIRED,
+            partner_system_priority=req.actor_system_priority,
+            partner_system=req.actor_system,
+            partner_key=req.actor_key,
+            partner_port_priority=req.actor_port_priority,
+            partner_port=req.actor_port,
+            partner_state_activity=req.actor_state_activity,
+            partner_state_timeout=req.actor_state_timeout,
+            partner_state_aggregation=req.actor_state_aggregation,
+            partner_state_synchronization=req.actor_state_synchronization,
+            partner_state_collecting=req.actor_state_collecting,
+            partner_state_distributing=req.actor_state_distributing,
+            partner_state_defaulted=req.actor_state_defaulted,
+            partner_state_expired=req.actor_state_expired,
+            collector_max_delay=0)
+        self.logger.info("SW=%s PORT=%d LACP sent.",
+                         dpid_to_str(datapath.id), port)
+        self.logger.debug(str(res))
+        return res
+
+    def __get_slave_enabled(self, dpid, port):
+        """tell whether the slave i/f at the given port of the given
+        datapath is enabled. an unregistered port counts as disabled."""
+        found = self.__get_slave(dpid, port)
+        if not found:
+            # no bond registers this port.
+            return False
+        return found['enabled']
+
+    def __set_slave_enabled(self, dpid, port, enabled):
+        """record the enabled state of a slave i/f, if it is registered."""
+        found = self.__get_slave(dpid, port)
+        if not found:
+            return
+        found['enabled'] = enabled
+
+    def __get_slave_timeout(self, dpid, port):
+        """return the timeout time recorded for the slave i/f at the
+        given port of the given datapath, or 0 if it is unregistered."""
+        found = self.__get_slave(dpid, port)
+        if not found:
+            return 0
+        return found['timeout']
+
+    def __set_slave_timeout(self, dpid, port, timeout):
+        """record the timeout time of a slave i/f, if it is registered."""
+        found = self.__get_slave(dpid, port)
+        if found:
+            found['timeout'] = timeout
+
+    def __get_slave(self, dpid, port):
+        """look up the state dict of the slave i/f at the given port of
+        the given datapath. returns None when no bond registers it."""
+        for bond in self.bonds:
+            ifs = bond.get(dpid)
+            if ifs is not None and port in ifs:
+                # the first bond that lists the port wins.
+                return ifs[port]
+        return None
+
+    #-------------------------------------------------------------------
+    # PRIVATE METHODS ( RELATED TO OPEN FLOW PROTOCOL )
+    #-------------------------------------------------------------------
+    def __set_flow_entry_packet_in(self, src, port, timeout, msg):
+        """enter a flow entry that sends the LACP packets from the slave
+        i/f to the controller, with idle_timeout and OFPFF_SEND_FLOW_REM."""
+        datapath = msg.datapath
+        ofproto = datapath.ofproto
+        parser = datapath.ofproto_parser
+
+        wildcards = ofproto.OFPFW_ALL
+        wildcards &= ~ofproto.OFPFW_IN_PORT
+        wildcards &= ~ofproto.OFPFW_DL_SRC
+        wildcards &= ~ofproto.OFPFW_DL_TYPE
+        match = parser.OFPMatch(wildcards=wildcards, in_port=port,
+                                dl_src=addrconv.mac.text_to_bin(src),
+                                dl_type=ether.ETH_TYPE_SLOW)
+        actions = [parser.OFPActionOutput(
+            ofproto.OFPP_CONTROLLER, ofproto.OFP_MSG_SIZE_MAX)]
+        mod = parser.OFPFlowMod(
+            datapath=datapath, match=match, cookie=0,
+            command=ofproto.OFPFC_ADD, idle_timeout=timeout,
+            flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
+        datapath.send_msg(mod)
+
+    #-------------------------------------------------------------------
+    # PRIVATE METHODS ( OTHERS )
+    #-------------------------------------------------------------------
+    def __set_logger(self):
+        """stop propagation and log via a '[LACP][level] msg' handler."""
+        self.logger.propagate = False
+        hdl = logging.StreamHandler()
+        fmt_str = '[LACP][%(levelname)s] %(message)s'
+        hdl.setFormatter(logging.Formatter(fmt_str))
+        self.logger.addHandler(hdl)
-- 
1.7.10.4


------------------------------------------------------------------------------
Introducing Performance Central, a new site from SourceForge and 
AppDynamics. Performance Central is your source for news, insights, 
analysis and resources for efficient Application Performance Management. 
Visit us today!
http://pubads.g.doubleclick.net/gampad/clk?id=48897511&iu=/4140/ostg.clktrk
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to