Sorry for not responding in a timely manner.

On Tue, 27 Aug 2013 09:53:19 +0900
Yuichi Ito <[email protected]> wrote:

> Changes v2 -> v3:
> 
> - use datapath.ports instead of LacpLib.src_port_to_mac
> - with the above, remove LacpLib.switch_features_handler
> 
> 
> 
> this application provides the simple example of link aggregation using LACP.
> 
> the module "lacplib" controls exchange of LACP packets and watches the status 
> of the slave i/fs.
> the status changes if the i/fs went into a LAG or timeout to exchange LACP 
> occurred.
> the module sends a "EventSlaveStateChanged" event when the status changed.
> 
> the module "simple_switch_lacp" is a variation of "simple_switch".
> when the module received "EventSlaveStateChanged" event, the module resets 
> flow entries.
> 
> to run:
> ryu-manager ryu/app/simple_switch_lacp.py
> 
> Signed-off-by: Yuichi Ito <[email protected]>
> ---
>  ryu/app/simple_switch_lacp.py |  145 ++++++++++++++++++++
>  ryu/lib/lacplib.py            |  292 
> +++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 437 insertions(+)
>  create mode 100644 ryu/app/simple_switch_lacp.py
>  create mode 100644 ryu/lib/lacplib.py
> 
> diff --git a/ryu/app/simple_switch_lacp.py b/ryu/app/simple_switch_lacp.py
> new file mode 100644
> index 0000000..d70bba1
> --- /dev/null
> +++ b/ryu/app/simple_switch_lacp.py
> @@ -0,0 +1,145 @@
> +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
> +#
> +# Licensed under the Apache License, Version 2.0 (the "License");
> +# you may not use this file except in compliance with the License.
> +# You may obtain a copy of the License at
> +#
> +#    http://www.apache.org/licenses/LICENSE-2.0
> +#
> +# Unless required by applicable law or agreed to in writing, software
> +# distributed under the License is distributed on an "AS IS" BASIS,
> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> +# implied.
> +# See the License for the specific language governing permissions and
> +# limitations under the License.
> +
> +from ryu.base import app_manager
> +from ryu.controller import ofp_event
> +from ryu.controller.handler import MAIN_DISPATCHER
> +from ryu.controller.handler import set_ev_cls
> +from ryu.ofproto import ofproto_v1_0
> +from ryu.lib import addrconv
> +from ryu.lib import lacplib
> +from ryu.lib.dpid import str_to_dpid
> +from ryu.lib.packet import ethernet
> +from ryu.lib.packet import packet
> +from ryu.lib.packet import slow
> +
> +
> +class SimpleSwitchLacp(app_manager.RyuApp):
> +    OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]
> +    _CONTEXTS = {'lacplib': lacplib.LacpLib}
> +
> +    def __init__(self, *args, **kwargs):
> +        super(SimpleSwitchLacp, self).__init__(*args, **kwargs)
> +        self.mac_to_port = {}
> +        self._lacp = kwargs['lacplib']
> +        # in this sample application, bonding i/fs of the switchs
> +        # shall be set up as follows:
> +        # - the port 1 and 2 of the datapath 1 face the slave i/fs.
> +        # - the port 3, 4 and 5 of the datapath 1 face the others.
> +        # - the port 1 and 2 of the datapath 2 face the others.
> +        self._lacp.add(
> +            dpid=str_to_dpid('0000000000000001'), ports=[1, 2])
> +        self._lacp.add(
> +            dpid=str_to_dpid('0000000000000001'), ports=[3, 4, 5])
> +        self._lacp.add(
> +            dpid=str_to_dpid('0000000000000002'), ports=[1, 2])
> +
> +    def add_flow(self, datapath, in_port, dst, actions):
> +        ofproto = datapath.ofproto
> +
> +        wildcards = ofproto_v1_0.OFPFW_ALL
> +        wildcards &= ~ofproto_v1_0.OFPFW_IN_PORT
> +        wildcards &= ~ofproto_v1_0.OFPFW_DL_DST
> +
> +        match = datapath.ofproto_parser.OFPMatch(
> +            wildcards, in_port, 0, addrconv.mac.text_to_bin(dst),
> +            0, 0, 0, 0, 0, 0, 0, 0, 0)
> +
> +        mod = datapath.ofproto_parser.OFPFlowMod(
> +            datapath=datapath, match=match, cookie=0,
> +            command=ofproto.OFPFC_ADD, idle_timeout=0, hard_timeout=0,
> +            priority=ofproto.OFP_DEFAULT_PRIORITY, actions=actions)
> +        datapath.send_msg(mod)
> +
> +    def del_flow(self, datapath, dst):
> +        ofproto = datapath.ofproto
> +        parser = datapath.ofproto_parser
> +
> +        match = parser.OFPMatch(dl_dst=addrconv.mac.text_to_bin(dst))
> +        mod = parser.OFPFlowMod(
> +            datapath=datapath, match=match, cookie=0,
> +            command=ofproto.OFPFC_DELETE)
> +        datapath.send_msg(mod)
> +
> +    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
> +    def _packet_in_handler(self, ev):
> +        msg = ev.msg
> +        datapath = msg.datapath
> +        ofproto = datapath.ofproto
> +
> +        pkt = packet.Packet(msg.data)
> +        if slow.lacp in pkt:
> +            # do nothing
> +            return

I was simply asking about your intention regarding EventPacketIn in
v1. I guess that you were trying to hide LACP from applications. That's
fine by me. If you prefer that approach, please revert the code. Sorry
that I was not clear enough.


> +        eth = pkt.get_protocols(ethernet.ethernet)[0]
> +
> +        dst = eth.dst
> +        src = eth.src
> +
> +        dpid = datapath.id
> +        self.mac_to_port.setdefault(dpid, {})
> +
> +        self.logger.info("packet in %s %s %s %s",
> +                         dpid, src, dst, msg.in_port)
> +
> +        # learn a mac address to avoid FLOOD next time.
> +        self.mac_to_port[dpid][src] = msg.in_port
> +
> +        if dst in self.mac_to_port[dpid]:
> +            out_port = self.mac_to_port[dpid][dst]
> +        else:
> +            out_port = ofproto.OFPP_FLOOD
> +
> +        actions = [datapath.ofproto_parser.OFPActionOutput(out_port)]
> +
> +        # install a flow to avoid packet_in next time
> +        if out_port != ofproto.OFPP_FLOOD:
> +            self.add_flow(datapath, msg.in_port, dst, actions)
> +
> +        out = datapath.ofproto_parser.OFPPacketOut(
> +            datapath=datapath, buffer_id=msg.buffer_id, in_port=msg.in_port,
> +            actions=actions)
> +        datapath.send_msg(out)
> +
> +    @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
> +    def _port_status_handler(self, ev):
> +        msg = ev.msg
> +        reason = msg.reason
> +        port_no = msg.desc.port_no
> +
> +        ofproto = msg.datapath.ofproto
> +        if reason == ofproto.OFPPR_ADD:
> +            self.logger.info("port added %s", port_no)
> +        elif reason == ofproto.OFPPR_DELETE:
> +            self.logger.info("port deleted %s", port_no)
> +        elif reason == ofproto.OFPPR_MODIFY:
> +            self.logger.info("port modified %s", port_no)
> +        else:
> +            self.logger.info("Illeagal port state %s %s", port_no, reason)

Is this handler necessary? I know that the original simple_switch.py
has it to show how to handle PortStatus, but I guess that we don't
need it in this code.

> +    @set_ev_cls(lacplib.EventSlaveStateChanged, lacplib.LAG_EV_DISPATCHER)
> +    def _slave_state_changed_handler(self, ev):
> +        datapath = ev.datapath
> +        dpid = datapath.id
> +        port_no = ev.port
> +        enabled = ev.enabled
> +        self.logger.info("slave state changed port: %d enabled: %s",
> +                         port_no, enabled)
> +        if dpid in self.mac_to_port:
> +            for mac in self.mac_to_port[dpid]:
> +                self.del_flow(datapath, mac)
> +            del self.mac_to_port[dpid]
> +        self.mac_to_port.setdefault(dpid, {})
> diff --git a/ryu/lib/lacplib.py b/ryu/lib/lacplib.py
> new file mode 100644
> index 0000000..0ec68ed
> --- /dev/null
> +++ b/ryu/lib/lacplib.py
> @@ -0,0 +1,292 @@
> +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
> +#
> +# Licensed under the Apache License, Version 2.0 (the "License");
> +# you may not use this file except in compliance with the License.
> +# You may obtain a copy of the License at
> +#
> +#    http://www.apache.org/licenses/LICENSE-2.0
> +#
> +# Unless required by applicable law or agreed to in writing, software
> +# distributed under the License is distributed on an "AS IS" BASIS,
> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> +# implied.
> +# See the License for the specific language governing permissions and
> +# limitations under the License.
> +
> +import logging
> +
> +from ryu.base import app_manager
> +from ryu.controller import event
> +from ryu.controller import ofp_event
> +from ryu.controller.handler import MAIN_DISPATCHER
> +from ryu.controller.handler import set_ev_cls
> +from ryu.ofproto import ether
> +from ryu.lib import addrconv
> +from ryu.lib.dpid import dpid_to_str
> +from ryu.lib.packet import packet
> +from ryu.lib.packet import ethernet
> +from ryu.lib.packet import slow
> +
> +
> +LAG_EV_DISPATCHER = "lacplib"
> +
> +
> +class EventSlaveStateChanged(event.EventBase):
> +    def __init__(self, datapath, port, enabled):
> +        """initialization."""
> +        super(EventSlaveStateChanged, self).__init__()
> +        self.datapath = datapath
> +        self.port = port
> +        self.enabled = enabled
> +
> +
> +class LacpLib(app_manager.RyuApp):
> +
> +    #-------------------------------------------------------------------
> +    # PUBLIC METHODS
> +    #-------------------------------------------------------------------
> +    def __init__(self):
> +        """initialization."""
> +        super(LacpLib, self).__init__()
> +        self.name = 'lacplib'
> +        self.bonds = []
> +        self.__set_logger()
> +
> +    def add(self, dpid, ports):
> +        """add a setting of a bonding i/f.
> +        'add' method takes the correspondig args in this order.
> +
> +        ========= =====================================================
> +        Attribute Description
> +        ========= =====================================================
> +        dpid      an integer value that means datapath id.
> +
> +        ports     a list of integer values that means the ports face
> +                  with the slave i/fs.
> +        ========= =====================================================
> +
> +        if you want to use multi LAG, call 'add' method more than once.
> +        """
> +        assert isinstance(ports, list)
> +        assert 2 <= len(ports)
> +        ifs = {}
> +        for port in ports:
> +            ifs[port] = {'enabled': False, 'timeout': 0}
> +        bond = {}
> +        bond[dpid] = ifs
> +        self.bonds.append(bond)
> +
> +    #-------------------------------------------------------------------
> +    # PUBLIC METHODS ( EVENT HANDLERS )
> +    #-------------------------------------------------------------------
> +    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
> +    def packet_in_handler(self, evt):
> +        """PacketIn event handler. when the received packet was LACP,
> +        proceed it. otherwise, do nothing."""
> +        req_pkt = packet.Packet(evt.msg.data)
> +        if slow.lacp in req_pkt:
> +            (req_lacp, ) = req_pkt.get_protocols(slow.lacp)
> +            (req_eth, ) = req_pkt.get_protocols(ethernet.ethernet)
> +            self.__do_lacp(req_lacp, req_eth.src, evt.msg)
> +        else:
> +            # do nothing
> +            pass
> +
> +    @set_ev_cls(ofp_event.EventOFPFlowRemoved, MAIN_DISPATCHER)
> +    def flow_removed_handler(self, evt):
> +        """FlowRemoved event handler. when the removed flow entry was
> +        for LACP, set the status of the slave i/f to enabled, and
> +        send a event that LACP exchange timeout has occurred."""
> +        msg = evt.msg
> +        datapath = msg.datapath
> +        dpid = datapath.id
> +        match = msg.match
> +        port = match.in_port
> +        if ether.ETH_TYPE_SLOW != match.dl_type:
> +            return
> +        self.logger.info(
> +            "SW=%s PORT=%d LACP exchange timeout has occurred.",
> +            dpid_to_str(dpid), port)
> +        self.__set_slave_enabled(dpid, port, False)
> +        self.__set_slave_timeout(dpid, port, 0)
> +        self.send_event_to_observers(
> +            EventSlaveStateChanged(datapath, port, False))
> +
> +    #-------------------------------------------------------------------
> +    # PRIVATE METHODS ( RELATED TO LACP )
> +    #-------------------------------------------------------------------

Doesn't all the code in Ryu use a single underscore for private
methods? If so, I'd prefer to do the same in this code for consistency.


> +    def __do_lacp(self, req_lacp, src, msg):
> +        """packet-in process when the received packet is LACP."""
> +        datapath = msg.datapath
> +        dpid = datapath.id
> +        port = msg.in_port
> +        ofproto = datapath.ofproto
> +        parser = datapath.ofproto_parser
> +
> +        self.logger.info("SW=%s PORT=%d LACP received.",
> +                         dpid_to_str(dpid), port)
> +        self.logger.debug(str(req_lacp))
> +
> +        # when LACP arrived at disabled port, update the status of
> +        # the slave i/f and reset all flow entries except for LACP.
> +        if not self.__get_slave_enabled(dpid, port):
> +            self.logger.info(
> +                "SW=%s PORT=%d the slave i/f has just been up.",
> +                dpid_to_str(dpid), port)
> +            self.__set_slave_enabled(dpid, port, True)
> +            self.send_event_to_observers(
> +                EventSlaveStateChanged(datapath, port, True))
> +
> +        # set the idle_timeout time using the actor state of the
> +        # received packet.
> +        if req_lacp.LACP_STATE_SHORT_TIMEOUT == \
> +           req_lacp.actor_state_timeout:
> +            idle_timeout = req_lacp.SHORT_TIMEOUT_TIME
> +        else:
> +            idle_timeout = req_lacp.LONG_TIMEOUT_TIME
> +
> +        # when the timeout time has changed, update the timeout time of
> +        # the slave i/f and re-enter a flow entry for the packet from
> +        # the slave i/f with idle_timeout.
> +        if idle_timeout != self.__get_slave_timeout(dpid, port):
> +            self.logger.info(
> +                "SW=%s PORT=%d the timeout time has changed.",
> +                dpid_to_str(dpid), port)
> +            self.__set_slave_timeout(dpid, port, idle_timeout)
> +            self.__set_flow_entry_packet_in(src, port, idle_timeout,
> +                                            msg)
> +
> +        # create a response packet.
> +        res_pkt = self.__create_response(datapath, port, req_lacp)
> +
> +        # packet-out the response packet.
> +        out_port = ofproto.OFPP_IN_PORT
> +        actions = [parser.OFPActionOutput(out_port)]
> +        out = datapath.ofproto_parser.OFPPacketOut(
> +            datapath=datapath, buffer_id=ofproto.OFP_NO_BUFFER,
> +            data=res_pkt.data, in_port=port, actions=actions)
> +        datapath.send_msg(out)
> +
> +    def __create_response(self, datapath, port, req):
> +        """create a packet including LACP."""
> +        src = datapath.ports[port].hw_addr
> +        res_ether = ethernet.ethernet(
> +            slow.SLOW_PROTOCOL_MULTICAST, src, ether.ETH_TYPE_SLOW)
> +        res_lacp = self.__create_lacp(datapath, port, req)
> +        res_pkt = packet.Packet()
> +        res_pkt.add_protocol(res_ether)
> +        res_pkt.add_protocol(res_lacp)
> +        res_pkt.serialize()
> +        return res_pkt
> +
> +    def __create_lacp(self, datapath, port, req):
> +        """create a LACP packet."""
> +        actor_system = datapath.ports[datapath.ofproto.OFPP_LOCAL].hw_addr
> +        res = slow.lacp(
> +            actor_system_priority=0xffff,
> +            actor_system=actor_system,
> +            actor_key=req.actor_key,
> +            actor_port_priority=0xff,
> +            actor_port=port,
> +            actor_state_activity=req.LACP_STATE_PASSIVE,
> +            actor_state_timeout=req.actor_state_timeout,
> +            actor_state_aggregation=req.actor_state_aggregation,
> +            actor_state_synchronization=req.actor_state_synchronization,
> +            actor_state_collecting=req.actor_state_collecting,
> +            actor_state_distributing=req.actor_state_distributing,
> +            actor_state_defaulted=req.LACP_STATE_OPERATIONAL_PARTNER,
> +            actor_state_expired=req.LACP_STATE_NOT_EXPIRED,
> +            partner_system_priority=req.actor_system_priority,
> +            partner_system=req.actor_system,
> +            partner_key=req.actor_key,
> +            partner_port_priority=req.actor_port_priority,
> +            partner_port=req.actor_port,
> +            partner_state_activity=req.actor_state_activity,
> +            partner_state_timeout=req.actor_state_timeout,
> +            partner_state_aggregation=req.actor_state_aggregation,
> +            partner_state_synchronization=req.actor_state_synchronization,
> +            partner_state_collecting=req.actor_state_collecting,
> +            partner_state_distributing=req.actor_state_distributing,
> +            partner_state_defaulted=req.actor_state_defaulted,
> +            partner_state_expired=req.actor_state_expired,
> +            collector_max_delay=0)
> +        self.logger.info("SW=%s PORT=%d LACP sent.",
> +                         dpid_to_str(datapath.id), port)
> +        self.logger.debug(str(res))
> +        return res
> +
> +    def __get_slave_enabled(self, dpid, port):
> +        """get whether a slave i/f at some port of some datapath is
> +        enable or not."""
> +        slave = self.__get_slave(dpid, port)
> +        if slave:
> +            return slave['enabled']
> +        else:
> +            return False
> +
> +    def __set_slave_enabled(self, dpid, port, enabled):
> +        """set whether a slave i/f at some port of some datapath is
> +        enable or not."""
> +        slave = self.__get_slave(dpid, port)
> +        if slave:
> +            slave['enabled'] = enabled
> +
> +    def __get_slave_timeout(self, dpid, port):
> +        """get the timeout time at some port of some datapath."""
> +        slave = self.__get_slave(dpid, port)
> +        if slave:
> +            return slave['timeout']
> +        else:
> +            return 0
> +
> +    def __set_slave_timeout(self, dpid, port, timeout):
> +        """set the timeout time at some port of some datapath."""
> +        slave = self.__get_slave(dpid, port)
> +        if slave:
> +            slave['timeout'] = timeout
> +
> +    def __get_slave(self, dpid, port):
> +        """get slave i/f at some port of some datapath."""
> +        result = None
> +        for bond in self.bonds:
> +            if dpid in bond:
> +                if port in bond[dpid]:
> +                    result = bond[dpid][port]
> +                    break
> +        return result
> +
> +    #-------------------------------------------------------------------
> +    # PRIVATE METHODS ( RELATED TO OPEN FLOW PROTOCOL )
> +    #-------------------------------------------------------------------
> +    def __set_flow_entry_packet_in(self, src, port, timeout, msg):
> +        """enter a flow entry for the packet from the slave i/f
> +        with idle_timeout."""
> +        datapath = msg.datapath
> +        ofproto = datapath.ofproto
> +        parser = datapath.ofproto_parser
> +
> +        wildcards = ofproto.OFPFW_ALL
> +        wildcards &= ~ofproto.OFPFW_IN_PORT
> +        wildcards &= ~ofproto.OFPFW_DL_SRC
> +        wildcards &= ~ofproto.OFPFW_DL_TYPE
> +        match = parser.OFPMatch(wildcards=wildcards, in_port=port,
> +                                dl_src=addrconv.mac.text_to_bin(src),
> +                                dl_type=ether.ETH_TYPE_SLOW)
> +        actions = [parser.OFPActionOutput(
> +            ofproto.OFPP_CONTROLLER, ofproto.OFP_MSG_SIZE_MAX)]
> +        mod = parser.OFPFlowMod(
> +            datapath=datapath, match=match, cookie=0,
> +            command=ofproto.OFPFC_ADD, idle_timeout=timeout,
> +            flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
> +        datapath.send_msg(mod)
> +
> +    #-------------------------------------------------------------------
> +    # PRIVATE METHODS ( OTHERS )
> +    #-------------------------------------------------------------------
> +    def __set_logger(self):
> +        """change log format."""
> +        self.logger.propagate = False
> +        hdl = logging.StreamHandler()
> +        fmt_str = '[LACP][%(levelname)s] %(message)s'
> +        hdl.setFormatter(logging.Formatter(fmt_str))
> +        self.logger.addHandler(hdl)
> -- 
> 1.7.10.4
> 
> 
> ------------------------------------------------------------------------------
> Introducing Performance Central, a new site from SourceForge and 
> AppDynamics. Performance Central is your source for news, insights, 
> analysis and resources for efficient Application Performance Management. 
> Visit us today!
> http://pubads.g.doubleclick.net/gampad/clk?id=48897511&iu=/4140/ostg.clktrk
> _______________________________________________
> Ryu-devel mailing list
> [email protected]
> https://lists.sourceforge.net/lists/listinfo/ryu-devel

------------------------------------------------------------------------------
Introducing Performance Central, a new site from SourceForge and 
AppDynamics. Performance Central is your source for news, insights, 
analysis and resources for efficient Application Performance Management. 
Visit us today!
http://pubads.g.doubleclick.net/gampad/clk?id=48897511&iu=/4140/ostg.clktrk
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to