Prevent `client.discover_schemas` from blocking while calling into the
`ovs` library by emulating `jsonrpc.Connection.transact_block` to not
block and explicitly allow greenlet switching every loop.

Works with both the embeded ryu.contrib.ovs and upstream ovs python
packages.

Signed-off-by: Jason Kölker <[email protected]>
---
 ryu/services/protocols/ovsdb/client.py | 48 +++++++++++++++++++++++++++++++---
 1 file changed, 44 insertions(+), 4 deletions(-)

diff --git a/ryu/services/protocols/ovsdb/client.py 
b/ryu/services/protocols/ovsdb/client.py
index 5d4b21f..4a67dfd 100644
--- a/ryu/services/protocols/ovsdb/client.py
+++ b/ryu/services/protocols/ovsdb/client.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import collections
+import errno
 import logging
 import uuid
 
@@ -33,6 +34,7 @@ vlog.Vlog = Vlog
 
 
 from ovs import jsonrpc
+from ovs import poller
 from ovs import reconnect
 from ovs import stream
 from ovs import timeval
@@ -63,8 +65,46 @@ def dictify(row):
     if row is None:
         return
 
-    return dict([(k, v.to_python(_uuid_to_row))
-                 for k, v in row._data.items()])
+    result = {}
+
+    for key, value in row._data.items():
+        result[key] = value.to_python(_uuid_to_row)
+        hub.sleep(0)
+
+    return result
+
+
+def transact_block(request, connection):
+    """Emulate jsonrpc.Connection.transact_block without blocking eventlet.
+    """
+    error = connection.send(request)
+    reply = None
+
+    if error:
+        return error, reply
+
+    ovs_poller = poller.Poller()
+    while not error:
+        ovs_poller.immediate_wake()
+        error, reply = connection.recv()
+
+        if error != errno.EAGAIN:
+            break
+
+        if (reply and
+            reply.id == request.id and
+            reply.type in (jsonrpc.Message.T_REPLY,
+                           jsonrpc.Message.T_ERROR)):
+            break
+
+        connection.run()
+        connection.wait(poller)
+        connection.recv_wait(poller)
+        poller.block()
+
+        hub.sleep(0)
+
+    return error, reply
 
 
 def discover_schemas(connection):
@@ -72,7 +112,7 @@ def discover_schemas(connection):
     #                is supported.
     # TODO(jkoelker) support arbitrary schemas
     req = jsonrpc.Message.create_request('list_dbs', [])
-    error, reply = connection.transact_block(req)
+    error, reply = transact_block(req, connection)
 
     if error or reply.error:
         return
@@ -83,7 +123,7 @@ def discover_schemas(connection):
             continue
 
         req = jsonrpc.Message.create_request('get_schema', [db])
-        error, reply = connection.transact_block(req)
+        error, reply = transact_block(req, connection)
 
         if error or reply.error:
             # TODO(jkoelker) Error handling
-- 
2.7.3


------------------------------------------------------------------------------
Transform Data into Opportunity.
Accelerate data analysis in your applications with
Intel Data Analytics Acceleration Library.
Click to learn more.
http://pubads.g.doubleclick.net/gampad/clk?id=278785231&iu=/4140
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to