This application dynamically creates and deletes tunnel ports for the GRE tunnel application.

Signed-off-by: Isaku Yamahata <[email protected]>
---
Changes v3 -> v4:
- add the 'key=flow' option to the tunnel port
---
 ryu/app/tunnel_port_updater.py |  470 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 470 insertions(+)
 create mode 100644 ryu/app/tunnel_port_updater.py

diff --git a/ryu/app/tunnel_port_updater.py b/ryu/app/tunnel_port_updater.py
new file mode 100644
index 0000000..4fdd8ce
--- /dev/null
+++ b/ryu/app/tunnel_port_updater.py
@@ -0,0 +1,470 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2012 Isaku Yamahata <yamahata at private email ne jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import gevent
+import gflags
+import logging
+import netaddr
+
+from ryu import exception as ryu_exc
+from ryu.app import conf_switch_key as cs_key
+from ryu.app import rest_nw_id
+from ryu.base import app_manager
+from ryu.controller import (conf_switch,
+                            handler,
+                            network,
+                            tunnels)
+from ryu.lib import dpid as dpid_lib
+from ryu.lib import synchronized
+from ryu.lib.ovs import bridge as ovs_bridge
+
+
+# Module-level logger, named after this module.
+LOG = logging.getLogger(__name__)
+FLAGS = gflags.FLAGS
+# Command-line flag selecting the tunnel type.  Its value is used as a key
+# into _TUNNEL_TYPE_TO_NW_ID below, so only types listed there can be used.
+gflags.DEFINE_string('tunnel_type', 'gre', 'tunnel type for ovs tunnel port')
+
+# tunnel type name -> network id under which its tunnel ports are registered
+_TUNNEL_TYPE_TO_NW_ID = {
+    'gre': rest_nw_id.NW_ID_VPORT_GRE,
+}
+
+
+class NetworkAPI(object):
+    """Internal adapter class for RestAPI.
+
+    Every method forwards to the wrapped network object, which has to
+    provide update_network, create_port, update_port and remove_port.
+    """
+    def __init__(self, network_):
+        super(NetworkAPI, self).__init__()
+        self.nw = network_
+
+    def update_network(self, network_id):
+        """Forward to update_network() of the wrapped object."""
+        backend = self.nw
+        backend.update_network(network_id)
+
+    def create_port(self, network_id, dpid, port_id):
+        """Forward to create_port() of the wrapped object."""
+        backend = self.nw
+        backend.create_port(network_id, dpid, port_id)
+
+    def update_port(self, network_id, dpid, port_id):
+        """Forward to update_port() of the wrapped object."""
+        backend = self.nw
+        backend.update_port(network_id, dpid, port_id)
+
+    def delete_port(self, network_id, dpid, port_id):
+        """Remove a port; an unknown network or port is not an error."""
+        backend = self.nw
+        try:
+            backend.remove_port(network_id, dpid, port_id)
+        except (ryu_exc.NetworkNotFound, ryu_exc.PortNotFound):
+            # nothing to remove
+            return
+
+
+class TunnelAPI(object):
+    """Internal adapter class for RestTunnelAPI.
+
+    Every method forwards to the wrapped tunnels object, which has to
+    provide update_port, register_port and delete_port.
+    """
+    def __init__(self, tunnels_):
+        super(TunnelAPI, self).__init__()
+        self.tunnels = tunnels_
+
+    def update_remote_dpid(self, dpid, port_id, remote_dpid):
+        """Forward to update_port() of the wrapped object."""
+        backend = self.tunnels
+        backend.update_port(dpid, port_id, remote_dpid)
+
+    def create_remote_dpid(self, dpid, port_id, remote_dpid):
+        """Forward to register_port() of the wrapped object."""
+        backend = self.tunnels
+        backend.register_port(dpid, port_id, remote_dpid)
+
+    def delete_port(self, dpid, port_id):
+        """Delete a port; an unknown port is not an error."""
+        backend = self.tunnels
+        try:
+            backend.delete_port(dpid, port_id)
+        except ryu_exc.PortNotFound:
+            # nothing to delete
+            return
+
+
+class TunnelPort(object):
+    """Record describing one tunnel port of a datapath.
+
+    Attributes (stored exactly as passed in):
+    dpid        -- datapath id owning the port
+    port_no     -- port number of the tunnel port
+    local_ip    -- local tunnel endpoint address
+    remote_ip   -- remote tunnel endpoint address
+    remote_dpid -- datapath id of the peer, or None when it is not known
+
+    Two records are equal when all five attributes are equal.
+    """
+    def __init__(self, dpid, port_no, local_ip, remote_ip, remote_dpid=None):
+        super(TunnelPort, self).__init__()
+        self.dpid = dpid
+        self.port_no = port_no
+        self.local_ip = local_ip
+        self.remote_ip = remote_ip
+        self.remote_dpid = remote_dpid
+
+    def __eq__(self, other):
+        # Objects without the expected attributes (None, for instance) are
+        # "not comparable" rather than an AttributeError; Python then falls
+        # back to its default, which reports them as unequal.
+        try:
+            other_key = (other.dpid, other.port_no, other.local_ip,
+                         other.remote_ip, other.remote_dpid)
+        except AttributeError:
+            return NotImplemented
+        return (self.dpid, self.port_no, self.local_ip,
+                self.remote_ip, self.remote_dpid) == other_key
+
+    def __ne__(self, other):
+        # Python 2 does not derive != from ==, so define it explicitly.
+        result = self.__eq__(other)
+        if result is NotImplemented:
+            return result
+        return not result
+
+
+class TunnelDP(object):
+    def __init__(self, dpid, ovsdb_addr, tunnel_ip, tunnel_type, conf_switch_,
+                 network_api, tunnel_api):
+        """Set up the state for one datapath and start its request loop.
+
+        dpid         -- datapath id this object manages
+        ovsdb_addr   -- address handed to ovs_bridge.OVSBridge
+        tunnel_ip    -- local tunnel endpoint address of this datapath
+        tunnel_type  -- key into _TUNNEL_TYPE_TO_NW_ID (KeyError if unknown)
+        conf_switch_ -- object used by _init() to look up peer dpids
+        network_api  -- NetworkAPI instance
+        tunnel_api   -- TunnelAPI instance
+        """
+        super(TunnelDP, self).__init__()
+        self.dpid = dpid
+        self.tunnel_api = tunnel_api
+        self.network_api = network_api
+
+        self.ovs_bridge = ovs_bridge.OVSBridge(dpid, ovsdb_addr)
+
+        self.tunnel_type = tunnel_type
+        self.tunnel_ip = tunnel_ip
+        self.tunnel_nw_id = _TUNNEL_TYPE_TO_NW_ID[tunnel_type]
+        # port number -> TunnelPort
+        self.tunnels = {}
+
+        # _init() has not run yet; it needs conf_switch_ and is executed
+        # by the serve loop.
+        self.inited = False
+        self.conf_switch = conf_switch_
+
+        # requests are queued here and handled by _serve_loop
+        self.req_q = gevent.queue.Queue()
+        self.thr = gevent.spawn_later(0, self._serve_loop)
+
+    def _init(self):
+        """Initialize the bridge and adopt its existing tunnel ports.
+
+        Ports whose local address differs from self.tunnel_ip are only
+        logged.  For the others a TunnelPort is recorded, and when
+        conf_switch.find_dpid() yields a peer dpid for the remote address
+        the port is also reported through _api_update().  On success the
+        conf_switch reference is dropped and self.inited is set.
+        """
+        self.ovs_bridge.init()
+        for port in self.ovs_bridge.get_tunnel_ports(self.tunnel_type):
+            if port.local_ip == self.tunnel_ip:
+                peer_dpid = self.conf_switch.find_dpid(
+                    cs_key.OVS_TUNNEL_ADDR, port.remote_ip)
+                self.tunnels[port.ofport] = TunnelPort(
+                    self.dpid, port.ofport, self.tunnel_ip, port.remote_ip,
+                    peer_dpid)
+                if peer_dpid:
+                    self._api_update(port.ofport, peer_dpid)
+            else:
+                LOG.warn('unknown tunnel port %s', port)
+
+        # conf_switch is only needed for the lookup above
+        self.conf_switch = None
+        self.inited = True
+
+    def _api_update(self, port_no, remote_dpid):
+        """Report a tunnel port to the network API, then the tunnel API."""
+        local_dpid = self.dpid
+        self.network_api.update_port(self.tunnel_nw_id, local_dpid, port_no)
+        self.tunnel_api.update_remote_dpid(local_dpid, port_no, remote_dpid)
+
+    def _api_delete(self, port_no):
+        """Withdraw a tunnel port from the network API, then the tunnel API."""
+        local_dpid = self.dpid
+        self.network_api.delete_port(self.tunnel_nw_id, local_dpid, port_no)
+        self.tunnel_api.delete_port(local_dpid, port_no)
+
+    def _update_remote(self, remote_dpid, remote_ip):
+        """Apply the tunnel address announced for a datapath.
+
+        remote_dpid -- datapath whose tunnel address was set
+        remote_ip   -- the announced tunnel address
+
+        When the announcement is about this datapath, the new address is
+        stored in self.tunnel_ip and every tunnel port with a known peer
+        that is still bound to another local address is deleted and
+        created again.  When it is about another datapath, the ports
+        towards that datapath are refreshed, or recreated if its address
+        changed, and a port that already points at remote_ip without a
+        known peer gets remote_dpid recorded as its peer.
+        """
+        if self.dpid == remote_dpid:
+            if self.tunnel_ip != remote_ip:
+                # tunnel ip address is changed.
+                LOG.warn('local ip address is changed %s: %s -> %s',
+                         dpid_lib.dpid_to_str(remote_dpid),
+                         self.tunnel_ip, remote_ip)
+                # _add_tunnel_port() builds ports from self.tunnel_ip, so
+                # the new address has to be stored before recreating.
+                self.tunnel_ip = remote_ip
+
+            # recreate tunnel ports that still use a previous local address.
+            # Checking each port (instead of returning early when the
+            # address is already stored) lets a request that is retried
+            # after a timeout finish the remaining ports.
+            for tp in list(self.tunnels.values()):
+                if tp.remote_dpid is None:
+                    # TODO:XXX
+                    continue
+                if tp.local_ip == self.tunnel_ip:
+                    continue
+
+                self._del_tunnel_port(tp.port_no, tp.local_ip, tp.remote_ip)
+
+                new_tp = self._add_tunnel_port(tp.remote_dpid, tp.remote_ip)
+                # _add_tunnel_port() returns None when such a port exists
+                if new_tp is not None:
+                    self._api_update(new_tp.ofport, tp.remote_dpid)
+            return
+
+        if self.tunnel_ip == remote_ip:
+            LOG.error('ip conflict: %s %s %s',
+                      dpid_lib.dpid_to_str(self.dpid),
+                      dpid_lib.dpid_to_str(remote_dpid), remote_ip)
+            # XXX What should we do?
+            return
+
+        for tp in list(self.tunnels.values()):
+            if tp.remote_dpid == remote_dpid:
+                if tp.remote_ip == remote_ip:
+                    self._api_update(tp.port_no, remote_dpid)
+                    continue
+
+                LOG.warn('remote ip address is changed %s: %s -> %s',
+                         dpid_lib.dpid_to_str(remote_dpid),
+                         tp.remote_ip, remote_ip)
+                self._del_tunnel_port(tp.port_no, self.tunnel_ip, tp.remote_ip)
+
+                new_tp = self._add_tunnel_port(remote_dpid, remote_ip)
+                # _add_tunnel_port() returns None when such a port exists
+                if new_tp is not None:
+                    self._api_update(new_tp.ofport, remote_dpid)
+            elif tp.remote_ip == remote_ip:
+                # a port found at start-up whose peer was not known yet
+                assert tp.remote_dpid is None
+                self._api_update(tp.port_no, remote_dpid)
+                tp.remote_dpid = remote_dpid
+
+    @staticmethod
+    def _to_hex(ip_addr):
+        """Return the address as two hex digits per word, concatenated."""
+        # assuming IPv4 address
+        assert netaddr.IPAddress(ip_addr).ipv4()
+        words = netaddr.IPAddress(ip_addr).words
+        return "%02x%02x%02x%02x" % words
+
+    @staticmethod
+    def _port_name(local_ip, remote_ip):
+        """Build the OVS port name for a tunnel between two addresses."""
+        # ovs requires a name of at most 14 bytes.
+        # layout: gre<remote hex>-<local hex>, cut to 14 characters, which
+        # keeps all of the remote address and the first two hex digits of
+        # the local one.
+        # NOTE(review): those two digits are the leading byte of the local
+        # address, although the original comment called it the lsb.
+        _PORT_NAME_LENGTH = 14
+        local_part = TunnelDP._to_hex(local_ip)
+        remote_part = TunnelDP._to_hex(remote_ip)
+        full_name = "gre%s-%s" % (remote_part, local_part)
+        return full_name[:_PORT_NAME_LENGTH]
+
+    def _tunnel_port_exists(self, remote_dpid, remote_ip):
+        """Tell whether a recorded port has this peer dpid and address."""
+        for known in self.tunnels.values():
+            if known.remote_dpid == remote_dpid:
+                if known.remote_ip == remote_ip:
+                    return True
+        return False
+
+    def _add_tunnel_port(self, remote_dpid, remote_ip):
+        """Create the tunnel port towards a peer unless one is recorded.
+
+        The port is created on the OVS bridge with the 'flow' key option,
+        recorded in self.tunnels and registered with both APIs.
+
+        Returns the port object read back from the bridge, or None when
+        a port for (remote_dpid, remote_ip) is already recorded.
+        """
+        LOG.debug('add_tunnel_port local %s %s remote %s %s',
+                  dpid_lib.dpid_to_str(self.dpid), self.tunnel_ip,
+                  dpid_lib.dpid_to_str(remote_dpid), remote_ip)
+        if self._tunnel_port_exists(remote_dpid, remote_ip):
+            LOG.debug('add_tunnel_port nop')
+            return None
+
+        LOG.debug('add_tunnel_port creating port')
+        name = self._port_name(self.tunnel_ip, remote_ip)
+        bridge = self.ovs_bridge
+        bridge.add_tunnel_port(name, self.tunnel_type, self.tunnel_ip,
+                               remote_ip, 'flow')
+
+        # read the port back to learn the port number assigned to it
+        created = bridge.get_tunnel_port(name, self.tunnel_type)
+        record = TunnelPort(self.dpid, created.ofport, created.local_ip,
+                            created.remote_ip, remote_dpid)
+        self.tunnels[created.ofport] = record
+        self.network_api.create_port(self.tunnel_nw_id, self.dpid,
+                                     created.ofport)
+        self.tunnel_api.create_remote_dpid(self.dpid, created.ofport,
+                                           remote_dpid)
+        return created
+
+    def _del_tunnel_port(self, port_no, local_ip, remote_ip):
+        """Delete a tunnel port from the bridge, the records and the APIs.
+
+        The bridge port is addressed by the name derived from the two
+        addresses; self.tunnels must contain port_no (KeyError otherwise).
+        """
+        name = self._port_name(local_ip, remote_ip)
+        self.ovs_bridge.del_port(name)
+        del self.tunnels[port_no]
+        self._api_delete(port_no)
+
+    def _del_tunnel_port_ip(self, remote_ip):
+        """Delete the first recorded port whose remote address matches."""
+        match = next((known for known in self.tunnels.values()
+                      if known.remote_ip == remote_ip), None)
+        if match is not None:
+            self._del_tunnel_port(match.port_no, self.tunnel_ip, remote_ip)
+
+    # Requests to this OVS DP are serialized: callers queue one of the
+    # request objects below and _serve_loop handles them one at a time.
+    _RequestUpdateRemote = collections.namedtuple(
+        '_RequestUpdateRemote', ('remote_dpid', 'remote_ip'))
+    _RequestAddTunnelPort = collections.namedtuple(
+        '_RequestAddTunnelPort', ('remote_dpid', 'remote_ip'))
+    # The trailing comma makes this a one-element tuple; ('remote_ip')
+    # without it is just a parenthesized string.
+    _RequestDelTunnelPort = collections.namedtuple(
+        '_RequestDelTunnelPort', ('remote_ip',))
+
+    class _RequestClose(object):
+        """Marker request: _serve_loop returns when it receives one."""
+
+    def request_update_remote(self, remote_dpid, remote_ip):
+        """Queue an update-remote request for the serve loop."""
+        request = self._RequestUpdateRemote(remote_dpid, remote_ip)
+        self.req_q.put(request)
+
+    def request_add_tunnel_port(self, remote_dpid, remote_ip):
+        """Queue an add-tunnel-port request for the serve loop."""
+        request = self._RequestAddTunnelPort(remote_dpid, remote_ip)
+        self.req_q.put(request)
+
+    def request_del_tunnel_port(self, remote_ip):
+        """Queue a delete-tunnel-port request for the serve loop."""
+        request = self._RequestDelTunnelPort(remote_ip)
+        self.req_q.put(request)
+
+    def close(self):
+        """Ask the serve loop to stop and wait until it has finished."""
+        # The greenlet is not killed (self.thr.kill()); a close request is
+        # queued instead and the loop returns when it reaches it.
+        stop_request = self._RequestClose()
+        self.req_q.put(stop_request)
+        self.thr.join()
+        self.thr = None
+
+    def _serve_loop(self):
+        """Handle queued requests one by one until a close request arrives.
+
+        A request that ends in gevent.timeout.Timeout is tried again
+        immediately and indefinitely; any other exception propagates and
+        ends the loop.  _init() is (re)attempted before a request is
+        handled for as long as it has not completed.
+        """
+        # TODO:XXX backoff timeout
+        # TODO:XXX and then, abandon and notify the caller(TunnelPortUpdater)
+
+        # TODO: if possible, squash requests?
+        #       For example, RequestAddTunnelPort and RequestDelTunnelPort
+        #       with same dpid are in the queue. AddTunnelPort request
+        #       can be skipped.
+        #       When ovsdb-server and vswitchd are over-loaded
+        #       (or connection to ovsdb are unstable), squashing request
+        #       would increase stability a bit?
+        #       But unsure how effective it would be.
+
+        if not self.inited:
+            try:
+                self._init()
+            except gevent.timeout.Timeout:
+                LOG.warn('_init timeouted')
+
+        while True:
+            req = self.req_q.get()
+            if isinstance(req, self._RequestClose):
+                return
+
+            # keep trying the same request until it gets through
+            while True:
+                try:
+                    if not self.inited:
+                        self._init()
+
+                    # should use dispatcher?
+                    if isinstance(req, self._RequestUpdateRemote):
+                        LOG.debug('update_remote')
+                        self._update_remote(req.remote_dpid, req.remote_ip)
+                    elif isinstance(req, self._RequestAddTunnelPort):
+                        LOG.debug('add_tunnel_port')
+                        self._add_tunnel_port(req.remote_dpid, req.remote_ip)
+                    elif isinstance(req, self._RequestDelTunnelPort):
+                        LOG.debug('del_tunnel_port')
+                        self._del_tunnel_port_ip(req.remote_ip)
+                    else:
+                        LOG.error('unknown request %s', req)
+                except gevent.timeout.Timeout:
+                    # timeout. try again
+                    LOG.warn('timeout try again')
+                else:
+                    # Done. move onto next request
+                    break
+
+
+class TunnelDPSet(dict):
+    """Plain dict subclass mapping dpid -> TunnelDP."""
+
+
+#import collections
+#class TunnelRequests(collections.defaultdict(set)):
+class TunnelRequests(dict):
+    """dpid -> set of dpids a tunnel has been requested with.
+
+    Requests are symmetric: add() and remove() update both directions.
+    """
+    def add(self, dpid0, dpid1):
+        """Record a tunnel request between the two datapaths."""
+        for src, dst in ((dpid0, dpid1), (dpid1, dpid0)):
+            self.get_remote(src).add(dst)
+
+    def remove(self, dpid0, dpid1):
+        """Drop a recorded request; KeyError if it was never recorded."""
+        for src, dst in ((dpid0, dpid1), (dpid1, dpid0)):
+            self[src].remove(dst)
+
+    def get_remote(self, dpid):
+        """Return the peer set of dpid, creating an empty one if needed."""
+        return self.setdefault(dpid, set())
+
+
+class TunnelPortUpdater(app_manager.RyuApp):
+    # Context objects this application asks for; __init__ reads them back
+    # from its keyword arguments under the same keys.
+    _CONTEXTS = {
+        'conf_switch': conf_switch.ConfSwitchSet,
+        'network': network.Network,
+        'tunnels': tunnels.Tunnels,
+    }
+    # Name of the instance attribute holding the semaphore; it is set in
+    # __init__ and passed to synchronized.synchronized() on the handlers.
+    _LOCK = 'lock'
+
+    def __init__(self, *args, **kwargs):
+        """Wire up the context objects and register the tunnel network.
+
+        kwargs must contain 'conf_switch', 'network' and 'tunnels' (the
+        keys of _CONTEXTS); a missing one raises KeyError.  The tunnel
+        type comes from the 'tunnel_type' flag.
+        """
+        # Unpack the arguments: passing ``args, kwargs`` would hand the base
+        # class one tuple and one dict as two positional arguments.
+        super(TunnelPortUpdater, self).__init__(*args, **kwargs)
+        self.tunnel_type = FLAGS.tunnel_type
+        self.cs = kwargs['conf_switch']
+        self.nw = kwargs['network']
+        self.tunnels = kwargs['tunnels']
+        self.tunnel_dpset = TunnelDPSet()
+        self.tunnel_requests = TunnelRequests()
+
+        self.network_api = NetworkAPI(self.nw)
+        self.tunnel_api = TunnelAPI(self.tunnels)
+        self.network_api.update_network(
+            _TUNNEL_TYPE_TO_NW_ID[self.tunnel_type])
+
+        # semaphore looked up by name by the synchronized handlers
+        setattr(self, self._LOCK, gevent.coros.Semaphore())
+
+    def _ovsdb_update(self, dpid, ovsdb_addr, ovs_tunnel_addr):
+        """Make sure a TunnelDP exists for dpid and queue its tunnel ports.
+
+        A TunnelDP is only created the first time a dpid is seen; the
+        addresses of an existing one are left untouched.
+        """
+        LOG.debug('_ovsdb_update %s %s %s',
+                  dpid_lib.dpid_to_str(dpid), ovsdb_addr, ovs_tunnel_addr)
+        try:
+            tunnel_dp = self.tunnel_dpset[dpid]
+        except KeyError:
+            # TODO:XXX changing ovsdb_addr, ovs_tunnel_addr
+            tunnel_dp = self.tunnel_dpset[dpid] = TunnelDP(
+                dpid, ovsdb_addr, ovs_tunnel_addr, self.tunnel_type,
+                self.cs, self.network_api, self.tunnel_api)
+
+        assert tunnel_dp
+        requested = self.tunnel_requests.get_remote(dpid)
+        self._add_tunnel_ports(tunnel_dp, requested)
+
+    @handler.set_ev_cls(conf_switch.EventConfSwitchSet,
+                        conf_switch.CONF_SWITCH_EV_DISPATCHER)
+    @synchronized.synchronized(_LOCK)
+    def conf_switch_set_handler(self, ev):
+        """React to a switch configuration key being set.
+
+        Once both the OVSDB address and the tunnel address of a datapath
+        are configured, its TunnelDP is set up.  A tunnel address is
+        additionally announced to every known TunnelDP.
+        """
+        LOG.debug('conf_switch_set_handler %s %s %s',
+                  dpid_lib.dpid_to_str(ev.dpid), ev.key, ev.value)
+        dpid = ev.dpid
+        if ev.key in (cs_key.OVSDB_ADDR, cs_key.OVS_TUNNEL_ADDR):
+            has_ovsdb_addr = (dpid, cs_key.OVSDB_ADDR) in self.cs
+            if has_ovsdb_addr and (dpid, cs_key.OVS_TUNNEL_ADDR) in self.cs:
+                ovsdb_addr = self.cs.get_key(dpid, cs_key.OVSDB_ADDR)
+                tunnel_addr = self.cs.get_key(dpid, cs_key.OVS_TUNNEL_ADDR)
+                self._ovsdb_update(dpid, ovsdb_addr, tunnel_addr)
+
+        if not (ev.key == cs_key.OVS_TUNNEL_ADDR):
+            return
+        for tunnel_dp in self.tunnel_dpset.values():
+            tunnel_dp.request_update_remote(ev.dpid, ev.value)
+
+    @handler.set_ev_cls(conf_switch.EventConfSwitchDel,
+                        conf_switch.CONF_SWITCH_EV_DISPATCHER)
+    @synchronized.synchronized(_LOCK)
+    def conf_switch_del_handler(self, ev):
+        """Placeholder: deleted configuration keys are ignored for now."""
+        # TODO:XXX
+        return None
+
+    def _add_tunnel_ports(self, tunnel_dp, remote_dpids):
+        """Queue tunnel port creation, in both directions, for each peer.
+
+        Peers that have no TunnelDP yet are skipped.
+        """
+        LOG.debug('_add_tunnel_ports %s %s', tunnel_dp, remote_dpids)
+        for peer_dpid in remote_dpids:
+            peer = self.tunnel_dpset.get(peer_dpid)
+            if peer is not None:
+                tunnel_dp.request_add_tunnel_port(peer.dpid, peer.tunnel_ip)
+                peer.request_add_tunnel_port(tunnel_dp.dpid,
+                                             tunnel_dp.tunnel_ip)
+
+    def _vm_port_add(self, network_id, dpid):
+        """Request tunnels between dpid and the other datapaths of a network.
+
+        The requests are always recorded; tunnel ports are only queued
+        when a TunnelDP already exists for dpid.
+        """
+        LOG.debug('_vm_port_add %s %s', network_id, dpid_lib.dpid_to_str(dpid))
+        # Work on a private copy, as _vm_port_del does: the returned
+        # collection may belong to the network object and must not lose
+        # this dpid.
+        dpids = set(self.nw.get_dpids(network_id))
+        dpids.discard(dpid)
+        for remote_dpid in dpids:
+            self.tunnel_requests.add(dpid, remote_dpid)
+
+        tunnel_dp = self.tunnel_dpset.get(dpid)
+        if tunnel_dp is None:
+            return
+        self._add_tunnel_ports(tunnel_dp, dpids)
+
+    def _vm_port_del(self, network_id, dpid):
+        """Drop tunnels that are no longer needed after a port removal.
+
+        Nothing happens while dpid has more than one port on the network.
+        Otherwise the request towards every other datapath of the network
+        is removed unless another non-reserved network is shared with it,
+        and tunnel port deletion is queued in both directions where both
+        ends have a TunnelDP.
+        """
+        LOG.debug('_vm_port_del %s %s', network_id, dpid_lib.dpid_to_str(dpid))
+        if len(self.nw.get_ports(dpid, network_id)) > 1:
+            return
+
+        # networks, other than this one and the reserved ones, that still
+        # need tunnels from dpid
+        other_networks = self.nw.get_networks(dpid).copy()
+        other_networks.discard(network_id)
+        other_networks.difference_update(rest_nw_id.RESERVED_NETWORK_IDS)
+
+        peers = self.nw.get_dpids(network_id).copy()
+        peers.discard(dpid)
+        obsolete = []
+        for peer_dpid in peers:
+            if not (other_networks & self.nw.get_networks(peer_dpid)):
+                self.tunnel_requests.remove(dpid, peer_dpid)
+                obsolete.append(peer_dpid)
+
+        tunnel_dp = self.tunnel_dpset.get(dpid)
+        if tunnel_dp is None:
+            return
+        for peer_dpid in obsolete:
+            peer = self.tunnel_dpset.get(peer_dpid)
+            if peer is not None:
+                tunnel_dp.request_del_tunnel_port(peer.tunnel_ip)
+                peer.request_del_tunnel_port(tunnel_dp.tunnel_ip)
+
+    @handler.set_ev_cls(network.EventNetworkPort,
+                        network.NETWORK_TENANT_EV_DISPATCHER)
+    @synchronized.synchronized(_LOCK)
+    def network_port_handler(self, ev):
+        """Dispatch a port event of a non-reserved network.
+
+        ev.add_del selects between _vm_port_add (true) and _vm_port_del.
+        """
+        LOG.debug('network_port_handler %s', ev)
+        if ev.network_id in rest_nw_id.RESERVED_NETWORK_IDS:
+            return
+
+        action = self._vm_port_add if ev.add_del else self._vm_port_del
+        action(ev.network_id, ev.dpid)
-- 
1.7.10.4


------------------------------------------------------------------------------
Don't let slow site performance ruin your business. Deploy New Relic APM
Deploy New Relic app performance management and know exactly
what is happening inside your Ruby, Python, PHP, Java, and .NET app
Try New Relic at no cost today and get our sweet Data Nerd shirt too!
http://p.sf.net/sfu/newrelic-dev2dev
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to