From: Yoshihiro Kaneko <[email protected]> Signed-off-by: Yoshihiro Kaneko <[email protected]> Signed-off-by: Isaku Yamahata <[email protected]> --- ryu/app/quantum_adapter.py | 552 ++++++++++++++++++++++++++++++++++++++++++++ ryu/flags.py | 28 +++ 2 files changed, 580 insertions(+) create mode 100644 ryu/app/quantum_adapter.py
diff --git a/ryu/app/quantum_adapter.py b/ryu/app/quantum_adapter.py new file mode 100644 index 0000000..57750ab --- /dev/null +++ b/ryu/app/quantum_adapter.py @@ -0,0 +1,552 @@ +# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# Copyright (C) 2012 Isaku Yamahata <yamahata at private email ne jp> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gflags +import logging +import socket +import ssl +import uuid + +from gevent import monkey +import gevent +monkey.patch_all() + +from sqlalchemy.exc import NoSuchTableError, OperationalError +from sqlalchemy.ext.sqlsoup import SqlSoup +from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import scoped_session +from sqlalchemy.orm.exc import NoResultFound + +from ovs import json +from ovs.jsonrpc import Message + +from quantumclient import client as q_client +from quantumclient.common import exceptions as q_exc +from quantumclient.v2_0 import client as q_clientv2 + +from ryu.app import conf_switch_key as cs_key +from ryu.app import rest_nw_id +from ryu.base import app_manager +from ryu.controller import conf_switch, handler, network +from ryu.lib import dpid as dpid_lib +from ryu.lib import synchronized + + +LOG = logging.getLogger('quantum_adapter') +FLAGS = gflags.FLAGS + +def _get_auth_token(): + httpclient = q_client.HTTPClient( + username=FLAGS.quantum_admin_username, + tenant_name=FLAGS.quantum_admin_tenant_name, + password=FLAGS.quantum_admin_password, + auth_url=FLAGS.quantum_admin_auth_url, + timeout=FLAGS.quantum_url_timeout, + auth_strategy=FLAGS.quantum_auth_strategy) + try: + httpclient.authenticate() + except (q_exc.Unauthorized, q_exc.Forbidden, q_exc.EndpointNotFound) as e: + LOG.error("authentication failure: %s", e) + return None + # LOG.debug("_get_auth_token: token=%s", httpclient.auth_token) + return httpclient.auth_token + + +def _get_quantum_client(token): + if token: + my_client = q_clientv2.Client( + endpoint_url=FLAGS.quantum_url, + token=token, timeout=FLAGS.quantum_url_timeout) + else: + my_client = q_clientv2.Client( + endpoint_url=FLAGS.quantum_url, + auth_strategy=None, timeout=FLAGS.quantum_url_timeout) + return my_client + + +PORT_ERROR = -1 +PORT_UNKNOWN = 0 +PORT_GATEWAY = 1 +PORT_VETH_GATEWAY = 2 +PORT_GUEST = 3 +PORT_TUNNEL = 4 + + +class OVSPort(object): + # extra-ids: 'attached-mac', 'iface-id', 'iface-status', 'vm-uuid' + + def __init__(self, row, port): + super(OVSPort, self).__init__() + self.row = row + self.name = None + self.ofport = None + self.type = None + self.ext_ids = {} + self.options = {} + self.update(port) + + def update(self, port): + self.__dict__.update((key, port[key]) for key + in ['name', 'ofport', 'type'] + if key in port) + if 'external_ids' in port: + self.ext_ids = dict(port['external_ids'][1]) + if 'options' in port: + self.options = dict(port['options'][1]) + + def get_port_type(self): + if not isinstance(self.ofport, int): + return PORT_ERROR + if self.type == 'internal' and 'iface-id' in self.ext_ids: + return PORT_GATEWAY + if self.type == '' and 'iface-id' in self.ext_ids: + return PORT_VETH_GATEWAY + if (self.type == 'gre' and 'local_ip' in self.options and + 'remote_ip' in self.options): + return PORT_TUNNEL + if self.type == '' and 'vm-uuid' in self.ext_ids: + return PORT_GUEST + return PORT_UNKNOWN + + def __str__(self): + return "name=%s type=%s ofport=%s ext_ids=%s options=%s" % ( + self.name, self.type, self.ofport, self.ext_ids, self.options) + + +S_DPID_GET = 0 # start datapath-id monitoring +S_CTRL_SET = 1 # start set controller +S_PORT_GET = 2 # start port monitoring +S_MONITOR = 3 # datapath-id/port monitoring + + +class OVSMonitor(object): + def __init__(self, dpid, nw, db, q_api, ctrl_addr): + super(OVSMonitor, self).__init__() + self.dpid = dpid + self.network_api = nw + self.db = db + self.q_api = q_api + self.ctrl_addr = ctrl_addr + + self.address = None + self.tunnel_ip = None + self.int_bridge = None + self.socket = None + self.state = None + self.parser = None + self.dpid_row = None + self.is_active = False + + self.handlers = {} + self.handlers[S_DPID_GET] = {Message.T_REPLY: self.receive_dpid} + self.handlers[S_CTRL_SET] = {Message.T_REPLY: + self.receive_set_controller} + self.handlers[S_PORT_GET] = {Message.T_REPLY: self.receive_port} + self.handlers[S_MONITOR] = {Message.T_NOTIFY: { + 'port_monitor': self.monitor_port + }} + + def update_external_port(self, port, delete=False): + # TODO:XXX + # check if the given port is in self.dpid + # Once this is done, ryu_quantum_agent.VifPortSet can be eliminated + return + + if delete: + self.network_api.delete_port(rest_nw_id.NW_ID_EXTERNAL, + self.dpid, port.ofport) + else: + self.network_api.update_port(rest_nw_id.NW_ID_EXTERNAL, + self.dpid, port.ofport) + + def update_vif_port(self, port, delete=False): + # LOG.debug("update_vif_port: %s", port) + try: + port_info = self.db.ports.filter( + self.db.ports.id == port.ext_ids['iface-id']).one() + except NoResultFound: + LOG.warn("port not found: %s", port.ext_ids['iface-id']) + self.db.commit() + return + except (NoSuchTableError, OperationalError): + LOG.error("could not access database") + self.db.rollback() + # TODO: If OperationalError occurred, it should re-connect to + # the database (re-create SplSoup object) + return + self.db.commit() + + # TODO:XXX + # this port can be in other bridge, not self.dpid. + # If so, ignore it. + # For now, this is an easy workaround. + if port_info.device_owner == 'network:router_gateway': + return + + port_data = { + # TODO:XXX check if this port is in dpid + 'datapath_id': dpid_lib.dpid_to_str(self.dpid), + + 'port_no': port.ofport, + } + if delete: + # In order to set + # port.status = quantum.common.constants.PORT_STATUS_DOWN + # port.status can't be changed via rest api directly, so resort to + # ryu-specical parameter to tell it. + port_data['deleted'] = True + body = {'port': port_data} + # LOG.debug("port-body = %s", body) + try: + self.q_api.update_port(port_info.id, body) + except (q_exc.ConnectionFailed, q_exc.QuantumClientException) as e: + LOG.error("quantum update port failed: %s", e) + # TODO: When authentication failure occurred, it should get auth + # token again + + def update_port(self, data): + for row in data: + table = data[row] + new_port = None + old_port = None + if "new" in table: + new_port = OVSPort(row, table['new']) + if "old" in table: + old_port = OVSPort(row, table['old']) + + if old_port == new_port: + continue + if not new_port: + port_type = old_port.get_port_type() + if port_type == PORT_ERROR: + continue + elif port_type == PORT_UNKNOWN: + # LOG.info("delete external port: %s", old_port) + self.update_external_port(old_port, delete=True) + else: + # LOG.info("delete port: %s", old_port) + if port_type != PORT_TUNNEL: + self.update_vif_port(old_port, delete=True) + continue + if new_port.ofport == -1: + continue + if not old_port or old_port.ofport == -1: + port_type = new_port.get_port_type() + if port_type == PORT_ERROR: + continue + elif port_type == PORT_UNKNOWN: + # LOG.info("create external port: %s", new_port) + self.update_external_port(new_port) + else: + # LOG.info("create port: %s", new_port) + if port_type != PORT_TUNNEL: + self.update_vif_port(new_port) + continue + if new_port.get_port_type() in (PORT_GUEST, + PORT_GATEWAY, PORT_VETH_GATEWAY): + # LOG.info("update port: %s", new_port) + self.update_vif_port(new_port) + + def update_dpid(self, data): + for row in data: + table = data[row] + if "new" in table: + table_new = table['new'] + int_bridge = table_new['name'] + dpid = dpid_lib.str_to_dpid(table_new['datapath_id']) + if dpid == self.dpid: + # LOG.debug("datapath_id=%s name=%s", dpid, int_bridge) + self.dpid_row = row + self.int_bridge = int_bridge + self.start_set_controller(table_new) + break + + def monitor_port(self, msg): + _key, args = msg.params + if not "Interface" in args: + return + data = args['Interface'] + self.update_port(data) + + def receive_port(self, msg): + if not "Interface" in msg.result: + return + data = msg.result['Interface'] + self.update_port(data) + self.state = S_MONITOR + + def start_port_monitor(self): + self.state = S_PORT_GET + params = json.from_string( + '["Open_vSwitch", ' + ' "port_monitor", ' + ' {"Interface": ' + ' [{"columns": ' + ' ["name", "ofport", "type", "external_ids", "options"]}]}]') + self.send_request("monitor", params) + + def receive_set_controller(self, msg): + LOG.debug("set controller: %s", msg) + for row in msg.result: + if "error" in row: + err = str(row["error"]) + if "details" in row: + err += ": " + str(row["details"]) + LOG.error("could not set controller: %s", err) + self.is_active = False + return + self.start_port_monitor() + + def start_set_controller(self, _table_row): + self.state = S_CTRL_SET + if not self.ctrl_addr: + self.start_port_monitor() + return + + # TODO:XXX + # check duplication and don't delete other controller. + uuid_ = str(uuid.uuid4()).replace('-', '_') + params = json.from_string( + '["Open_vSwitch", ' + ' {"op": "insert", ' + ' "table": "Controller", ' + ' "row": {"target": "%s"}, ' + ' "uuid-name": "row%s"}, ' + ' {"op": "update", ' + ' "table": "Bridge", ' + ' "row": {"controller": ["named-uuid", "row%s"]}, ' + ' "where": [["_uuid", "==", ["uuid", "%s"]]]}]' % + (str(self.ctrl_addr), uuid_, uuid_, str(self.dpid_row))) + self.send_request("transact", params) + + def receive_dpid(self, msg): + LOG.debug('recieve_dpid_monitor %s', msg) + if not "Bridge" in msg.result: + return + data = msg.result['Bridge'] + self.update_dpid(data) # update_dpid() calls start_set_controller() + + def start_dpid_monitor(self): + self.state = S_DPID_GET + params = json.from_string( + '["Open_vSwitch", ' + ' "dpid_monitor", ' + ' {"Bridge": {"columns": ["datapath_id", "name"]}}]') + self.send_request("monitor", params) + + def handle_rpc(self, msg): + _handler = None + try: + _handler = self.handlers[self.state][msg.type] + except KeyError: + pass + + if msg.type == Message.T_REQUEST: + if msg.method == "echo": + reply = Message.create_reply(msg.params, msg.id) + self.send(reply) + elif _handler: + _handler(msg) + else: + reply = Message.create_error({"error": "unknown method"}, + msg.id) + self.send(reply) + LOG.warn("unknown request: %s", msg) + elif msg.type == Message.T_REPLY: + if _handler: + _handler(msg) + else: + LOG.warn("unknown reply: %s", msg) + elif msg.type == Message.T_NOTIFY: + if msg.method == "shutdown": + self.shutdown() + elif _handler: + if msg.method == "update": + key, _args = msg.params + if key in _handler: + _handler[key](msg) + else: + LOG.warn("unknown notification: %s", msg) + else: + LOG.warn("unsolicited JSON-RPC reply or error: %s", msg) + + self.db.commit() + return + + def process_msg(self): + _json = self.parser.finish() + self.parser = None + if isinstance(_json, basestring): + LOG.warn("error parsing stream: %s", _json) + return + msg = Message.from_json(_json) + if not isinstance(msg, Message): + LOG.warn("received bad JSON-RPC message: %s", msg) + return + return msg + + def recv_loop(self): + while self.is_active: + buf = "" + ret = self.socket.recv(4096) + if len(ret) == 0: + self.is_active = False + return + buf += ret + while buf: + if self.parser is None: + self.parser = json.Parser() + buf = buf[self.parser.feed(buf):] + if self.parser.is_done(): + msg = self.process_msg() + if msg: + self.handle_rpc(msg) + + def send(self, msg): + if msg.is_valid(): + LOG.warn("not a valid JSON-RPC request: %s", msg) + return + buf = json.to_string(msg.to_json()) + self.socket.sendall(buf) + + def send_request(self, method, params): + msg = Message.create_request(method, params) + self.send(msg) + + def close(self): + if self.socket: + self.socket.close() + + def set_ovsdb_addr(self, address): + _proto, _host, _port = address.split(':') + self.address = address + + def shutdown(self): + LOG.info("shutdown: %s: dpid=%s", self.address, self.dpid) + self.is_active = False + self.close() # to exit recv_loop() + + def serve(self): + if not self.address: + return + self.network_api.update_network(rest_nw_id.NW_ID_EXTERNAL) + self.network_api.update_network(rest_nw_id.NW_ID_VPORT_GRE) + + proto, host, port = self.address.split(':') + if proto not in ['tcp', 'ssl']: + proto = 'tcp' + self.close() + socket_ = gevent.socket.socket() + if proto == 'ssl': + socket_ = gevent.ssl.wrap_socket(self.socket) + try: + socket_.connect((host, int(port))) + except (socket.error, socket.timeout) as e: + LOG.error("TCP connection failure: %s", e) + raise e + except ssl.SSLError as e: + LOG.error("SSL connection failure: %s", e) + raise e + LOG.info("connect: %s", self.address) + if not self.is_active: + socket_.close() + return + self.socket = socket_ + + self.start_dpid_monitor() + self.recv_loop() + self.close() + + def create_serve_thread(self): + self.is_active = True + return gevent.spawn_later(0, self.serve) + + @staticmethod + def create(dpid, nw): + db = SqlSoup(FLAGS.sql_connection, + session=scoped_session( + sessionmaker(autoflush=True, + expire_on_commit=False, + autocommit=False))) + token = None + if FLAGS.quantum_auth_strategy: + token = _get_auth_token() + q_api = _get_quantum_client(token) + return OVSMonitor(dpid, nw, db, q_api, FLAGS.quantum_controller_addr) + + +class QuantumAdapter(app_manager.RyuApp): + _CONTEXTS = { + 'conf_switch': conf_switch.ConfSwitchSet, + 'network': network.Network, + } + _LOCK = 'lock' + + def __init__(self, *_args, **kwargs): + super(QuantumAdapter, self).__init__() + self.cs = kwargs['conf_switch'] + self.nw = kwargs['network'] + + # protects self.monitors + setattr(self, self._LOCK, gevent.coros.Semaphore()) + self.monitors = {} + + # just connect to sql server to detect wrong parameter early. + LOG.debug('sql_connection %s', FLAGS.sql_connection) + db = SqlSoup(FLAGS.sql_connection, + session=scoped_session( + sessionmaker(autoflush=True, + expire_on_commit=False, + autocommit=False))) + db.commit() + + @synchronized.synchronized(_LOCK) + def _conf_switch_set_ovsdb_addr(self, dpid, value): + if dpid in self.monitors: + mon = self.monitors[dpid] + mon.shutdown() + mon = OVSMonitor.create(dpid, self.nw) + mon.set_ovsdb_addr(value) + mon_thr = mon.create_serve_thread() + self.monitors[dpid] = (mon, mon_thr) + + @synchronized.synchronized(_LOCK) + def _conf_switch_del_ovsdb_addr(self, dpid): + mon_mon_thr = self.monitors.pop(dpid, None) + if mon_mon_thr is None: + LOG.error("no monitor found: %s", dpid) + return + mon, mon_thr = mon_mon_thr + mon.shutdown() + mon_thr.join() + + @handler.set_ev_cls(conf_switch.EventConfSwitchSet, + conf_switch.CONF_SWITCH_EV_DISPATCHER) + def conf_switch_set_handler(self, ev): + LOG.debug("conf_switch set: %s", ev) + if ev.key == cs_key.OVSDB_ADDR: + self._conf_switch_set_ovsdb_addr(ev.dpid, ev.value) + else: + LOG.debug("unknown event: %s", ev) + + @handler.set_ev_cls(conf_switch.EventConfSwitchDel, + conf_switch.CONF_SWITCH_EV_DISPATCHER) + def conf_switch_del_handler(self, ev): + LOG.debug("conf_switch del: %s", ev) + if ev.key == cs_key.OVSDB_ADDR: + self._conf_switch_del_ovsdb_addr(ev.dpid) + else: + LOG.debug("unknown event: %s", ev) diff --git a/ryu/flags.py b/ryu/flags.py index e969620..d7b674d 100644 --- a/ryu/flags.py +++ b/ryu/flags.py @@ -23,3 +23,31 @@ FLAGS = gflags.FLAGS # GLOBAL flags gflags.DEFINE_boolean('monkey_patch', False, 'do monkey patch') + +# app/quantum_adapter +gflags.DEFINE_string( + 'sql_connection', + 'mysql://root:[email protected]/ovs_quantum?charset=utf8', + 'database connection') +gflags.DEFINE_string('int_bridge', 'br-int', 'integration bridge name') + +gflags.DEFINE_string('quantum_url', 'http://localhost:9696', + 'URL for connecting to quantum') +gflags.DEFINE_integer('quantum_url_timeout', 30, + 'timeout value for connecting to quantum in seconds') +gflags.DEFINE_string('quantum_admin_username', 'quantum', + 'username for connecting to quantum in admin context') +gflags.DEFINE_string('quantum_admin_password', 'service_password', + 'password for connecting to quantum in admin context') +gflags.DEFINE_string('quantum_admin_tenant_name', 'service', + 'tenant name for connecting to quantum in admin context') +gflags.DEFINE_string('quantum_admin_auth_url', 'http://localhost:5000/v2.0', + 'auth url for connecting to quantum in admin context') +gflags.DEFINE_string( + 'quantum_auth_strategy', + 'keystone', + 'auth strategy for connecting to quantum in admin context') + +gflags.DEFINE_string('quantum_controller_addr', None, + 'openflow mehod:address:port to set controller of' + 'ovs bridge') -- 1.7.10.4 ------------------------------------------------------------------------------ Everyone hates slow websites. So do we. Make your web apps faster with AppDynamics Download AppDynamics Lite for free today: http://p.sf.net/sfu/appdyn_d2d_nov _______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
