Author: phunt Date: Thu Nov 19 22:12:53 2009 New Revision: 882327 URL: http://svn.apache.org/viewvc?rev=882327&view=rev Log: ZOOKEEPER-541. zkpython limited to 256 handles
Modified: hadoop/zookeeper/branches/branch-3.2/CHANGES.txt hadoop/zookeeper/branches/branch-3.2/src/contrib/zkpython/src/c/zookeeper.c hadoop/zookeeper/branches/branch-3.2/src/contrib/zkpython/src/test/connection_test.py Modified: hadoop/zookeeper/branches/branch-3.2/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/CHANGES.txt?rev=882327&r1=882326&r2=882327&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.2/CHANGES.txt (original) +++ hadoop/zookeeper/branches/branch-3.2/CHANGES.txt Thu Nov 19 22:12:53 2009 @@ -13,6 +13,11 @@ ZOOKEEPER-510. zkpython lumps all exceptions as IOError, needs specialized exceptions for KeeperException types (henry & pat via mahadev) + ZOOKEEPER-540. zkpython needs better tracking of handle validity + (henry via phunt) + + ZOOKEEPER-541. zkpython limited to 256 handles (henry robinson via phunt) + ZOOKEEPER-562. c client can flood server with pings if tcp send queue filled. (ben reed via mahadev) Modified: hadoop/zookeeper/branches/branch-3.2/src/contrib/zkpython/src/c/zookeeper.c URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/contrib/zkpython/src/c/zookeeper.c?rev=882327&r1=882326&r2=882327&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.2/src/contrib/zkpython/src/c/zookeeper.c (original) +++ hadoop/zookeeper/branches/branch-3.2/src/contrib/zkpython/src/c/zookeeper.c Thu Nov 19 22:12:53 2009 @@ -19,11 +19,7 @@ #include <Python.h> #include <zookeeper.h> #include <assert.h> - -#define MAX_ZHANDLES 256 -static zhandle_t* zhandles[MAX_ZHANDLES]; -static int num_zhandles = 0; - + ////////////////////////////////////////////// // EXCEPTIONS PyObject *ZooKeeperException = NULL; @@ -122,7 +118,73 @@ // the global watchers for each connection - but they're // inaccessible without pulling in zk_adaptor.h, which I'm // trying to avoid. -static pywatcher_t *watchers[MAX_ZHANDLES]; +static pywatcher_t **watchers; + +// We keep an array of zhandles available for use. +// When a zhandle is correctly closed, the C client +// frees the memory so we set the zhandles[i] entry to NULL. +// This entry can then be re-used +static zhandle_t** zhandles = NULL; +static int num_zhandles = 0; +static int max_zhandles = 0; +#define REAL_MAX_ZHANDLES 32768 + +// Allocates an initial zhandle and watcher array +void init_zhandles(int num) { + zhandles = malloc(sizeof(zhandle_t*)*num); + watchers = malloc(sizeof(pywatcher_t*)*num); + max_zhandles = num; + num_zhandles = 0; + memset(zhandles, 0, sizeof(zhandle_t*)*max_zhandles); +} + +// Note that the following zhandle functions are not +// thread-safe. The C-Python runtime does not seem to +// pre-empt a thread that is in a C module, so there's +// no need for synchronisation. + +// Doubles the size of the zhandle / watcher array +// Returns 0 if the new array would be >= REAL_MAX_ZHANDLES +// in size. +int resize_zhandles() { + zhandle_t **tmp = zhandles; + pywatcher_t ** wtmp = watchers; + if (max_zhandles >= REAL_MAX_ZHANDLES >> 1) { + return -1; + } + max_zhandles *= 2; + zhandles = malloc(sizeof(zhandle_t*)*max_zhandles); + memset(zhandles, 0, sizeof(zhandle_t*)*max_zhandles); + memcpy(zhandles, tmp, sizeof(zhandle_t*)*max_zhandles/2); + + watchers = malloc(sizeof(pywatcher_t*)*max_zhandles); + memset(watchers, 0, sizeof(pywatcher_t*)*max_zhandles); + memcpy(watchers, wtmp, sizeof(pywatcher_t*)*max_zhandles/2); + + free(wtmp); + free(tmp); + return 0; +} + +// Find a free zhandle - this is expensive, but we +// expect it to be infrequently called. +// There are optimisations that can be made if this turns out +// to be problematic. +// Returns -1 if no free handle is found. +unsigned int next_zhandle() { + int i = 0; + for (i=0;i<max_zhandles;++i) { + if (zhandles[i] == NULL) { + num_zhandles++; + return i; + } + } + + return -1; +} + +///////////////////////////////////// +// Pywatcher funcs pywatcher_t *create_pywatcher(int zh, PyObject* cb, int permanent) { @@ -248,11 +310,16 @@ clientid_t cid; cid.client_id = -1; const char *passwd; + int handle = next_zhandle(); + if (handle == -1) { + resize_zhandles(); + handle = next_zhandle(); + } - if (num_zhandles >= MAX_ZHANDLES) { - PyErr_SetString( ZooKeeperException, "Too many ZooKeeper handles created, max is 256" ); - return NULL; - } + if (handle == -1) { + PyErr_SetString(ZooKeeperException,"Couldn't find a free zhandle, something is very wrong"); + return NULL; + } if (!PyArg_ParseTuple(args, "s|Oi(Ls)", &host, &watcherfn, &recv_timeout, &cid.client_id, &passwd)) return NULL; @@ -262,9 +329,9 @@ } pywatcher_t *pyw = NULL; if (watcherfn != Py_None) { - pyw = create_pywatcher(num_zhandles, watcherfn,1); + pyw = create_pywatcher(handle, watcherfn,1); } - watchers[num_zhandles] = pyw; + watchers[handle] = pyw; zhandle_t *zh = zookeeper_init( host, watcherfn != Py_None ? watcher_dispatch : NULL, recv_timeout, cid.client_id == -1 ? 0 : &cid, pyw, @@ -272,12 +339,12 @@ if (zh == NULL) { - PyErr_SetString( ZooKeeperException, "Unknown error" ); + PyErr_SetString( ZooKeeperException, "Could not internally obtain zookeeper handle" ); return NULL; } - zhandles[num_zhandles] = zh; - return Py_BuildValue( "i", num_zhandles++ ); + zhandles[handle] = zh; + return Py_BuildValue( "i", handle); } /////////////////////////////////////////////////////// @@ -1054,6 +1121,7 @@ PyMODINIT_FUNC initzookeeper() { PyEval_InitThreads(); PyObject *module = Py_InitModule("zookeeper", ZooKeeperMethods ); + init_zhandles(32); ZooKeeperException = PyErr_NewException("zookeeper.ZooKeeperException", PyExc_Exception, Modified: hadoop/zookeeper/branches/branch-3.2/src/contrib/zkpython/src/test/connection_test.py URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/contrib/zkpython/src/test/connection_test.py?rev=882327&r1=882326&r2=882327&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.2/src/contrib/zkpython/src/test/connection_test.py (original) +++ hadoop/zookeeper/branches/branch-3.2/src/contrib/zkpython/src/test/connection_test.py Thu Nov 19 22:12:53 2009 @@ -19,6 +19,7 @@ import unittest, threading import zookeeper, zktestbase +ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"} class ConnectionTest(zktestbase.TestBase): """Test whether we can make a connection""" @@ -55,6 +56,69 @@ self.handle, "/") + def testhandlereuse(self): + """ + Test a) multiple concurrent connections b) reuse of closed handles + """ + cv = threading.Condition() + self.connected = False + def connection_watcher(handle, type, state, path): + cv.acquire() + self.connected = True + self.assertEqual(zookeeper.CONNECTED_STATE, state) + self.handle = handle + cv.notify() + cv.release() + + cv.acquire() + handles = [ zookeeper.init(self.host) for i in xrange(10) ] + ret = zookeeper.init(self.host, connection_watcher) + cv.wait(15.0) + cv.release() + self.assertEqual(self.connected, True, "Connection timed out to " + self.host) + self.assertEqual(True, all( [ zookeeper.state(handle) == zookeeper.CONNECTED_STATE for handle in handles ] ), + "Not all connections succeeded") + oldhandle = handles[3] + zookeeper.close(oldhandle) + newhandle = zookeeper.init(self.host) + + # This assertion tests *internal* behaviour; i.e. that the module + # correctly reuses closed handles. This is therefore implementation + # dependent. + self.assertEqual(newhandle, oldhandle, "Didn't get reused handle") + + def testmanyhandles(self): + """ + Test the ability of the module to support many handles. + """ + # We'd like to do more, but currently the C client doesn't + # work with > 83 handles (fails to create a pipe) on MacOS 10.5.8 + handles = [ zookeeper.init(self.host) for i in xrange(63) ] + + cv = threading.Condition() + self.connected = False + def connection_watcher(handle, type, state, path): + cv.acquire() + self.connected = True + self.assertEqual(zookeeper.CONNECTED_STATE, state) + self.handle = handle + cv.notify() + cv.release() + + cv.acquire() + ret = zookeeper.init(self.host, connection_watcher) + cv.wait(15.0) + cv.release() + self.assertEqual(self.connected, True, "Connection timed out to " + self.host) + + for i,h in enumerate(handles): + path = "/zkpython-test-handles-%s" % str(i) + self.assertEqual(path, zookeeper.create(h, path, "", [ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERAL)) + + self.assertEqual(True, all( zookeeper.close(h) == zookeeper.OK for h in handles )) + + + def tearDown(self): pass