Author: phunt
Date: Tue Oct 13 23:14:05 2009
New Revision: 824971

URL: http://svn.apache.org/viewvc?rev=824971&view=rev
Log:
ZOOKEEPER-541. zkpython limited to 256 handles

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/contrib/zkpython/src/c/zookeeper.c
    hadoop/zookeeper/trunk/src/contrib/zkpython/src/test/connection_test.py

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=824971&r1=824970&r2=824971&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Oct 13 23:14:05 2009
@@ -87,6 +87,8 @@
 
   ZOOKEEEPER-510. zkpython lumps all exceptions as IOError, needs specialized
   exceptions for KeeperException types (henry & pat via mahadev)
+
+  ZOOKEEPER-541. zkpython limited to 256 handles (henry robinson via phunt)
  
 IMPROVEMENTS:
   ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to

Modified: hadoop/zookeeper/trunk/src/contrib/zkpython/src/c/zookeeper.c
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkpython/src/c/zookeeper.c?rev=824971&r1=824970&r2=824971&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkpython/src/c/zookeeper.c (original)
+++ hadoop/zookeeper/trunk/src/contrib/zkpython/src/c/zookeeper.c Tue Oct 13 
23:14:05 2009
@@ -19,11 +19,7 @@
 #include <Python.h>
 #include <zookeeper.h>
 #include <assert.h>
-
-#define MAX_ZHANDLES 256
-static zhandle_t* zhandles[MAX_ZHANDLES];
-static int num_zhandles = 0;
-
+               
 //////////////////////////////////////////////
 // EXCEPTIONS
 PyObject *ZooKeeperException = NULL;
@@ -122,7 +118,73 @@
 // the global watchers for each connection - but they're 
 // inaccessible without pulling in zk_adaptor.h, which I'm
 // trying to avoid. 
-static pywatcher_t *watchers[MAX_ZHANDLES];
+static pywatcher_t **watchers;
+
+// We keep an array of zhandles available for use; max_zhandles is
+// its current capacity. When a zhandle is correctly closed, the C
+// client frees the memory so we set the zhandles[i] entry to NULL.
+// This entry can then be re-used by next_zhandle().
+static zhandle_t** zhandles = NULL;
+static int num_zhandles = 0;
+static int max_zhandles = 0;
+#define REAL_MAX_ZHANDLES 32768
+
+// Allocates the initial zhandle and watcher arrays, each with room for num entries.
+void init_zhandles(int num) {
+       zhandles = malloc(sizeof(zhandle_t*)*num); // NOTE(review): malloc result is not checked
+       watchers = malloc(sizeof(pywatcher_t*)*num); // NOTE(review): malloc result is not checked
+       max_zhandles = num;
+       num_zhandles = 0;
+       memset(zhandles, 0, sizeof(zhandle_t*)*max_zhandles); // only zhandles is zeroed here; watchers is left as malloc returned it
+}
+
+// Note that the following zhandle functions are not 
+// thread-safe. The C-Python runtime does not seem to
+// pre-empt a thread that is in a C module, so there's
+// no need for synchronisation. 
+
+// Doubles the size of the zhandle / watcher array.
+// Returns -1 if the new array would be >= REAL_MAX_ZHANDLES
+// in size, and 0 once both arrays have been grown.
+int resize_zhandles() {
+       zhandle_t **tmp = zhandles;
+       pywatcher_t ** wtmp = watchers;
+       if (max_zhandles >= REAL_MAX_ZHANDLES >> 1) { // already at half the cap: doubling would reach it
+               return -1;
+       }
+       max_zhandles *= 2;
+       zhandles = malloc(sizeof(zhandle_t*)*max_zhandles); // NOTE(review): malloc result is not checked
+       memset(zhandles, 0, sizeof(zhandle_t*)*max_zhandles); // new upper half must read as free (NULL)
+       memcpy(zhandles, tmp, sizeof(zhandle_t*)*max_zhandles/2); // max_zhandles/2 is the old capacity
+
+       watchers = malloc(sizeof(pywatcher_t*)*max_zhandles); // NOTE(review): malloc result is not checked
+       memset(watchers, 0, sizeof(pywatcher_t*)*max_zhandles);
+       memcpy(watchers, wtmp, sizeof(pywatcher_t*)*max_zhandles/2); // copy the old entries across
+
+       free(wtmp);
+       free(tmp);
+       return 0;
+}
+
+// Find a free zhandle by scanning the array for a NULL entry - this
+// is expensive, but we expect it to be infrequently called. There are
+// optimisations that can be made if this turns out to be problematic.
+// Returns the index of the free entry (and increments num_zhandles),
+// or -1 if no free handle is found.
+unsigned int next_zhandle() {
+       int i = 0;
+       for (i=0;i<max_zhandles;++i) {
+               if (zhandles[i] == NULL) {
+                       num_zhandles++; // counted here; the entry itself stays NULL until the caller stores a handle
+                       return i;
+               }
+       }
+
+       return -1; // NOTE(review): return type is unsigned int, so this is -1 converted to unsigned
+}
+
+/////////////////////////////////////
+// Pywatcher funcs
 
 pywatcher_t *create_pywatcher(int zh, PyObject* cb, int permanent)
 {
@@ -248,11 +310,16 @@
   clientid_t cid;
   cid.client_id = -1;
   const char *passwd;
+       int handle = next_zhandle();
+       if (handle == -1) {
+               resize_zhandles();
+               handle = next_zhandle();
+       }
 
-  if (num_zhandles >= MAX_ZHANDLES) {
-    PyErr_SetString( ZooKeeperException, "Too many ZooKeeper handles created, 
max is 256" );
-    return NULL;
-  }
+       if (handle == -1) {
+               PyErr_SetString(ZooKeeperException,"Couldn't find a free 
zhandle, something is very wrong");
+               return NULL;
+       }
 
   if (!PyArg_ParseTuple(args, "s|Oi(Ls)", &host, &watcherfn, &recv_timeout, 
&cid.client_id, &passwd)) 
     return NULL;
@@ -262,9 +329,9 @@
   }
   pywatcher_t *pyw = NULL;
   if (watcherfn != Py_None) {
-    pyw = create_pywatcher(num_zhandles, watcherfn,1);
+    pyw = create_pywatcher(handle, watcherfn,1);
   }
-  watchers[num_zhandles] = pyw;
+  watchers[handle] = pyw;
   zhandle_t *zh = zookeeper_init( host, watcherfn != Py_None ? 
watcher_dispatch : NULL, 
                                  recv_timeout, cid.client_id == -1 ? 0 : &cid, 
                                  pyw,
@@ -272,12 +339,12 @@
 
   if (zh == NULL)
     {
-      PyErr_SetString( ZooKeeperException, "Unknown error" );
+      PyErr_SetString( ZooKeeperException, "Could not internally obtain 
zookeeper handle" );
       return NULL;
     }
 
-  zhandles[num_zhandles] = zh;
-  return Py_BuildValue( "i", num_zhandles++ );
+  zhandles[handle] = zh;
+  return Py_BuildValue( "i", handle);
 }
 
 ///////////////////////////////////////////////////////
@@ -1053,6 +1120,7 @@
 PyMODINIT_FUNC initzookeeper() {
   PyEval_InitThreads();
   PyObject *module = Py_InitModule("zookeeper", ZooKeeperMethods );
+       init_zhandles(32);
 
   ZooKeeperException = PyErr_NewException("zookeeper.ZooKeeperException",
                                          PyExc_Exception,

Modified: 
hadoop/zookeeper/trunk/src/contrib/zkpython/src/test/connection_test.py
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkpython/src/test/connection_test.py?rev=824971&r1=824970&r2=824971&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkpython/src/test/connection_test.py 
(original)
+++ hadoop/zookeeper/trunk/src/contrib/zkpython/src/test/connection_test.py Tue 
Oct 13 23:14:05 2009
@@ -19,6 +19,7 @@
 import unittest, threading
 
 import zookeeper, zktestbase
+ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"}
 
 class ConnectionTest(zktestbase.TestBase):
     """Test whether we can make a connection"""
@@ -55,6 +56,69 @@
                           self.handle,
                           "/")
 
+    def testhandlereuse(self):
+        """
+        Test a) multiple concurrent connections b) reuse of closed handles
+        """
+        cv = threading.Condition()
+        self.connected = False
+        def connection_watcher(handle, type, state, path):
+            cv.acquire()
+            self.connected = True
+            self.assertEqual(zookeeper.CONNECTED_STATE, state)
+            self.handle = handle
+            cv.notify()
+            cv.release()
+
+        cv.acquire()
+        handles = [ zookeeper.init(self.host) for i in xrange(10) ]
+        ret = zookeeper.init(self.host, connection_watcher)
+        cv.wait(15.0)
+        cv.release()
+        self.assertEqual(self.connected, True, "Connection timed out to " + 
self.host)
+        self.assertEqual(True, all( [ zookeeper.state(handle) == 
zookeeper.CONNECTED_STATE for handle in handles ] ),
+                         "Not all connections succeeded")
+        oldhandle = handles[3]
+        zookeeper.close(oldhandle)
+        newhandle = zookeeper.init(self.host)
+
+        # This assertion tests *internal* behaviour; i.e. that the module
+        # correctly reuses closed handles. This is therefore implementation
+        # dependent.
+        self.assertEqual(newhandle, oldhandle, "Didn't get reused handle")
+
+    def testmanyhandles(self):
+        """
+        Test the ability of the module to support many handles.
+        """
+        # We'd like to do more, but currently the C client doesn't
+        # work with > 83 handles (fails to create a pipe) on MacOS 10.5.8
+        handles = [ zookeeper.init(self.host) for i in xrange(63) ]
+
+        cv = threading.Condition()
+        self.connected = False
+        def connection_watcher(handle, type, state, path):
+            cv.acquire()
+            self.connected = True
+            self.assertEqual(zookeeper.CONNECTED_STATE, state)
+            self.handle = handle
+            cv.notify()
+            cv.release()
+
+        cv.acquire()
+        ret = zookeeper.init(self.host, connection_watcher)
+        cv.wait(15.0)
+        cv.release()
+        self.assertEqual(self.connected, True, "Connection timed out to " + 
self.host)
+
+        for i,h in enumerate(handles):
+            path = "/zkpython-test-handles-%s" % str(i)
+            self.assertEqual(path, zookeeper.create(h, path, "", 
[ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERAL))
+
+        self.assertEqual(True, all( zookeeper.close(h) == zookeeper.OK for h 
in handles ))
+
+
+
     def tearDown(self):
         pass
 


Reply via email to