This is an alternative to dpset that supports event passing.

event.py: event class
switches.py: switch(datapath) discovery app using ofp_event
dumper.py: test and example app using switch discovery event

Signed-off-by: YAMADA Hideki <[email protected]>
---
 ryu/topology/dumper.py   |  104 ++++++++++++++++++++++
 ryu/topology/event.py    |   86 ++++++++++++++++++
 ryu/topology/switches.py |  221 ++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 411 insertions(+)
 create mode 100644 ryu/topology/__init__.py
 create mode 100644 ryu/topology/dumper.py
 create mode 100644 ryu/topology/event.py
 create mode 100644 ryu/topology/switches.py

diff --git a/ryu/topology/__init__.py b/ryu/topology/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/ryu/topology/dumper.py b/ryu/topology/dumper.py
new file mode 100644
index 0000000..426bcb5
--- /dev/null
+++ b/ryu/topology/dumper.py
@@ -0,0 +1,104 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import gevent
+import gevent.queue
+import time
+
+from ryu.base import app_manager
+from ryu.controller.handler import set_ev_handler
+from ryu.topology import switches, event
+
+LOG = logging.getLogger(__name__)
+
+
+class DiscoveryEventDumper(app_manager.RyuApp):
+    ''' This app dumps discovery events
+    '''
+
+    def __init__(self):
+        super(DiscoveryEventDumper, self).__init__()
+
+        # For testing: the sync request loop is disabled, the async one runs.
+#        self.threads.append(gevent.spawn_later(0, self._request_sync, 5))
+        self.threads.append(gevent.spawn_later(0, self._request_async, 10))
+
+        self.is_active = True
+
+    @set_ev_handler(event.EventSwitchEnter)
+    def switch_enter_handler(self, ev):
+        LOG.debug(ev)
+
+    @set_ev_handler(event.EventSwitchLeave)
+    def switch_leave_handler(self, ev):
+        LOG.debug(ev)
+
+    @set_ev_handler(event.EventPortAdd)
+    def port_add_handler(self, ev):
+        LOG.debug(ev)
+
+    @set_ev_handler(event.EventPortDelete)
+    def port_delete_handler(self, ev):
+        LOG.debug(ev)
+
+    @set_ev_handler(event.EventPortModify)
+    def port_modify_handler(self, ev):
+        LOG.debug(ev)
+
+    def _request_sync(self, interval):
+        while self.is_active:
+            request = event.EventSwitchRequest()
+            LOG.debug('request sync %s thread(%s)',
+                      request, id(gevent.getcurrent()))
+            reply = self.send_request(request)
+            LOG.debug('reply sync %s', reply)
+            if len(reply.switches) > 0:
+                for sw in reply.switches:
+                    LOG.debug('  %s', sw)
+            gevent.sleep(interval)
+
+    def _request_async(self, interval):
+        while self.is_active:
+            request = event.EventSwitchRequest()
+            LOG.debug('request async %s thread(%s)',
+                      request, id(gevent.getcurrent()))
+            self.send_event(request.dst, request)
+
+            start = time.time()
+            busy = interval / 2
+            i = 0
+            while i < busy:
+                if time.time() > start + i:
+                    i += 1
+                    LOG.debug('  thread is busy... %s/%s thread(%s)',
+                              i, busy, id(gevent.getcurrent()))
+            LOG.debug('  thread yield to reply handler. thread(%s)',
+                      id(gevent.getcurrent()))
+
+            # yield
+            gevent.sleep(0)
+
+            LOG.debug('  thread get back. thread(%s)',
+                      id(gevent.getcurrent()))
+            gevent.sleep(interval - busy)
+
+    @set_ev_handler(event.EventSwitchReply)
+    def switch_reply_handler(self, reply):
+        LOG.debug('reply async %s', reply)
+        if len(reply.switches) > 0:
+            for sw in reply.switches:
+                LOG.debug('  %s', sw)
diff --git a/ryu/topology/event.py b/ryu/topology/event.py
new file mode 100644
index 0000000..0c9ce7c
--- /dev/null
+++ b/ryu/topology/event.py
@@ -0,0 +1,86 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from ryu.controller import event
+
+LOG = logging.getLogger(__name__)
+
+
+class EventSwitchBase(event.EventBase):
+    def __init__(self, switch):
+        super(EventSwitchBase, self).__init__()
+        self.switch = switch
+
+    def __str__(self):
+        return '%s<dpid=%s, %s ports>' % \
+            (self.__class__.__name__,
+             self.switch.dp.id, len(self.switch.ports))
+
+
+class EventSwitchEnter(EventSwitchBase):
+    def __init__(self, switch):
+        super(EventSwitchEnter, self).__init__(switch)
+
+
+class EventSwitchLeave(EventSwitchBase):
+    def __init__(self, switch):
+        super(EventSwitchLeave, self).__init__(switch)
+
+
+class EventPortBase(event.EventBase):
+    def __init__(self, port):
+        super(EventPortBase, self).__init__()
+        self.port = port
+
+    def __str__(self):
+        return '%s<%s>' % (self.__class__.__name__, self.port)
+
+
+class EventPortAdd(EventPortBase):
+    def __init__(self, port):
+        super(EventPortAdd, self).__init__(port)
+
+
+class EventPortDelete(EventPortBase):
+    def __init__(self, port):
+        super(EventPortDelete, self).__init__(port)
+
+
+class EventPortModify(EventPortBase):
+    def __init__(self, port):
+        super(EventPortModify, self).__init__(port)
+
+
+class EventSwitchRequest(event.EventRequestBase):
+    # If dpid is None, reply all list
+    def __init__(self, dpid=None):
+        super(EventSwitchRequest, self).__init__()
+        self.dst = 'switches'
+        self.dpid = dpid
+
+    def __str__(self):
+        return 'EventSwitchRequest<src=%s, dpid=%s>' % \
+            (self.src, self.dpid)
+
+
+class EventSwitchReply(event.EventReplyBase):
+    def __init__(self, dst, switches):
+        super(EventSwitchReply, self).__init__(dst)
+        self.switches = switches
+
+    def __str__(self):
+        return 'EventSwitchReply<dst=%s, %s>' % \
+            (self.dst, self.switches)
diff --git a/ryu/topology/switches.py b/ryu/topology/switches.py
new file mode 100644
index 0000000..6cf9af7
--- /dev/null
+++ b/ryu/topology/switches.py
@@ -0,0 +1,221 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from ryu.topology import event
+from ryu.base import app_manager
+from ryu.controller import ofp_event
+from ryu.controller.handler import set_ev_cls
+from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
+
+LOG = logging.getLogger(__name__)
+
+
+class Port(object):
+    # This is data class passed by EventPortXXX
+    def __init__(self, dpid, ofproto, ofpport):
+        super(Port, self).__init__()
+
+        self.dpid = dpid
+        self._ofproto = ofproto
+        self._config = ofpport.config
+        self._state = ofpport.state
+
+        self.port_no = ofpport.port_no
+        self.hw_addr = ofpport.hw_addr
+        self.name = ofpport.name
+
+    def is_reserved(self):
+        return self.port_no > self._ofproto.OFPP_MAX
+
+    def is_down(self):
+        return (self._state & self._ofproto.OFPPS_LINK_DOWN) > 0 \
+            or (self._config & self._ofproto.OFPPC_PORT_DOWN) > 0
+
+    def is_live(self):
+        # NOTE: OF1.2 has OFPPS_LIVE state
+        #       return (self._state & self._ofproto.OFPPS_LIVE) > 0
+        return not self.is_down()
+
+    # for Switch.del_port()
+    def __eq__(self, other):
+        return self.dpid == other.dpid and self.port_no == other.port_no
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __hash__(self):
+        return hash((self.dpid, self.port_no))
+
+    def __str__(self):
+        LIVE_MSG = {False: 'DOWN', True: 'LIVE'}
+        return 'Port<dpid=%s, port_no=%s, %s>' % \
+            (self.dpid, self.port_no, LIVE_MSG[self.is_live()])
+
+
+class Switch(object):
+    """Data class carried by EventSwitchXXX events."""
+    def __init__(self, dp):
+        super(Switch, self).__init__()
+        self.dp = dp
+        self.ports = []  # Port objects added through add_port()
+
+    def add_port(self, ofpport):
+        # Reserved port numbers (above OFPP_MAX) are not kept.
+        port = Port(self.dp.id, self.dp.ofproto, ofpport)
+        if not port.is_reserved():
+            self.ports.append(port)
+
+    def del_port(self, ofpport):
+        # Build a probe with Port's full (dpid, ofproto, ofpport) signature;
+        # Port.__eq__ compares dpid and port_no, so it matches the stored
+        # entry.  list.remove raises ValueError when no such port is held.
+        probe = Port(self.dp.id, self.dp.ofproto, ofpport)
+        self.ports.remove(probe)
+
+    def __str__(self):
+        ports = ''.join(str(port) + ' ' for port in self.ports)
+        return 'Switch<dpid=%s, %s>' % (self.dp.id, ports)
+
+
+class PortState(dict):
+    # dict: int port_no -> OFPPort port
+    # OFPPort is defined in ryu.ofproto.ofproto_v1_X_parser
+    def __init__(self):
+        super(PortState, self).__init__()
+
+    def add(self, port_no, port):
+        self[port_no] = port
+
+    def remove(self, port_no):
+        del self[port_no]
+
+    def modify(self, port_no, port):
+        self[port_no] = port
+
+
+class Switches(app_manager.RyuApp):
+    _EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave,
+               event.EventPortAdd, event.EventPortDelete,
+               event.EventPortModify]
+
+    def __init__(self):
+        super(Switches, self).__init__()
+
+        self.name = 'switches'
+        self.dps = {}   # datapath_id => class Datapath
+        self.port_state = {}  # datapath_id => ports
+
+    def _register(self, dp):
+        assert dp.id is not None
+        assert dp.id not in self.dps
+
+        self.dps[dp.id] = dp
+        self.port_state[dp.id] = PortState()
+        for port in dp.ports.values():
+            self.port_state[dp.id].add(port.port_no, port)
+
+    def _unregister(self, dp):
+        if dp.id in self.dps:
+            del self.dps[dp.id]
+            del self.port_state[dp.id]
+
+    def _get_switch(self, dp):
+        switch = Switch(dp)
+        for ofpport in self.port_state[dp.id].itervalues():
+            switch.add_port(ofpport)
+        return switch
+
+    @set_ev_cls(ofp_event.EventOFPStateChange,
+                [MAIN_DISPATCHER, DEAD_DISPATCHER])
+    def state_change_handler(self, ev):
+        """Send EventSwitchEnter/EventSwitchLeave on MAIN/DEAD transitions."""
+        dp = ev.datapath
+        assert dp is not None
+        LOG.debug(dp)
+        if ev.state == MAIN_DISPATCHER:
+            self._register(dp)
+            switch = self._get_switch(dp)
+            LOG.debug('register %s', switch)
+            self.send_event_to_observers(event.EventSwitchEnter(switch))
+        elif ev.state == DEAD_DISPATCHER:
+            # dp.id is None when datapath dies before handshake; only
+            # _register() fills port_state, so skip unregistered datapaths.
+            if dp.id not in self.dps:
+                return
+            switch = self._get_switch(dp)
+            self._unregister(dp)
+            LOG.debug('unregister %s', switch)
+            self.send_event_to_observers(event.EventSwitchLeave(switch))
+
+    @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
+    def port_status_handler(self, ev):
+        msg = ev.msg
+        reason = msg.reason
+        dp = msg.datapath
+        ofpport = msg.desc
+
+        if reason == dp.ofproto.OFPPR_ADD:
+            #LOG.debug('A port was added.' +
+            #          '(datapath id = %s, port number = %s)',
+            #          dp.id, ofpport.port_no)
+            self.port_state[dp.id].add(ofpport.port_no, ofpport)
+            self.send_event_to_observers(
+                event.EventPortAdd(Port(dp.id, dp.ofproto, ofpport)))
+
+        elif reason == dp.ofproto.OFPPR_DELETE:
+            #LOG.debug('A port was deleted.' +
+            #          '(datapath id = %s, port number = %s)',
+            #          dp.id, ofpport.port_no)
+            self.port_state[dp.id].remove(ofpport.port_no)
+            self.send_event_to_observers(
+                event.EventPortDelete(Port(dp.id, dp.ofproto, ofpport)))
+
+        else:
+            assert reason == dp.ofproto.OFPPR_MODIFY
+            #LOG.debug('A port was modified.' +
+            #          '(datapath id = %s, port number = %s)',
+            #          dp.id, ofpport.port_no)
+            self.port_state[dp.id].modify(ofpport.port_no, ofpport)
+            self.send_event_to_observers(
+                event.EventPortModify(Port(dp.id, dp.ofproto, ofpport)))
+
+    @set_ev_cls(event.EventSwitchRequest)
+    def switch_request_handler(self, req):
+        LOG.debug(req)
+        dpid = req.dpid
+
+        switches = []
+        if dpid is None:
+            # reply all list
+            for dp in self.dps.itervalues():
+                switches.append(self._get_switch(dp))
+        elif dpid in self.dps:
+            switches.append(self._get_switch(self.dps[dpid]))
+
+        rep = event.EventSwitchReply(req.src, switches)
+        if req.sync:
+            self.send_reply(rep)
+        else:
+            self.send_event(req.src, rep)
+
+
+def get(app, dpid=None):
+    return app.send_request(event.EventSwitchRequest(dpid))
+
+
+def get_all(app):
+    return get(app)
-- 
1.7.9.5



------------------------------------------------------------------------------
Everyone hates slow websites. So do we.
Make your web apps faster with AppDynamics
Download AppDynamics Lite for free today:
http://p.sf.net/sfu/appdyn_d2d_mar
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to