Allows listening on a socket for OVSDB clients, reacting to their events
and modifying their database.

Co-Authored-By: Chris Hansen <[email protected]>
Co-Authored-By: Ravi Kamachi <[email protected]>

Signed-off-by: Jason Kölker <[email protected]>
Signed-off-by: Chris Hansen <[email protected]>
Signed-off-by: Ravi Kamachi <[email protected]>
---
 doc/source/library.rst                   |   1 +
 doc/source/library_ovsdb_manager.rst     |  61 ++++++
 ryu/services/protocols/ovsdb/__init__.py |  14 ++
 ryu/services/protocols/ovsdb/api.py      | 137 +++++++++++++
 ryu/services/protocols/ovsdb/client.py   | 336 +++++++++++++++++++++++++++++++
 ryu/services/protocols/ovsdb/event.py    | 180 +++++++++++++++++
 ryu/services/protocols/ovsdb/manager.py  | 149 ++++++++++++++
 ryu/services/protocols/ovsdb/model.py    |  44 ++++
 8 files changed, 922 insertions(+)
 create mode 100644 doc/source/library_ovsdb_manager.rst
 create mode 100644 ryu/services/protocols/ovsdb/__init__.py
 create mode 100644 ryu/services/protocols/ovsdb/api.py
 create mode 100644 ryu/services/protocols/ovsdb/client.py
 create mode 100644 ryu/services/protocols/ovsdb/event.py
 create mode 100644 ryu/services/protocols/ovsdb/manager.py
 create mode 100644 ryu/services/protocols/ovsdb/model.py

diff --git a/doc/source/library.rst b/doc/source/library.rst
index 38cc387..bc8ff67 100644
--- a/doc/source/library.rst
+++ b/doc/source/library.rst
@@ -12,3 +12,4 @@ Ryu provides some useful library for your network 
applications.
    library_of_config.rst
    library_bgp_speaker.rst
    library_bgp_speaker_ref.rst
+   library_ovsdb_manager.rst
diff --git a/doc/source/library_ovsdb_manager.rst 
b/doc/source/library_ovsdb_manager.rst
new file mode 100644
index 0000000..b23ae81
--- /dev/null
+++ b/doc/source/library_ovsdb_manager.rst
@@ -0,0 +1,61 @@
+*********************
+OVSDB Manager library
+*********************
+
+Introduction
+============
+
+Ryu OVSDB Manager library allows your code to interact with devices
+speaking the OVSDB protocol. This enables your code to perform remote
+management of the devices and react to topology changes on them.
+
+Example
+=======
+
+The following logs all new OVSDB connections and allows creating a port
+on a bridge.
+
+.. code-block:: python
+
+    import uuid
+
+    from ryu.base import app_manager
+    from ryu.services.protocols.ovsdb import api as ovsdb
+    from ryu.services.protocols.ovsdb import event as ovsdb_event
+
+
+    class MyApp(app_manager.RyuApp):
+        @set_ev_cls(ovsdb_event.EventNewOVSDBConnection)
+        def handle_new_ovsdb_connection(self, ev):
+            system_id = ev.system_id
+            self.logger.info('New OVSDB connection from system id %s',
+                             systemd_id)
+
+        def create_port(self, systemd_id, bridge_name, name):
+            new_iface_uuid = uuid.uuid4()
+            new_port_uuid = uuid.uuid4()
+
+            def _create_port(tables, insert):
+                bridge = ovsdb.row_by_name(self, system_id, bridge_name)
+
+                iface = insert(tables['Interface'], new_iface_uuid)
+                iface.name = name
+                iface.type = 'internal'
+
+                port = insert(tables['Port'], new_port_uuid)
+                port.name = name
+                port.interfaces = [iface]
+
+                brdige.ports = bridfe.ports + [port]
+
+                return (new_port_uuid, new_iface_uuid)
+
+            req = ovsdb_event.EventModifyRequest(system_id, _create_port)
+            rep = self.send_request(req)
+
+            if rep.status != 'success':
+                self.logger.error('Error creating port %s on bridge %s: %s',
+                                  name, bridge, rep.status)
+                return None
+
+            return reply.insert_uuid[new_port_uuid]
diff --git a/ryu/services/protocols/ovsdb/__init__.py 
b/ryu/services/protocols/ovsdb/__init__.py
new file mode 100644
index 0000000..fb3d454
--- /dev/null
+++ b/ryu/services/protocols/ovsdb/__init__.py
@@ -0,0 +1,14 @@
+# Copyright (c) 2014 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/ryu/services/protocols/ovsdb/api.py 
b/ryu/services/protocols/ovsdb/api.py
new file mode 100644
index 0000000..ea73cbf
--- /dev/null
+++ b/ryu/services/protocols/ovsdb/api.py
@@ -0,0 +1,137 @@
+# Copyright (c) 2014 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ryu.lib import dpid as dpidlib
+from ryu.services.protocols.ovsdb import event as ovsdb_event
+
+
+def match_row(manager, system_id, table, fn):
+    def _match_row(tables):
+        return next((r for r in tables[table].rows.values()
+                     if fn(r)), None)
+
+    request_to_get_tables = ovsdb_event.EventReadRequest(system_id,
+                                                         _match_row)
+    reply_to_get_tables = manager.send_request(request_to_get_tables)
+    return reply_to_get_tables.result
+
+
+def match_rows(manager, system_id, table, fn):
+    def _match_rows(tables):
+        return (r for r in tables[table].rows.values() if fn(r))
+
+    request = ovsdb_event.EventReadRequest(system_id, _match_rows)
+    reply = manager.send_request(request)
+    return reply.result
+
+
+def row_by_name(manager, system_id, name, table='Bridge', fn=None):
+    matched_row = match_row(manager, system_id, table,
+                            lambda row: row.name == name)
+
+    if fn is not None:
+        return fn(matched_row)
+
+    return matched_row
+
+
+def get_column_value(manager, table, record, column):
+    """
+    Example : To get datapath_id from Bridge table
+    get_column_value('Bridge', <bridge name>, 'datapath_id').strip('"')
+    """
+    row = row_by_name(manager, record, table)
+    value = getattr(row, column, "")
+
+    if isinstance(value, list) and len(value) == 1:
+        value = value[0]
+
+    return str(value)
+
+
+def get_iface_by_name(manager, system_id, name, fn=None):
+    iface = row_by_name(manager, system_id, name, 'Interface')
+
+    if fn is not None:
+        return fn(iface)
+
+    return iface
+
+
+def get_bridge_for_iface_name(manager, system_id, iface_name, fn=None):
+    iface = row_by_name(manager, system_id, iface_name, 'Interface')
+    port = match_row(manager, system_id, 'Port',
+                     lambda x: iface in x.interfaces)
+    bridge = match_row(manager, system_id, 'Bridge',
+                       lambda x: port in x.ports)
+
+    if fn is not None:
+        return fn(bridge)
+
+    return bridge
+
+
+def get_table(manager, system_id, name):
+    def _get_table(tables):
+        return tables[name]
+
+    request_to_get_tables = ovsdb_event.EventReadRequest(system_id,
+                                                         _get_table)
+    reply_to_get_tables = manager.send_request(request_to_get_tables)
+    return reply_to_get_tables.result
+
+
+def get_bridge_by_datapath_id(manager, system_id, datapath_id, fn=None):
+    def _match_fn(row):
+        row_dpid = dpidlib.str_to_dpid(str(row.datapath_id[0]))
+        return row_dpid == datapath_id
+
+    bridge = match_row(manager, system_id, 'Bridge', _match_fn)
+
+    if fn is not None:
+        return fn(bridge)
+
+    return bridge
+
+
+def get_datapath_ids_for_systemd_id(manager, system_id):
+    def _get_dp_ids(tables):
+        dp_ids = []
+
+        bridges = tables.get('Bridge')
+
+        if not bridges:
+            return dp_ids
+
+        for bridge in bridges.rows.values():
+            datapath_ids = bridge.datapath_id
+            dp_ids.extend(dpidlib.str_to_dpid(dp_id) for dp_id in datapath_ids)
+
+        return dp_ids
+
+    request = ovsdb_event.EventReadRequest(system_id, _get_dp_ids)
+    reply = manager.send_request(request)
+    return reply.result
+
+
+def get_bridges_by_system_id(manager, system_id):
+    return get_table(manager, system_id, 'Bridge').rows.values()
+
+
+def bridge_exists(manager, system_id, bridge_name):
+    return bool(row_by_name(manager, system_id, bridge_name))
+
+
+def port_exists(manager, system_id, port_name):
+    return bool(row_by_name(manager, system_id, port_name, 'Port'))
diff --git a/ryu/services/protocols/ovsdb/client.py 
b/ryu/services/protocols/ovsdb/client.py
new file mode 100644
index 0000000..175936c
--- /dev/null
+++ b/ryu/services/protocols/ovsdb/client.py
@@ -0,0 +1,336 @@
+# Copyright (c) 2014 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import logging
+import uuid
+
+# NOTE(jkoelker) Patch Vlog so that is uses standard logging
+from ovs import vlog
+
+
+class Vlog(vlog.Vlog):
+    def __init__(self, name):
+        self.log = logging.getLogger('ovs.%s' % name)
+
+    def __log(self, level, message, **kwargs):
+        level = vlog.LEVELS.get(level, logging.DEBUG)
+        self.log.log(level, message, **kwargs)
+
+vlog.Vlog = Vlog
+
+
+from ovs import jsonrpc
+from ovs import reconnect
+from ovs import stream
+from ovs import timeval
+from ovs.db import idl
+
+from ryu.base import app_manager
+from ryu.lib import hub
+from ryu.services.protocols.ovsdb import event
+from ryu.services.protocols.ovsdb import model
+
+
+now = timeval.msec
+
+
+def _uuid_to_row(atom, base):
+    if base.ref_table:
+        value = base.ref_table.rows.get(atom)
+    else:
+        value = atom
+
+    if isinstance(value, idl.Row):
+        value = str(value.uuid)
+
+    return value
+
+
+def dictify(row):
+    if row is None:
+        return
+
+    return dict([(k, v.to_python(_uuid_to_row))
+                 for k, v in row._data.items()])
+
+
+def discover_schemas(connection):
+    # NOTE(jkoelker) currently only the Open_vSwitch schema
+    #                is supported.
+    # TODO(jkoelker) support arbitrary schemas
+    req = jsonrpc.Message.create_request('list_dbs', [])
+    error, reply = connection.transact_block(req)
+
+    if error or reply.error:
+        return
+
+    schemas = []
+    for db in reply.result:
+        if db != 'Open_vSwitch':
+            continue
+
+        req = jsonrpc.Message.create_request('get_schema', [db])
+        error, reply = connection.transact_block(req)
+
+        if error or reply.error:
+            # TODO(jkoelker) Error handling
+            continue
+
+        schemas.append(reply.result)
+
+    return schemas
+
+
+def discover_system_id(idl):
+    system_id = None
+
+    while system_id is None:
+        idl.run()
+        openvswitch = idl.tables['Open_vSwitch'].rows
+
+        if openvswitch:
+            row = openvswitch.get(list(openvswitch.keys())[0])
+            system_id = row.external_ids.get('system-id')
+
+    return system_id
+
+
+# NOTE(jkoelker) Wrap ovs's Idl to accept an existing session, and
+#                trigger callbacks on changes
+class Idl(idl.Idl):
+    def __init__(self, session, schema):
+        if not isinstance(schema, idl.SchemaHelper):
+            schema = idl.SchemaHelper(schema_json=schema)
+            schema.register_all()
+
+        schema = schema.get_idl_schema()
+
+        # NOTE(jkoelker) event buffer
+        self._events = []
+
+        self.tables = schema.tables
+        self._db = schema
+        self._session = session
+        self._monitor_request_id = None
+        self._last_seqno = None
+        self.change_seqno = 0
+
+        # Database locking.
+        self.lock_name = None          # Name of lock we need, None if none.
+        self.has_lock = False          # Has db server said we have the lock?
+        self.is_lock_contended = False  # Has db server said we can't get lock?
+        self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
+
+        # Transaction support.
+        self.txn = None
+        self._outstanding_txns = {}
+
+        for table in schema.tables.values():
+            for column in table.columns.values():
+                if not hasattr(column, 'alert'):
+                    column.alert = True
+            table.need_table = False
+            table.rows = {}
+            table.idl = self
+
+    @property
+    def events(self):
+        events = self._events
+        self._events = []
+        return events
+
+    def __process_update(self, table, uuid, old, new):
+        old_row = table.rows.get(uuid)
+        if old_row is not None:
+            old_row = model.Row(dictify(old_row))
+            old_row['_uuid'] = uuid
+
+        changed = idl.Idl.__process_update(self, table, uuid, old, new)
+
+        if changed:
+            if not new:
+                ev = (event.EventRowDelete, (table.name, old_row))
+
+            elif not old:
+                new_row = model.Row(dictify(table.rows.get(uuid)))
+                new_row['_uuid'] = uuid
+                ev = (event.EventRowInsert, (table.name, new_row))
+
+            else:
+                new_row = model.Row(dictify(table.rows.get(uuid)))
+                new_row['_uuid'] = uuid
+
+                ev = (event.EventRowUpdate, (table.name, old_row, new_row))
+
+            self._events.append(ev)
+
+        return changed
+
+
+class RemoteOvsdb(app_manager.RyuApp):
+    _EVENTS = [event.EventRowUpdate,
+               event.EventRowDelete,
+               event.EventRowInsert,
+               event.EventInterfaceDeleted,
+               event.EventInterfaceInserted,
+               event.EventInterfaceUpdated,
+               event.EventPortDeleted,
+               event.EventPortInserted,
+               event.EventPortUpdated]
+
+    @classmethod
+    def factory(cls, sock, address, *args, **kwargs):
+        ovs_stream = stream.Stream(sock, None, None)
+        connection = jsonrpc.Connection(ovs_stream)
+        schemas = discover_schemas(connection)
+
+        if not schemas:
+            return
+
+        fsm = reconnect.Reconnect(now())
+        fsm.set_name('%s:%s' % address)
+        fsm.enable(now())
+        fsm.set_passive(True, now())
+        fsm.set_max_tries(-1)
+        fsm.connected(now())
+
+        session = jsonrpc.Session(fsm, connection)
+        idl = Idl(session, schemas[0])
+
+        system_id = discover_system_id(idl)
+        name = cls.instance_name(system_id)
+        ovs_stream.name = name
+        connection.name = name
+        fsm.set_name(name)
+
+        kwargs = kwargs.copy()
+        kwargs['address'] = address
+        kwargs['idl'] = idl
+        kwargs['name'] = name
+        kwargs['system_id'] = system_id
+
+        app_mgr = app_manager.AppManager.get_instance()
+        return app_mgr.instantiate(cls, *args, **kwargs)
+
+    @classmethod
+    def instance_name(cls, system_id):
+        return '%s-%s' % (cls.__name__, system_id)
+
+    def __init__(self, *args, **kwargs):
+        super(RemoteOvsdb, self).__init__(*args, **kwargs)
+        self.address = kwargs['address']
+        self._idl = kwargs['idl']
+        self.system_id = kwargs['system_id']
+        self.name = kwargs['name']
+        self._txn_q = collections.deque()
+
+    def _event_proxy_loop(self):
+        while self.is_active:
+            events = self._idl.events
+
+            if not events:
+                hub.sleep(0.1)
+                continue
+
+            for event in events:
+                ev = event[0]
+                args = event[1]
+                self._submit_event(ev(self.system_id, *args))
+
+            hub.sleep(0)
+
+    def _submit_event(self, ev):
+        self.send_event_to_observers(ev)
+        try:
+            ev_cls_name = 'Event' + ev.table + ev.event_type
+            proxy_ev_cls = getattr(event, ev_cls_name, None)
+            if proxy_ev_cls:
+                self.send_event_to_observers(proxy_ev_cls(ev))
+        except Exception:
+            self.logger.exception('Error submitting specific event for OVSDB',
+                                  self.system_id)
+
+    def _idl_loop(self):
+        while self.is_active:
+            try:
+                self._idl.run()
+                self._transactions()
+            except Exception:
+                self.logger.exception('Error running IDL for system_id %s' %
+                                      self.system_id)
+                break
+
+            hub.sleep(0)
+
+    def _run_thread(self, func, *args, **kwargs):
+        try:
+            func(*args, **kwargs)
+
+        finally:
+            self.stop()
+
+    def _transactions(self):
+        if not self._txn_q:
+            return
+
+        # NOTE(jkoelker) possibly run multiple transactions per loop?
+        self._transaction()
+
+    def _transaction(self):
+        req = self._txn_q.popleft()
+        txn = idl.Transaction(self._idl)
+
+        uuids = req.func(self._idl.tables, txn.insert)
+        status = txn.commit_block()
+
+        insert_uuids = {}
+        err_msg = None
+
+        if status in (idl.Transaction.SUCCESS,
+                      idl.Transaction.UNCHANGED):
+            if uuids:
+                if isinstance(uuids, uuid.UUID):
+                    insert_uuids[uuids] = txn.get_insert_uuid(uuids)
+
+                else:
+                    insert_uuids = dict((uuid, txn.get_insert_uuid(uuid))
+                                        for uuid in uuids)
+        else:
+            err_msg = txn.get_error()
+
+        rep = event.EventModifyReply(self.system_id, status, insert_uuids,
+                                     err_msg)
+        self.reply_to_request(req, rep)
+
+    def modify_request_handler(self, ev):
+        self._txn_q.append(ev)
+
+    def read_request_handler(self, ev):
+        result = ev.func(self._idl.tables)
+        rep = event.EventReadReply(self.system_id, result)
+        self.reply_to_request(ev, rep)
+
+    def start(self):
+        super(RemoteOvsdb, self).start()
+        t = hub.spawn(self._run_thread, self._idl_loop)
+        self.threads.append(t)
+
+        t = hub.spawn(self._run_thread, self._event_proxy_loop)
+        self.threads.append(t)
+
+    def stop(self):
+        super(RemoteOvsdb, self).stop()
+        self._idl.close()
diff --git a/ryu/services/protocols/ovsdb/event.py 
b/ryu/services/protocols/ovsdb/event.py
new file mode 100644
index 0000000..2353a4f
--- /dev/null
+++ b/ryu/services/protocols/ovsdb/event.py
@@ -0,0 +1,180 @@
+# Copyright (c) 2014 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ryu.controller import event as ryu_event
+from ryu.controller import handler
+
+
+class EventRowBase(ryu_event.EventBase):
+    def __init__(self, system_id, table, row, event_type):
+        super(EventRowBase, self).__init__()
+        self.system_id = system_id
+        self.table = table
+        self.row = row
+        self.event_type = event_type
+
+    def __str__(self):
+        return '%s<system_id=%s table=%s, uuid=%s>' % (self.__class__.__name__,
+                                                       self.system_id,
+                                                       self.table,
+                                                       self.row['_uuid'])
+
+
+class EventRowDelete(EventRowBase):
+    def __init__(self, system_id, table, row):
+        super(EventRowDelete, self).__init__(system_id, table, row, 'Deleted')
+
+
+class EventRowInsert(EventRowBase):
+    def __init__(self, system_id, table, row):
+        super(EventRowInsert, self).__init__(system_id, table, row, 'Inserted')
+
+
+class EventRowUpdate(ryu_event.EventBase):
+    def __init__(self, system_id, table, old, new):
+        super(EventRowUpdate, self).__init__()
+        self.system_id = system_id
+        self.table = table
+        self.old = old
+        self.new = new
+        self.event_type = 'Updated'
+
+    def __str__(self):
+        return '%s<system_id=%s table=%s, uuid=%s>' % (self.__class__.__name__,
+                                                       self.system_id,
+                                                       self.table,
+                                                       self.old['_uuid'])
+
+
+class EventModifyRequest(ryu_event.EventRequestBase):
+    """ Dispatch a modify function to OVSDB
+
+    `func` must be a callable that accepts an insert fucntion and the
+    IDL.tables object. It can then modify the tables as needed. For inserts,
+    specify a UUID for each insert, and return a tuple of the temporary
+    UUID's. The execution of `func` will be wrapped in a single transaction
+    and the reply will include a dict of temporary UUID to real UUID mappings.
+
+    e.g.
+
+        new_port_uuid = uuid.uuid4()
+
+        def modify(tables, insert):
+            bridges = tables['Bridge'].rows
+            bridge = None
+            for b in bridges:
+                if b.name == 'my-bridge':
+                    bridge = b
+
+            if not bridge:
+                return
+
+            port = insert('Port', new_port_uuid)
+
+            bridge.ports = bridge.ports + [port]
+
+            return (new_port_uuid, )
+
+        request = EventModifyRequest(system_id, modify)
+        reply = send_request(request)
+
+        port_uuid = reply.insert_uuids[new_port_uuid]
+    """
+    def __init__(self, system_id, func):
+        super(EventModifyRequest, self).__init__()
+        self.dst = 'OVSDB'
+        self.system_id = system_id
+        self.func = func
+
+
+class EventModifyReply(ryu_event.EventReplyBase):
+    def __init__(self, system_id, status, insert_uuids, err_msg):
+        self.system_id = system_id
+        self.status = status
+        self.insert_uuids = insert_uuids
+        self.err_msg = err_msg
+
+
+class EventNewOVSDBConnection(ryu_event.EventBase):
+    def __init__(self, system_id):
+        super(EventNewOVSDBConnection, self).__init__()
+        self.system_id = system_id
+
+    def __str__(self):
+        return '%s<system_id=%s>' % (self.__class__.__name__,
+                                     self.system_id)
+
+
+class EventReadRequest(ryu_event.EventRequestBase):
+    def __init__(self, system_id, func):
+        self.system_id = system_id
+        self.func = func
+        self.dst = 'OVSDB'
+
+
+class EventReadReply(ryu_event.EventReplyBase):
+    def __init__(self, system_id, result, err_msg=''):
+        self.system_id = system_id
+        self.result = result
+        self.err_msg = err_msg
+
+
+class EventRowInsertedBase(EventRowInsert):
+    def __init__(self, ev):
+        super(EventRowInsertedBase, self).__init__(ev.system_id,
+                                                   ev.table,
+                                                   ev.row)
+
+
+class EventRowDeletedBase(EventRowDelete):
+    def __init__(self, ev):
+        super(EventRowDeletedBase, self).__init__(ev.system_id,
+                                                  ev.table,
+                                                  ev.row)
+
+
+class EventRowUpdatedBase(EventRowUpdate):
+    def __init__(self, ev):
+        super(EventRowUpdatedBase, self).__init__(ev.system_id,
+                                                  ev.table,
+                                                  ev.old,
+                                                  ev.new)
+
+
+class EventPortInserted(EventRowInsertedBase):
+    pass
+
+
+class EventPortDeleted(EventRowDeletedBase):
+    pass
+
+
+class EventPortUpdated(EventRowUpdatedBase):
+    pass
+
+
+class EventInterfaceInserted(EventRowInsertedBase):
+    pass
+
+
+class EventInterfaceDeleted(EventRowDeletedBase):
+    pass
+
+
+class EventInterfaceUpdated(EventRowUpdatedBase):
+    pass
+
+
+handler.register_service('ryu.services.protocols.ovsdb.manager')
diff --git a/ryu/services/protocols/ovsdb/manager.py 
b/ryu/services/protocols/ovsdb/manager.py
new file mode 100644
index 0000000..b34fb7d
--- /dev/null
+++ b/ryu/services/protocols/ovsdb/manager.py
@@ -0,0 +1,149 @@
+# Copyright (c) 2014 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import ssl
+import socket
+
+from ryu import cfg
+from ryu.base import app_manager
+from ryu.lib import hub
+from ryu.services.protocols.ovsdb import client
+from ryu.services.protocols.ovsdb import event
+from ryu.controller import handler
+
+
+opts = (cfg.StrOpt('address', default='0.0.0.0', help='OVSDB address'),
+        cfg.IntOpt('port', default=6640, help='OVSDB port'),
+        cfg.StrOpt('mngr-privkey', default=None, help='manager private key'),
+        cfg.StrOpt('mngr-cert', default=None, help='manager certificate'),
+        cfg.ListOpt('whitelist', default=[],
+                    help='Whitelist of address to allow to connect'))
+
+cfg.CONF.register_opts(opts, 'ovsdb')
+
+
+class OVSDB(app_manager.RyuApp):
+    _EVENTS = [event.EventNewOVSDBConnection,
+               event.EventModifyRequest,
+               event.EventReadRequest]
+
+    def __init__(self, *args, **kwargs):
+        super(OVSDB, self).__init__(*args, **kwargs)
+        self._address = self.CONF.ovsdb.address
+        self._port = self.CONF.ovsdb.port
+        self._clients = {}
+
+    def _accept(self, server):
+        if self.CONF.ovsdb.whitelist:
+            def check(address):
+                if address in self.CONF.ovsdb.whitelist:
+                    return True
+
+                self.logger.debug('Connection from non-whitelist client '
+                                  '(%s:%s)' % address)
+                return False
+
+        else:
+            def check(address):
+                return True
+
+        while True:
+            # TODO(jkoelker) SSL Certificate Fingerprint check
+            sock, client_address = server.accept()
+
+            if not check(client_address[0]):
+                sock.shutdown(socket.SHUT_RDWR)
+                sock.close()
+                continue
+
+            self.logger.debug('New connection from %s:%s' % client_address)
+            t = hub.spawn(self._start_remote, sock, client_address)
+            self.threads.append(t)
+
+    def _proxy_event(self, ev):
+        system_id = ev.system_id
+        client_name = client.RemoteOvsdb.instance_name(system_id)
+
+        if client_name not in self._clients:
+            self.logger.info('Unknown remote system_id %s' % system_id)
+            return
+
+        return self.send_event(client_name, ev)
+
+    def _start_remote(self, sock, client_address):
+        app = client.RemoteOvsdb.factory(sock, client_address)
+
+        if app:
+            self._clients[app.name] = app
+            app.start()
+            ev = event.EventNewOVSDBConnection(app.system_id)
+            self.send_event_to_observers(ev)
+
+    def start(self):
+        server = hub.listen((self._address, self._port))
+        key = self.CONF.ovsdb.mngr_privkey or self.CONF.ctl_privkey
+        cert = self.CONF.ovsdb.mngr_cert or self.CONF.ctl_cert
+
+        if key is not None and cert is not None:
+            ssl_kwargs = dict(keyfile=key, certfile=cert, server_side=True)
+
+            if self.CONF.ca_certs is not None:
+                ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
+                ssl_kwargs['ca_certs'] = self.CONF.ca_certs
+
+            server = ssl.wrap_socket(server, **ssl_kwargs)
+
+        self._server = server
+
+        self.logger.info('Listening on %s:%s for clients' % (self._address,
+                                                             self._port))
+        t = hub.spawn(self._accept, self._server)
+        super(OVSDB, self).start()
+        return t
+
+    def stop(self):
+        for client in self._clients.values():
+            client.stop()
+
+        super(OVSDB, self).stop()
+
+    @handler.set_ev_cls(event.EventModifyRequest)
+    def modify_request_handler(self, ev):
+
+        system_id = ev.system_id
+        client_name = client.RemoteOvsdb.instance_name(system_id)
+        remote = self._clients.get(client_name)
+
+        if not remote:
+            msg = 'Unknown remote system_id %s' % system_id
+            self.logger.info(msg)
+            rep = event.EventModifyReply(system_id, None, None, msg)
+            return self.reply_to_request(ev, rep)
+
+        return remote.modify_request_handler(ev)
+
+    @handler.set_ev_cls(event.EventReadRequest)
+    def read_request_handler(self, ev):
+        system_id = ev.system_id
+        client_name = client.RemoteOvsdb.instance_name(system_id)
+        remote = self._clients.get(client_name)
+
+        if not remote:
+            msg = 'Unknown remote system_id %s' % system_id
+            self.logger.info(msg)
+            rep = event.EventReadReply(self.system_id, None, msg)
+            return self.reply_to_request(ev, rep)
+
+        return remote.read_request_handler(ev)
diff --git a/ryu/services/protocols/ovsdb/model.py 
b/ryu/services/protocols/ovsdb/model.py
new file mode 100644
index 0000000..992c785
--- /dev/null
+++ b/ryu/services/protocols/ovsdb/model.py
@@ -0,0 +1,44 @@
+# Copyright (c) 2014 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+
+class _UUIDDict(dict):
+    def _uuidize(self):
+        if '_uuid' not in self or self['_uuid'] is None:
+            self['_uuid'] = uuid.uuid4()
+
+    @property
+    def uuid(self):
+        self._uuidize()
+        return self['_uuid']
+
+    @uuid.setter
+    def uuid(self, value):
+        self['_uuid'] = value
+
+
+class Row(_UUIDDict):
+    @property
+    def delete(self):
+        if '_delete' in self and self['_delete']:
+            return True
+
+        return False
+
+    @delete.setter
+    def delete(self, value):
+        self['_delete'] = value
-- 
2.4.3


------------------------------------------------------------------------------
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to