I just want to circle back on this issue. The root cause was that the Kazoo thread that fires the state listener callbacks holds a lock that it does not release until all of the callbacks have finished. If your callback then makes a blocking ZooKeeper call (e.g. get, set, create, delete), that call waits on the same lock, which can never be released while the callback is still running, so you deadlock immediately.
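To make the failure mode concrete, here is a toy Python 3 model that needs no ZooKeeper server. It assumes exactly what is described above: a single non-reentrant lock held while the callbacks run. `ToyClient`, `fire_state()` and the 0.5 s timeout are hypothetical stand-ins, not Kazoo internals. The timeout only exists so the demo reports the deadlock instead of hanging:

```python
import threading


class ToyClient:
    """Hypothetical model of the behaviour described above, not Kazoo's code.

    The thread that delivers state changes holds a non-reentrant lock while
    it runs the listeners, and every blocking call such as create() needs
    that same lock.
    """

    def __init__(self):
        self._lock = threading.Lock()  # plain Lock: the holder cannot re-acquire it
        self._listeners = []
        self.nodes = set()

    def add_listener(self, listener):
        self._listeners.append(listener)

    def fire_state(self, state):
        # The lock is released only after every listener has returned.
        with self._lock:
            for listener in self._listeners:
                listener(state)

    def create(self, path, timeout=0.5):
        # Without the timeout this acquire would block forever (the hang).
        if not self._lock.acquire(timeout=timeout):
            raise TimeoutError("create(%r) is stuck waiting for the lock" % path)
        try:
            self.nodes.add(path)
        finally:
            self._lock.release()


client = ToyClient()
errors = []


def state_listener(state):
    if state == "CONNECTED":
        try:
            client.create("/abc/a")  # blocking call made from inside the callback
        except TimeoutError as exc:
            errors.append(exc)


client.add_listener(state_listener)
client.fire_state("CONNECTED")
print(len(errors))   # 1: create() never got the lock
print(client.nodes)  # set(): the node was not created
```

The same create() call succeeds when made from outside the callback, which matches what the repro code further down in this thread shows.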
To handle this scenario, Kazoo provides a spawn() function on the client's thread handler that you can use:

> self._zk.handler.spawn(self._re_establish_registrations)

This runs the function in a new thread (a greenlet, if you use the gevent handler) and returns immediately, allowing the ZooKeeper thread to finish its callbacks and release the lock. Thanks a ton to Ben B. for helping me track that down.

--Matt

On Dec 10, 2012, at 11:18 AM, Alan Cabrera wrote:

> Without really digging into this, I'll toss in my initial observation.
>
> Calling zk while still being inside a zk callback seems a bit dangerous. I
> would have a queue and an event thread and have work from the callbacks feed
> this queue, which would be executed inside the event thread.
>
>
> Regards,
> Alan
>
> On Dec 9, 2012, at 2:30 PM, Matt Wise wrote:
>
>> Just to clarify, if you go and change test() into:
>>
>>> def test(self):
>>>     # now register a node
>>>     self.register_node('/abc/a')
>>>     self._zk.stop()
>>>     self._zk.start()
>>>     self.register_node('/abc/a')
>>>     self._zk.get_children('/abc')
>>>
>>
>> and then remove these lines from the _state_listener() method:
>>>
>>>>     for node in nodes.iteritems():
>>>>         self.register_node(node[0], data=node[1])
>>>
>>
>> then it works perfectly: no hang, nothing. It seems that register_node()
>> cannot be called from within the state listener. Why?
>>
>> On Dec 9, 2012, at 2:26 PM, Matt Wise wrote:
>>
>>> Hrmm, here's a cleaner way to reproduce the issue:
>>>
>>> test.py:
>>>> from kazoo.client import KazooClient
>>>> from kazoo.client import KazooState
>>>> from kazoo.handlers.threading import TimeoutError
>>>> from kazoo.handlers.gevent import SequentialGeventHandler
>>>> import logging
>>>>
>>>>
>>>> class Test(object):
>>>>     def __init__(self):
>>>>         self.log = logging.getLogger()
>>>>         format = 'zk_watcher[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>>>>         self.log.setLevel(logging.INFO)
>>>>         formatter = logging.Formatter(format)
>>>>         handler = logging.StreamHandler()
>>>>         handler.setFormatter(formatter)
>>>>         self.log.addHandler(handler)
>>>>
>>>>         self.registered_nodes = {}
>>>>
>>>>         self.log.setLevel(logging.DEBUG)
>>>>
>>>>         self._zk = KazooClient(hosts='localhost:2182',
>>>>                                handler=SequentialGeventHandler())
>>>>         self._zk.start()
>>>>         self._zk.add_listener(self._state_listener)
>>>>         self._state_listener(self._zk.state)
>>>>
>>>>     def test(self):
>>>>         # now register a node
>>>>         self.register_node('/abc/a')
>>>>         self._zk.stop()
>>>>         self._zk.start()
>>>>         self._zk.get_children('/abc')
>>>>
>>>>     def register_node(self, node, data=None):
>>>>         if node in self.registered_nodes:
>>>>             if data == self.registered_nodes[node]:
>>>>                 self.log.debug('Already registered [%s] in data provider.' % node)
>>>>                 return
>>>>         self.log.debug('Registering [%s] in data provider.' % node)
>>>>         self._zk.create(node, ephemeral=True, makepath=True)
>>>>         self.registered_nodes[node] = data
>>>>
>>>>
>>>>     def _state_listener(self, state):
>>>>         self.log.warning('Zookeeper connection state changed: %s' % state)
>>>>         if state == KazooState.SUSPENDED:
>>>>             self.CONNECTION_STATE = False
>>>>         elif state == KazooState.LOST:
>>>>             self.CONNECTION_STATE = False
>>>>         else:
>>>>             self.CONNECTION_STATE = True
>>>>             nodes = {}
>>>>             print self.registered_nodes
>>>>             try:
>>>>                 nodes = self.registered_nodes
>>>>             except:
>>>>                 pass
>>>>             self.registered_nodes = {}
>>>>             for node in nodes.iteritems():
>>>>                 self.register_node(node[0], data=node[1])
>>>
>>> python
>>>>>> import test
>>>>>> k = test.Test()
>>>>>> k.test()
>>>
>>> (watch it hang ... )
>>>
>>>
>>> On Dec 9, 2012, at 1:22 PM, Matt Wise wrote:
>>>
>>>> I've got a weird connection issue playing around with Kazoo... If I do
>>>> something simple like:
>>>>
>>>>> k = KazooClient()
>>>>> k.start()
>>>>> k.create('/foo')
>>>>> k.stop()
>>>>> k.start()
>>>>> k.create('/foo')
>>>>
>>>> it works fine... the node is re-created, all is happy.
>>>>
>>>> However, if I try to use a state_listener callback to automatically
>>>> re-register any paths that we had registered on our first connection, it
>>>> fails. In fact, it doesn't really fail... it hangs. This only happens if
>>>> we try to re-register the paths from within the state listener. If we
>>>> do it outside of that callback (manually), it works fine.
>>>> Silly code snippet that will cause the problem:
>>>>
>>>>> from kazoo.client import KazooClient
>>>>> from kazoo.client import KazooState
>>>>> import logging
>>>>> log = logging.getLogger()
>>>>> format = '[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>>>>> log.setLevel(logging.DEBUG)
>>>>> formatter = logging.Formatter(format)
>>>>> handler = logging.StreamHandler()
>>>>> handler.setFormatter(formatter)
>>>>> log.addHandler(handler)
>>>>>
>>>>> registered_nodes = {}
>>>>>
>>>>> def register_node(node, data=None):
>>>>>     if node in registered_nodes:
>>>>>         if data == registered_nodes[node]:
>>>>>             log.debug('Already registered [%s] in data provider.' % node)
>>>>>             return
>>>>>     log.debug('Registering [%s] in data provider.' % node)
>>>>>     _zk.create(node, ephemeral=True, makepath=True)
>>>>>     registered_nodes[node] = data
>>>>>
>>>>> def _re_register_nodes(nodes):
>>>>>     for node in nodes.iteritems():
>>>>>         register_node(node[0], data=node[1])
>>>>>
>>>>> def _state_listener(state):
>>>>>     log.warning('Zookeeper connection state changed: %s' % state)
>>>>>     if state == KazooState.SUSPENDED:
>>>>>         CONNECTION_STATE = False
>>>>>     elif state == KazooState.LOST:
>>>>>         CONNECTION_STATE = False
>>>>>     else:
>>>>>         CONNECTION_STATE = True
>>>>>         try:
>>>>>             nodes = registered_nodes
>>>>>         except:
>>>>>             # no local nodes
>>>>>             registered_nodes = {}
>>>>>         _re_register_nodes(nodes)
>>>>>
>>>>> log.setLevel(logging.DEBUG)
>>>>>
>>>>> registered_nodes = {}
>>>>>
>>>>> _zk = KazooClient(hosts='localhost:2182')
>>>>> _zk.start()
>>>>> _zk.add_listener(_state_listener)
>>>>> _state_listener(_zk.state)
>>>>>
>>>>> # now register a node
>>>>> register_node('/abc/a')
>>>>> _zk.stop()
>>>>> _zk.start()
>>>>
>>>>
>>>> If you run this in a Python shell, after the _zk.start(), your path will
>>>> not re-register... instead, the _state_listener() method will basically
>>>> hang when it calls _re_register_nodes().
>>>> The _re_register_nodes() method hangs on the first attempt to call
>>>> register_node(), which will hang at the _zk.create(). Obviously the above
>>>> code is a bastardized, stripped-down version of what we're working with,
>>>> but it replicates the problem. This could just be a problem with my
>>>> understanding of how the add_listener callback works, but I'm a bit
>>>> confused here.
>>>>
>>>> Long term, the goal is to have our object able to handle a disconnect and
>>>> gracefully re-generate any paths that had been disconnected during the
>>>> connection loss. Ironically, Kazoo handles this already with its 'watcher'
>>>> recipe. It just doesn't have the same kind of thing for any paths we
>>>> create with KazooClient.create().
>>>>
>>>> --Matt
>>>
>>
>
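A minimal sketch of the long-term goal above that follows Alan's queue-and-event-thread advice: the state listener only enqueues work, and a dedicated worker thread re-creates the registered paths once the lock is free. It is written in Python 3 against a hypothetical `ToyClient` that models the callback lock described at the top of this thread. `Registrar`, `jobs` and the `"CONNECTED"` string (standing in for `KazooState.CONNECTED`) are illustrative, not Kazoo APIs. With a real client, `register_node()` would call `create(path, ephemeral=True, makepath=True)` as the thread's code does.

```python
import queue
import threading


class ToyClient:
    """Hypothetical model, not Kazoo's code: state listeners run while the
    notifying thread holds a non-reentrant lock that create() also needs."""

    def __init__(self):
        self._lock = threading.Lock()
        self._listeners = []
        self.nodes = set()

    def add_listener(self, listener):
        self._listeners.append(listener)

    def fire_state(self, state):
        with self._lock:  # held until every listener has returned
            for listener in self._listeners:
                listener(state)

    def create(self, path, timeout=2.0):
        if not self._lock.acquire(timeout=timeout):
            raise TimeoutError("create(%r) is stuck waiting for the lock" % path)
        try:
            self.nodes.add(path)
        finally:
            self._lock.release()


class Registrar:
    """Hypothetical helper that re-creates registered paths after a reconnect.

    The state listener never blocks: it only enqueues a job, and a dedicated
    worker thread makes the blocking create() calls once the lock is free.
    """

    def __init__(self, client):
        self.client = client
        self.registered = {}  # path -> data, like registered_nodes in the thread
        self.jobs = queue.Queue()
        threading.Thread(target=self._worker, daemon=True).start()
        client.add_listener(self._state_listener)

    def register_node(self, path, data=None):
        self.client.create(path)
        self.registered[path] = data

    def _state_listener(self, state):
        # Runs with the client's lock held, so only hand the work off.
        if state == "CONNECTED":
            self.jobs.put(self._re_register)

    def _re_register(self):
        for path, data in list(self.registered.items()):
            self.register_node(path, data)

    def _worker(self):
        while True:
            job = self.jobs.get()
            try:
                job()
            finally:
                self.jobs.task_done()


client = ToyClient()
registrar = Registrar(client)
registrar.register_node("/abc/a")
client.nodes.clear()            # simulate the ephemeral node vanishing with the old session
client.fire_state("CONNECTED")  # returns at once; the worker does the re-registration
registrar.jobs.join()           # wait until the queued job has finished
print(client.nodes)             # {'/abc/a'}
```

Replacing `self.jobs.put(self._re_register)` with `threading.Thread(target=self._re_register).start()` gives the spawn-style variant the thread settled on (in Kazoo, `self._zk.handler.spawn(...)`). The single worker additionally runs re-registrations one at a time, so two quick reconnects cannot interleave their create() calls.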
