From: Yoshihiro Kaneko <ykaneko0...@gmail.com>

Signed-off-by: Yoshihiro Kaneko <ykaneko0...@gmail.com>
Signed-off-by: Isaku Yamahata <yamah...@valinux.co.jp>
---
Changes v6 -> v7:
- handle the case where the port might not exist.
- make use of QueueSerializer and RequestQueue

Changes v5 -> v6:
- use OF port_mod event
- don't access the quantum db
- other cleanups
---
 ryu/app/quantum_adapter.py |  430 ++++++++++++++++++++++++++++++++++++++++++++
 ryu/flags.py               |   22 +++
 2 files changed, 452 insertions(+)
 create mode 100644 ryu/app/quantum_adapter.py

diff --git a/ryu/app/quantum_adapter.py b/ryu/app/quantum_adapter.py
new file mode 100644
index 0000000..a33a0f6
--- /dev/null
+++ b/ryu/app/quantum_adapter.py
@@ -0,0 +1,430 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2012 Isaku Yamahata <yamahata at private email ne jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import gflags
+import logging
+
+from quantumclient import client as q_client
+from quantumclient.common import exceptions as q_exc
+from quantumclient.v2_0 import client as q_clientv2
+
+from ryu.app import conf_switch_key as cs_key
+from ryu.app import rest_nw_id
+from ryu.base import app_manager
+from ryu.controller import (conf_switch,
+                            dispatcher,
+                            dpset,
+                            event,
+                            handler,
+                            handler_utils,
+                            network)
+from ryu.lib import dpid as dpid_lib
+from ryu.lib import mac as mac_lib
+from ryu.lib import quantum_ifaces
+from ryu.lib.ovs import bridge
+from ryu.lib.quantum_ifaces import QuantumIfaces
+
+
+from gevent import monkey
+monkey.patch_all()
+
+
+LOG = logging.getLogger(__name__)
+FLAGS = gflags.FLAGS
+
+
+def _get_auth_token():
+    httpclient = q_client.HTTPClient(
+        username=FLAGS.quantum_admin_username,
+        tenant_name=FLAGS.quantum_admin_tenant_name,
+        password=FLAGS.quantum_admin_password,
+        auth_url=FLAGS.quantum_admin_auth_url,
+        timeout=FLAGS.quantum_url_timeout,
+        auth_strategy=FLAGS.quantum_auth_strategy)
+    try:
+        httpclient.authenticate()
+    except (q_exc.Unauthorized, q_exc.Forbidden, q_exc.EndpointNotFound) as e:
+        LOG.error("authentication failure: %s", e)
+        return None
+    # LOG.debug("_get_auth_token: token=%s", httpclient.auth_token)
+    return httpclient.auth_token
+
+
+def _get_quantum_client(token):
+    if token:
+        my_client = q_clientv2.Client(
+            endpoint_url=FLAGS.quantum_url,
+            token=token, timeout=FLAGS.quantum_url_timeout)
+    else:
+        my_client = q_clientv2.Client(
+            endpoint_url=FLAGS.quantum_url,
+            auth_strategy=None, timeout=FLAGS.quantum_url_timeout)
+    return my_client
+
+
+class OVSPort(object):
+    PORT_ERROR = -1
+    PORT_UNKNOWN = 0
+    PORT_GATEWAY = 1
+    PORT_VETH_GATEWAY = 2
+    PORT_GUEST = 3
+    PORT_TUNNEL = 4
+
+    # extra-ids: 'attached-mac', 'iface-id', 'iface-status', 'vm-uuid'
+    def __init__(self, ofport):
+        super(OVSPort, self).__init__()
+        self.ofport = ofport
+        self.name = None
+        self.type = None
+        self.ext_ids = {}
+        self.options = {}
+
+    def update(self, port):
+        self.__dict__.update((key, port[key]) for key
+                             in ['name', 'ofport', 'type']
+                             if key in port)
+        if 'external_ids' in port:
+            self.ext_ids = dict(port['external_ids'])
+        if 'options' in port:
+            self.options = dict(port['options'])
+
+    def get_port_type(self):
+        if not isinstance(self.ofport, int):
+            return self.PORT_ERROR
+        if self.type == 'internal' and 'iface-id' in self.ext_ids:
+            return self.PORT_GATEWAY
+        if self.type == '' and 'iface-id' in self.ext_ids:
+            return self.PORT_VETH_GATEWAY
+        if (self.type == 'gre' and 'local_ip' in self.options and
+                'remote_ip' in self.options):
+            return self.PORT_TUNNEL
+        if self.type == '' and 'vm-uuid' in self.ext_ids:
+            return self.PORT_GUEST
+        return self.PORT_UNKNOWN
+
+    def __str__(self):
+        return "type=%s ofport=%s ext_ids=%s options=%s" % (
+            self.type, self.ofport, self.ext_ids, self.options)
+
+    def __eq__(self, other):
+        return (other is not None and
+                self.ofport == other.ofport and
+                self.type == other.type and
+                self.ext_ids == other.ext_ids and
+                self.options == other.options)
+
+
+class OVSSwitch(handler_utils.RequestQueue):
+    def __init__(self, dpid, nw, ifaces):
+        # TODO: clean up
+        token = None
+        if FLAGS.quantum_auth_strategy:
+            token = _get_auth_token()
+        q_api = _get_quantum_client(token)
+
+        self.dpid = dpid
+        self.network_api = nw
+        self.ifaces = ifaces
+        self.q_api = q_api
+        self.ctrl_addr = FLAGS.quantum_controller_addr
+
+        self.ovsdb_addr = None
+        self.tunnel_ip = None
+
+        self.ovs_bridge = None
+        self.ports = {}  # port_no -> OVSPort
+
+        super(OVSSwitch, self).__init__((OVSSwitch.set_ovsdb_addr,
+                                         OVSSwitch.update_port))
+
+    def set_ovsdb_addr(self, dpid, ovsdb_addr):
+        # ovsdb_addr is None when the conf_switch key has been deleted.
+        LOG.debug('set_ovsdb_addr dpid %s ovsdb_addr %s',
+                  dpid_lib.dpid_to_str(dpid), ovsdb_addr)
+        if ovsdb_addr is not None:
+            # quick sanity check: must look like proto:host:port
+            _proto, _host, _port = ovsdb_addr.split(':')
+        if self.ovsdb_addr == ovsdb_addr:
+            return
+        self.ovsdb_addr = ovsdb_addr  # also forget the address on deletion
+        if ovsdb_addr is None:
+            # TODO: clean up this ovs switch
+            if self.ovs_bridge:
+                self.ovs_bridge.del_controller()
+                self.ovs_bridge = None
+            return
+        if self.ovs_bridge is None:
+            LOG.debug('ovsdb: adding ports %s', self.ports)
+            ovs_bridge = bridge.OVSBridge(dpid, ovsdb_addr)
+            self.ovs_bridge = ovs_bridge
+            ovs_bridge.init()
+            # TODO: for multi-controller
+            #       do not overwrite controllers; append this controller
+            ovs_bridge.set_controller([self.ctrl_addr])
+            for port in self.ports.values():
+                LOG.debug('adding port %s', port)
+                self.update_port(port.ofport, port.name, True)
+
+    def _update_external_port(self, port, add=True):
+        if add:
+            self.network_api.update_port(rest_nw_id.NW_ID_EXTERNAL,
+                                         self.dpid, port.ofport)
+        else:
+            self.network_api.remove_port(rest_nw_id.NW_ID_EXTERNAL,
+                                         self.dpid, port.ofport)
+
+    def _update_vif_port(self, port, add=True):
+        LOG.debug("_update_vif_port: %s %s", port, add)
+        iface_id = port.ext_ids.get('iface-id')
+        if iface_id is None:
+            return
+
+        if not add:
+            port_data = {
+                'datapath_id': dpid_lib.dpid_to_str(self.dpid),
+                'port_no': port.ofport,
+
+                # The goal is to set
+                # port.status = quantum.common.constants.PORT_STATUS_DOWN.
+                # port.status can't be changed via the REST API directly,
+                # so resort to a ryu-specific parameter to signal it.
+                'deleted': True
+            }
+            body = {'port': port_data}
+            # LOG.debug("port-body = %s", body)
+
+            try:
+                self.q_api.update_port(port.ext_ids['iface-id'], body)
+            except (q_exc.ConnectionFailed, q_exc.QuantumClientException) as e:
+                LOG.error("quantum update port failed: %s", e)
+                # TODO: When authentication failure occurred,
+                # it should get auth token again
+            return
+
+        # update {network, port, mac}
+        try:
+            network_id = self.ifaces.get_key(iface_id,
+                                             QuantumIfaces.KEY_NETWORK_ID)
+        except KeyError:
+            return
+        self.network_api.update_network(network_id)
+        self.network_api.update_port(network_id, self.dpid, port.ofport)
+        mac = port.ext_ids.get('attached-mac')
+        if mac:
+            self.network_api.update_mac(network_id, self.dpid, port.ofport,
+                                        mac_lib.haddr_to_bin(mac))
+
+    def update_port(self, port_no, port_name, add):
+        LOG.debug('update_port port_no %d %s', port_no, add)
+        old_port = self.ports.get(port_no)
+        if not add:
+            new_port = None
+            self.ports.pop(port_no, None)
+        else:
+            new_port = OVSPort(port_no)
+            if self.ovs_bridge:
+                port_cfg = self.ovs_bridge.get_quantum_ports(port_no,
+                                                             port_name)
+                if port_cfg:
+                    new_port.update(port_cfg)
+                    port_name = port_cfg['name']
+            self.ports[port_no] = new_port
+            iface_id = new_port.ext_ids.get('iface-id')
+            if iface_id:
+                self.ifaces.update_key(iface_id, QuantumIfaces.KEY_DATAPATH_ID,
+                                       self.dpid)
+                self.ifaces.update_key(iface_id, QuantumIfaces.KEY_OFPORT,
+                                       port_no)
+                self.ifaces.update_key(iface_id, QuantumIfaces.KEY_NAME,
+                                       port_name)
+
+        if old_port == new_port:
+            return
+
+        if not new_port:
+            port_type = old_port.get_port_type()
+            if port_type == OVSPort.PORT_ERROR:
+                return
+            elif port_type == OVSPort.PORT_UNKNOWN:
+                # LOG.info("delete external port: %s", old_port)
+                self._update_external_port(old_port, add=False)
+            else:
+                # LOG.info("delete port: %s", old_port)
+                if port_type != OVSPort.PORT_TUNNEL:
+                    self._update_vif_port(old_port, add=False)
+            return
+
+        if new_port.ofport == -1:
+            return
+        if not old_port or old_port.ofport == -1:
+            port_type = new_port.get_port_type()
+            if port_type == OVSPort.PORT_ERROR:
+                return
+            elif port_type == OVSPort.PORT_UNKNOWN:
+                # LOG.info("create external port: %s", new_port)
+                self._update_external_port(new_port)
+            else:
+                # LOG.info("create port: %s", new_port)
+                if port_type != OVSPort.PORT_TUNNEL:
+                    self._update_vif_port(new_port)
+            return
+        if new_port.get_port_type() in (OVSPort.PORT_GUEST,
+                                        OVSPort.PORT_GATEWAY,
+                                        OVSPort.PORT_VETH_GATEWAY):
+            # LOG.info("update port: %s", new_port)
+            self._update_vif_port(new_port)
+
+
+# To serialize events
+class QuantumAdapterQueue(handler_utils.QueueSerializer):
+    _QUEUE_NAME_QUANTUM_ADAPTER = '_quantum_adapter'
+    _DISPATCHER_NAME_QUANTUM_ADAPTER = '_quantum_adapter'
+    QUANTUM_ADAPTER_EV_DISPATCHER = dispatcher.EventDispatcher(
+        _DISPATCHER_NAME_QUANTUM_ADAPTER)
+
+    _EV_CLSES = (
+        (dpset.EventDP, dpset.DPSET_EV_DISPATCHER),
+        (dpset.EventPortAdd, dpset.DPSET_EV_DISPATCHER),
+        (dpset.EventPortDelete, dpset.DPSET_EV_DISPATCHER),
+        (conf_switch.EventConfSwitchSet,
+         conf_switch.CONF_SWITCH_EV_DISPATCHER),
+        (conf_switch.EventConfSwitchDel,
+         conf_switch.CONF_SWITCH_EV_DISPATCHER),
+        (quantum_ifaces.EventQuantumIfaceSet,
+         quantum_ifaces.QUANTUM_IFACE_EV_DISPATCHER),
+    )
+
+    def __init__(self):
+        super(QuantumAdapterQueue, self).__init__(
+            self._QUEUE_NAME_QUANTUM_ADAPTER,
+            self.QUANTUM_ADAPTER_EV_DISPATCHER, self._EV_CLSES)
+
+
+class QuantumAdapter(app_manager.RyuApp):
+    _CONTEXTS = {
+        'conf_switch': conf_switch.ConfSwitchSet,
+        'network': network.Network,
+        'quantum_ifaces': quantum_ifaces.QuantumIfaces,
+    }
+
+    def __init__(self, *_args, **kwargs):
+        super(QuantumAdapter, self).__init__()
+        self.adapter_queue = QuantumAdapterQueue()
+
+        self.cs = kwargs['conf_switch']
+        self.nw = kwargs['network']
+        self.ifaces = kwargs['quantum_ifaces']
+        self.dps = {}
+
+        for network_id in rest_nw_id.RESERVED_NETWORK_IDS:
+            if network_id == rest_nw_id.NW_ID_UNKNOWN:
+                continue
+            self.nw.update_network(network_id)
+
+    def _get_ovs_switch(self, dpid, create=True):
+        ovs_switch = self.dps.get(dpid)
+        if not ovs_switch:
+            if create:
+                ovs_switch = OVSSwitch(dpid, self.nw, self.ifaces)
+                self.dps[dpid] = ovs_switch
+        else:
+            LOG.debug('ovs switch %s is already known', dpid)
+        return ovs_switch
+
+    def _port_handler(self, dpid, port_no, port_name, add):
+        ovs_switch = self._get_ovs_switch(dpid)
+        if ovs_switch:
+            ovs_switch.request_update_port(port_no, port_name, add)
+        else:
+            LOG.warn('unknown ovs switch %s %s %s %s\n',
+                     dpid, port_no, port_name, add)
+
+    @handler.set_ev_cls(dpset.EventDP,
+                        QuantumAdapterQueue.QUANTUM_ADAPTER_EV_DISPATCHER)
+    def dp_handler(self, ev):
+        dpid = ev.dp.id
+        ovs_switch = self._get_ovs_switch(dpid)
+        if not ovs_switch:
+            return
+
+        if ev.enter_leave:
+            for port in ev.ports:
+                ovs_switch.request_update_port(port.port_no, port.name, True)
+        else:
+            # When a dp leaves, we don't delete its ports because the OF
+            # connection can be disconnected for some reason.
+            # TODO: configuration needed to tell that this dp is really
+            # removed.
+            ovs_switch = self.dps.pop(dpid, None)
+            if ovs_switch:
+                ovs_switch.close()
+
+    @handler.set_ev_cls(dpset.EventPortAdd,
+                        QuantumAdapterQueue.QUANTUM_ADAPTER_EV_DISPATCHER)
+    def port_add_handler(self, ev):
+        port = ev.port
+        self._port_handler(ev.dp.id, port.port_no, port.name, True)
+
+    @handler.set_ev_cls(dpset.EventPortDelete,
+                        QuantumAdapterQueue.QUANTUM_ADAPTER_EV_DISPATCHER)
+    def port_del_handler(self, ev):
+        port = ev.port
+        self._port_handler(ev.dp.id, port.port_no, port.name, False)
+
+    def _conf_switch_set_ovsdb_addr(self, dpid, value):
+        ovs_switch = self._get_ovs_switch(dpid)
+        ovs_switch.request_set_ovsdb_addr(dpid, value)
+
+    def _conf_switch_del_ovsdb_addr(self, dpid):
+        ovs_switch = self._get_ovs_switch(dpid, False)
+        if ovs_switch:
+            ovs_switch.request_set_ovsdb_addr(dpid, None)
+
+    @handler.set_ev_cls(conf_switch.EventConfSwitchSet,
+                        QuantumAdapterQueue.QUANTUM_ADAPTER_EV_DISPATCHER)
+    def conf_switch_set_handler(self, ev):
+        LOG.debug("conf_switch set: %s", ev)
+        if ev.key == cs_key.OVSDB_ADDR:
+            self._conf_switch_set_ovsdb_addr(ev.dpid, ev.value)
+        else:
+            LOG.debug("unknown event: %s", ev)
+
+    @handler.set_ev_cls(conf_switch.EventConfSwitchDel,
+                        QuantumAdapterQueue.QUANTUM_ADAPTER_EV_DISPATCHER)
+    def conf_switch_del_handler(self, ev):
+        LOG.debug("conf_switch del: %s", ev)
+        if ev.key == cs_key.OVSDB_ADDR:
+            self._conf_switch_del_ovsdb_addr(ev.dpid)
+        else:
+            LOG.debug("unknown event: %s", ev)
+
+    @handler.set_ev_cls(quantum_ifaces.EventQuantumIfaceSet,
+                        QuantumAdapterQueue.QUANTUM_ADAPTER_EV_DISPATCHER)
+    def quantum_iface_set_handler(self, ev):
+        if ev.key != quantum_ifaces.QuantumIfaces.KEY_NETWORK_ID:
+            # LOG.debug("unknown key %s", ev.key)
+            return
+        iface_id = ev.iface_id
+        try:
+            dpid = self.ifaces.get_key(iface_id, QuantumIfaces.KEY_DATAPATH_ID)
+            ofport = self.ifaces.get_key(iface_id, QuantumIfaces.KEY_OFPORT)
+            port_name = self.ifaces.get_key(iface_id, QuantumIfaces.KEY_NAME)
+        except KeyError:
+            return
+        ovs_switch = self._get_ovs_switch(dpid, False)
+        if not ovs_switch:
+            return
+        ovs_switch.request_update_port(ofport, port_name, True)
diff --git a/ryu/flags.py b/ryu/flags.py
index e969620..610b201 100644
--- a/ryu/flags.py
+++ b/ryu/flags.py
@@ -23,3 +23,25 @@ FLAGS = gflags.FLAGS
 
 # GLOBAL flags
 gflags.DEFINE_boolean('monkey_patch', False, 'do monkey patch')
+
+# app/quantum_adapter
+gflags.DEFINE_string('quantum_url', 'http://localhost:9696',
+                     'URL for connecting to quantum')
+gflags.DEFINE_integer('quantum_url_timeout', 30,
+                      'timeout value for connecting to quantum in seconds')
+gflags.DEFINE_string('quantum_admin_username', 'quantum',
+                     'username for connecting to quantum in admin context')
+gflags.DEFINE_string('quantum_admin_password', 'service_password',
+                     'password for connecting to quantum in admin context')
+gflags.DEFINE_string('quantum_admin_tenant_name', 'service',
+                     'tenant name for connecting to quantum in admin context')
+gflags.DEFINE_string('quantum_admin_auth_url', 'http://localhost:5000/v2.0',
+                     'auth url for connecting to quantum in admin context')
+gflags.DEFINE_string(
+    'quantum_auth_strategy',
+    'keystone',
+    'auth strategy for connecting to quantum in admin context')
+
+gflags.DEFINE_string('quantum_controller_addr', None,
+                     'openflow method:address:port to set controller of '
+                     'ovs bridge')
-- 
1.7.10.4


------------------------------------------------------------------------------
Keep yourself connected to Go Parallel: 
TUNE You got it built. Now make it sing. Tune shows you how.
http://goparallel.sourceforge.net
_______________________________________________
Ryu-devel mailing list
Ryu-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to