This application dynamically creates/deletes tunnel ports for GRE tunnel app.

Signed-off-by: Isaku Yamahata <yamah...@valinux.co.jp>
---
Changes v3 -> v4:
- 'key=flow' option to tunnel port
---
 ryu/app/tunnel_port_updater.py |  470 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 470 insertions(+)
 create mode 100644 ryu/app/tunnel_port_updater.py

diff --git a/ryu/app/tunnel_port_updater.py b/ryu/app/tunnel_port_updater.py
new file mode 100644
index 0000000..4fdd8ce
--- /dev/null
+++ b/ryu/app/tunnel_port_updater.py
@@ -0,0 +1,470 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2012 Isaku Yamahata <yamahata at private email ne jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import gevent
+import gflags
+import logging
+import netaddr
+
+from ryu import exception as ryu_exc
+from ryu.app import conf_switch_key as cs_key
+from ryu.app import rest_nw_id
+from ryu.base import app_manager
+from ryu.controller import (conf_switch,
+                            handler,
+                            network,
+                            tunnels)
+from ryu.lib import dpid as dpid_lib
+from ryu.lib import synchronized
+from ryu.lib.ovs import bridge as ovs_bridge
+
+
+LOG = logging.getLogger(__name__)
+FLAGS = gflags.FLAGS
+gflags.DEFINE_string('tunnel_type', 'gre', 'tunnel type for ovs tunnel port')
+
+_TUNNEL_TYPE_TO_NW_ID = {
+    'gre': rest_nw_id.NW_ID_VPORT_GRE,
+}
+
+
+class NetworkAPI(object):
+    """Internal adopter class for RestAPI"""
+    def __init__(self, network_):
+        super(NetworkAPI, self).__init__()
+        self.nw = network_
+
+    def update_network(self, network_id):
+        self.nw.update_network(network_id)
+
+    def create_port(self, network_id, dpid, port_id):
+        self.nw.create_port(network_id, dpid, port_id)
+
+    def update_port(self, network_id, dpid, port_id):
+        self.nw.update_port(network_id, dpid, port_id)
+
+    def delete_port(self, network_id, dpid, port_id):
+        try:
+            self.nw.remove_port(network_id, dpid, port_id)
+        except (ryu_exc.NetworkNotFound, ryu_exc.PortNotFound):
+            pass
+
+
+class TunnelAPI(object):
+    """Internal adopter class for RestTunnelAPI"""
+    def __init__(self, tunnels_):
+        super(TunnelAPI, self).__init__()
+        self.tunnels = tunnels_
+
+    def update_remote_dpid(self, dpid, port_id, remote_dpid):
+        self.tunnels.update_port(dpid, port_id, remote_dpid)
+
+    def create_remote_dpid(self, dpid, port_id, remote_dpid):
+        self.tunnels.register_port(dpid, port_id, remote_dpid)
+
+    def delete_port(self, dpid, port_id):
+        try:
+            self.tunnels.delete_port(dpid, port_id)
+        except ryu_exc.PortNotFound:
+            pass
+
+
+class TunnelPort(object):
+    def __init__(self, dpid, port_no, local_ip, remote_ip, remote_dpid=None):
+        super(TunnelPort, self).__init__()
+        self.dpid = dpid
+        self.port_no = port_no
+        self.local_ip = local_ip
+        self.remote_ip = remote_ip
+        self.remote_dpid = remote_dpid
+
+    def __eq__(self, other):
+        return (self.dpid == other.dpid and
+                self.port_no == other.port_no and
+                self.local_ip == other.local_ip and
+                self.remote_ip == other.remote_ip and
+                self.remote_dpid == other.remote_dpid)
+
+
+class TunnelDP(object):
+    def __init__(self, dpid, ovsdb_addr, tunnel_ip, tunnel_type, conf_switch_,
+                 network_api, tunnel_api):
+        super(TunnelDP, self).__init__()
+        self.dpid = dpid
+        self.network_api = network_api
+        self.tunnel_api = tunnel_api
+
+        self.ovs_bridge = ovs_bridge.OVSBridge(dpid, ovsdb_addr)
+
+        self.tunnel_ip = tunnel_ip
+        self.tunnel_type = tunnel_type
+        self.tunnel_nw_id = _TUNNEL_TYPE_TO_NW_ID[tunnel_type]
+        self.tunnels = {}       # port number -> TunnelPort
+
+        self.conf_switch = conf_switch_
+        self.inited = False
+
+        self.req_q = gevent.queue.Queue()
+        self.thr = gevent.spawn_later(0, self._serve_loop)
+
+    def _init(self):
+        self.ovs_bridge.init()
+        for tp in self.ovs_bridge.get_tunnel_ports(self.tunnel_type):
+            if tp.local_ip != self.tunnel_ip:
+                LOG.warn('unknown tunnel port %s', tp)
+                continue
+
+            remote_dpid = self.conf_switch.find_dpid(cs_key.OVS_TUNNEL_ADDR,
+                                                     tp.remote_ip)
+            self.tunnels[tp.ofport] = TunnelPort(self.dpid, tp.ofport,
+                                                 self.tunnel_ip, tp.remote_ip,
+                                                 remote_dpid)
+            if remote_dpid:
+                self._api_update(tp.ofport, remote_dpid)
+
+        self.conf_switch = None
+        self.inited = True
+
+    def _api_update(self, port_no, remote_dpid):
+        self.network_api.update_port(self.tunnel_nw_id, self.dpid, port_no)
+        self.tunnel_api.update_remote_dpid(self.dpid, port_no, remote_dpid)
+
+    def _api_delete(self, port_no):
+        self.network_api.delete_port(self.tunnel_nw_id, self.dpid, port_no)
+        self.tunnel_api.delete_port(self.dpid, port_no)
+
+    def _update_remote(self, remote_dpid, remote_ip):
+        if self.dpid == remote_dpid:
+            if self.tunnel_ip == remote_ip:
+                return
+
+            # tunnel ip address is changed.
+            LOG.warn('local ip address is changed %s: %s -> %s',
+                     dpid_lib.dpid_to_str(remote_dpid),
+                     self.tunnel_ip, remote_ip)
+            # recreate tunnel ports.
+            for tp in list(self.tunnels.values()):
+                if tp.remote_dpid is None:
+                    # TODO:XXX
+                    continue
+
+                self._del_tunnel_port(tp.port_no, tp.local_ip, tp.remote_ip)
+
+                new_tp = self._add_tunnel_port(tp.remote_dpid, tp.remote_ip)
+                self._api_update(new_tp.ofport, tp.remote_dpid)
+            return
+
+        if self.tunnel_ip == remote_ip:
+            LOG.error('ip conflict: %s %s %s',
+                      dpid_lib.dpid_to_str(self.dpid),
+                      dpid_lib.dpid_to_str(remote_dpid), remote_ip)
+            # XXX What should we do?
+            return
+
+        for tp in list(self.tunnels.values()):
+            if tp.remote_dpid == remote_dpid:
+                if tp.remote_ip == remote_ip:
+                    self._api_update(tp.port_no, remote_dpid)
+                    continue
+
+                LOG.warn('remote ip address is changed %s: %s -> %s',
+                         dpid_lib.dpid_to_str(remote_dpid),
+                         tp.remote_ip, remote_ip)
+                self._del_tunnel_port(tp.port_no, self.tunnel_ip, tp.remote_ip)
+
+                new_tp = self._add_tunnel_port(remote_dpid, remote_ip)
+                self._api_update(new_tp.ofport, remote_dpid)
+            elif tp.remote_ip == remote_ip:
+                assert tp.remote_dpid is None
+                self._api_update(tp.port_no, remote_dpid)
+                tp.remote_dpid = remote_dpid
+
+    @staticmethod
+    def _to_hex(ip_addr):
+        # assuming IPv4 address
+        assert netaddr.IPAddress(ip_addr).ipv4()
+        return "%02x%02x%02x%02x" % netaddr.IPAddress(ip_addr).words
+
+    @staticmethod
+    def _port_name(local_ip, remote_ip):
+        # ovs requires requires less or equals to 14 bytes length
+        # gre<remote>-<local lsb>
+        _PORT_NAME_LENGTH = 14
+        local_hex = TunnelDP._to_hex(local_ip)
+        remote_hex = TunnelDP._to_hex(remote_ip)
+        return ("gre%s-%s" % (remote_hex, local_hex))[:_PORT_NAME_LENGTH]
+
+    def _tunnel_port_exists(self, remote_dpid, remote_ip):
+        return any(tp.remote_dpid == remote_dpid and tp.remote_ip == remote_ip
+                   for tp in self.tunnels.values())
+
+    def _add_tunnel_port(self, remote_dpid, remote_ip):
+        LOG.debug('add_tunnel_port local %s %s remote %s %s',
+                  dpid_lib.dpid_to_str(self.dpid), self.tunnel_ip,
+                  dpid_lib.dpid_to_str(remote_dpid), remote_ip)
+        if self._tunnel_port_exists(remote_dpid, remote_ip):
+            LOG.debug('add_tunnel_port nop')
+            return
+
+        LOG.debug('add_tunnel_port creating port')
+        port_name = self._port_name(self.tunnel_ip, remote_ip)
+        self.ovs_bridge.add_tunnel_port(port_name, self.tunnel_type,
+                                        self.tunnel_ip, remote_ip, 'flow')
+
+        tp = self.ovs_bridge.get_tunnel_port(port_name, self.tunnel_type)
+        self.tunnels[tp.ofport] = TunnelPort(self.dpid, tp.ofport,
+                                             tp.local_ip, tp.remote_ip,
+                                             remote_dpid)
+        self.network_api.create_port(self.tunnel_nw_id, self.dpid, tp.ofport)
+        self.tunnel_api.create_remote_dpid(self.dpid, tp.ofport, remote_dpid)
+        return tp
+
+    def _del_tunnel_port(self, port_no, local_ip, remote_ip):
+        port_name = self._port_name(local_ip, remote_ip)
+        self.ovs_bridge.del_port(port_name)
+        del self.tunnels[port_no]
+        self._api_delete(port_no)
+
+    def _del_tunnel_port_ip(self, remote_ip):
+        for tp in self.tunnels.values():
+            if tp.remote_ip == remote_ip:
+                self._del_tunnel_port(tp.port_no, self.tunnel_ip, remote_ip)
+                break
+
+    # serialize requests to this OVS DP
+    _RequestUpdateRemote = collections.namedtuple('_RequestUpdateRemote',
+                                                 ('remote_dpid', 'remote_ip'))
+    _RequestAddTunnelPort = collections.namedtuple('_RequestAddTunnelPort',
+                                                  ('remote_dpid', 'remote_ip'))
+    _RequestDelTunnelPort = collections.namedtuple('_RequestDelTunnelPort',
+                                                  ('remote_ip'))
+
+    class _RequestClose(object):
+        pass
+
+    def request_update_remote(self, remote_dpid, remote_ip):
+        self.req_q.put(self._RequestUpdateRemote(remote_dpid, remote_ip))
+
+    def request_add_tunnel_port(self, remote_dpid, remote_ip):
+        self.req_q.put(self._RequestAddTunnelPort(remote_dpid, remote_ip))
+
+    def request_del_tunnel_port(self, remote_ip):
+        self.req_q.put(self._RequestDelTunnelPort(remote_ip))
+
+    def close(self):
+        # self.thr.kill()
+        self.req_q.put(self._RequestClose())
+        self.thr.join()
+        self.thr = None
+
+    def _serve_loop(self):
+        # TODO:XXX backoff timeout
+        # TOOD:XXX and then, abandon and notify the caller(TunnelPortUpdater)
+
+        # TODO: if possible, squash requests?
+        #       For example, RequestAddTunnelPort and RequestDelTunnelPort
+        #       with same dpid are in the queue. AddTunnelPort request
+        #       can be skipped.
+        #       When ovsdb-server and vswitchd are over-loaded
+        #       (or connection to ovsdb are unstable), squashing request
+        #       would increase stability a bit?
+        #       But unsure how effective it would be.
+
+        if not self.inited:
+            try:
+                self._init()
+            except gevent.timeout.Timeout:
+                LOG.warn('_init timeouted')
+
+        req = None
+        while True:
+            if req is None:
+                req = self.req_q.get()
+                if isinstance(req, self._RequestClose):
+                    return
+
+            try:
+                if not self.inited:
+                    self._init()
+
+                # shoud use dispatcher?
+                if isinstance(req, self._RequestUpdateRemote):
+                    LOG.debug('update_remote')
+                    self._update_remote(req.remote_dpid, req.remote_ip)
+                elif isinstance(req, self._RequestAddTunnelPort):
+                    LOG.debug('add_tunnel_port')
+                    self._add_tunnel_port(req.remote_dpid, req.remote_ip)
+                elif isinstance(req, self._RequestDelTunnelPort):
+                    LOG.debug('del_tunnel_port')
+                    self._del_tunnel_port_ip(req.remote_ip)
+                else:
+                    LOG.error('unknown request %s', req)
+            except gevent.timeout.Timeout:
+                # timeout. try again
+                LOG.warn('timeout try again')
+                continue
+            else:
+                # Done. move onto next request
+                req = None
+
+
+class TunnelDPSet(dict):
+    """ dpid -> TunndlDP """
+    pass
+
+
+#import collections
+#class TunnelRequests(collections.defaultdict(set)):
+class TunnelRequests(dict):
+    def add(self, dpid0, dpid1):
+        self.setdefault(dpid0, set()).add(dpid1)
+        self.setdefault(dpid1, set()).add(dpid0)
+
+    def remove(self, dpid0, dpid1):
+        self[dpid0].remove(dpid1)
+        self[dpid1].remove(dpid0)
+
+    def get_remote(self, dpid):
+        return self.setdefault(dpid, set())
+
+
+class TunnelPortUpdater(app_manager.RyuApp):
+    _CONTEXTS = {
+        'conf_switch': conf_switch.ConfSwitchSet,
+        'network': network.Network,
+        'tunnels': tunnels.Tunnels,
+    }
+    _LOCK = 'lock'
+
+    def __init__(self, *args, **kwargs):
+        super(TunnelPortUpdater, self).__init__(args, kwargs)
+        self.tunnel_type = FLAGS.tunnel_type
+        self.cs = kwargs['conf_switch']
+        self.nw = kwargs['network']
+        self.tunnels = kwargs['tunnels']
+        self.tunnel_dpset = TunnelDPSet()
+        self.tunnel_requests = TunnelRequests()
+
+        self.network_api = NetworkAPI(self.nw)
+        self.tunnel_api = TunnelAPI(self.tunnels)
+        self.network_api.update_network(
+            _TUNNEL_TYPE_TO_NW_ID[self.tunnel_type])
+
+        setattr(self, self._LOCK, gevent.coros.Semaphore())
+
+    def _ovsdb_update(self, dpid, ovsdb_addr, ovs_tunnel_addr):
+        LOG.debug('_ovsdb_update %s %s %s',
+                  dpid_lib.dpid_to_str(dpid), ovsdb_addr, ovs_tunnel_addr)
+        if dpid not in self.tunnel_dpset:
+            # TODO:XXX changing ovsdb_addr, ovs_tunnel_addr
+            tunnel_dp = TunnelDP(dpid, ovsdb_addr, ovs_tunnel_addr,
+                                 self.tunnel_type, self.cs,
+                                 self.network_api, self.tunnel_api)
+            self.tunnel_dpset[dpid] = tunnel_dp
+
+        tunnel_dp = self.tunnel_dpset.get(dpid)
+        assert tunnel_dp
+        self._add_tunnel_ports(tunnel_dp,
+                               self.tunnel_requests.get_remote(dpid))
+
+    @handler.set_ev_cls(conf_switch.EventConfSwitchSet,
+                        conf_switch.CONF_SWITCH_EV_DISPATCHER)
+    @synchronized.synchronized(_LOCK)
+    def conf_switch_set_handler(self, ev):
+        LOG.debug('conf_switch_set_handler %s %s %s',
+                  dpid_lib.dpid_to_str(ev.dpid), ev.key, ev.value)
+        dpid = ev.dpid
+        if (ev.key == cs_key.OVSDB_ADDR or ev.key == cs_key.OVS_TUNNEL_ADDR):
+            if ((dpid, cs_key.OVSDB_ADDR) in self.cs and
+                    (dpid, cs_key.OVS_TUNNEL_ADDR) in self.cs):
+                self._ovsdb_update(
+                    dpid, self.cs.get_key(dpid, cs_key.OVSDB_ADDR),
+                    self.cs.get_key(dpid, cs_key.OVS_TUNNEL_ADDR))
+
+        if ev.key == cs_key.OVS_TUNNEL_ADDR:
+            for tunnel_dp in self.tunnel_dpset.values():
+                tunnel_dp.request_update_remote(ev.dpid, ev.value)
+
+    @handler.set_ev_cls(conf_switch.EventConfSwitchDel,
+                        conf_switch.CONF_SWITCH_EV_DISPATCHER)
+    @synchronized.synchronized(_LOCK)
+    def conf_switch_del_handler(self, ev):
+        # TODO:XXX
+        pass
+
+    def _add_tunnel_ports(self, tunnel_dp, remote_dpids):
+        LOG.debug('_add_tunnel_ports %s %s', tunnel_dp, remote_dpids)
+        for remote_dpid in remote_dpids:
+            remote_dp = self.tunnel_dpset.get(remote_dpid)
+            if remote_dp is None:
+                continue
+            tunnel_dp.request_add_tunnel_port(remote_dp.dpid,
+                                              remote_dp.tunnel_ip)
+            remote_dp.request_add_tunnel_port(tunnel_dp.dpid,
+                                              tunnel_dp.tunnel_ip)
+
+    def _vm_port_add(self, network_id, dpid):
+        LOG.debug('_vm_port_add %s %s', network_id, dpid_lib.dpid_to_str(dpid))
+        dpids = self.nw.get_dpids(network_id)
+        dpids.remove(dpid)
+        for remote_dpid in dpids:
+            self.tunnel_requests.add(dpid, remote_dpid)
+
+        tunnel_dp = self.tunnel_dpset.get(dpid)
+        if tunnel_dp is None:
+            return
+        self._add_tunnel_ports(tunnel_dp, dpids)
+
+    def _vm_port_del(self, network_id, dpid):
+        LOG.debug('_vm_port_del %s %s', network_id, dpid_lib.dpid_to_str(dpid))
+        if len(self.nw.get_ports(dpid, network_id)) > 1:
+            return
+
+        tunnel_networks = self.nw.get_networks(dpid).copy()
+        tunnel_networks.discard(network_id)
+        tunnel_networks.difference_update(rest_nw_id.RESERVED_NETWORK_IDS)
+        dpids = self.nw.get_dpids(network_id).copy()
+        dpids.discard(dpid)
+        del_dpids = []
+        for remote_dpid in dpids:
+            if tunnel_networks & self.nw.get_networks(remote_dpid):
+                continue
+            self.tunnel_requests.remove(dpid, remote_dpid)
+            del_dpids.append(remote_dpid)
+
+        tunnel_dp = self.tunnel_dpset.get(dpid)
+        if tunnel_dp is None:
+            return
+        for remote_dpid in del_dpids:
+            remote_dp = self.tunnel_dpset.get(remote_dpid)
+            if remote_dp is None:
+                continue
+            tunnel_dp.request_del_tunnel_port(remote_dp.tunnel_ip)
+            remote_dp.request_del_tunnel_port(tunnel_dp.tunnel_ip)
+
+    @handler.set_ev_cls(network.EventNetworkPort,
+                        network.NETWORK_TENANT_EV_DISPATCHER)
+    @synchronized.synchronized(_LOCK)
+    def network_port_handler(self, ev):
+        LOG.debug('network_port_handler %s', ev)
+        if ev.network_id in rest_nw_id.RESERVED_NETWORK_IDS:
+            return
+
+        if ev.add_del:
+            self._vm_port_add(ev.network_id, ev.dpid)
+        else:
+            self._vm_port_del(ev.network_id, ev.dpid)
-- 
1.7.10.4


------------------------------------------------------------------------------
Keep yourself connected to Go Parallel: 
TUNE You got it built. Now make it sing. Tune shows you how.
http://goparallel.sourceforge.net
_______________________________________________
Ryu-devel mailing list
Ryu-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to