On Dec 21, 2012, at 10:00 PM, Matt Wise wrote:

> I just want to circle around on this issue. The root cause was that the
> Kazoo thread triggering the 'state listener' callback does so with a lock
> that it does not release until after the callbacks are finished. However, if
> your callback tries to use this thread to make a Zookeeper call (i.e., get,
> set, create, delete), it waits on that lock and immediately causes a deadlock.
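The mechanism Matt describes can be reproduced with a toy model that uses only the standard library. The client fires state listeners while holding a non-reentrant lock, and a listener then calls back into the client. `ToyClient`, `fire_state_change` and `on_state_change` are hypothetical names; this is a sketch of the failure mode, not Kazoo's actual internals. The lock is acquired with a timeout purely so the sketch reports the deadlock instead of hanging forever.

```python
import threading


class ToyClient:
    """Hypothetical stand-in for a client whose event thread holds a
    non-reentrant lock while it runs state listeners (not Kazoo's code)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._listeners = []

    def add_listener(self, listener):
        self._listeners.append(listener)

    def create(self, path, timeout=0.5):
        # A real client call would wait on the lock indefinitely; the timeout
        # lets this sketch report the deadlock instead of hanging.
        if not self._lock.acquire(timeout=timeout):
            raise RuntimeError("deadlock: lock is held by the listener dispatch")
        try:
            return path  # pretend the node was created
        finally:
            self._lock.release()

    def fire_state_change(self, state):
        # The lock stays held until every listener has returned.
        with self._lock:
            for listener in self._listeners:
                listener(state)


client = ToyClient()
errors = []


def on_state_change(state):
    # Calling back into the client from inside the listener is the bug.
    try:
        client.create("/abc/a")
    except RuntimeError as exc:
        errors.append(str(exc))


client.add_listener(on_state_change)
client.fire_state_change("CONNECTED")
print(errors)                   # one "deadlock: ..." message
print(client.create("/abc/a"))  # /abc/a (works outside the callback)
```

The same `create()` call that fails inside the listener succeeds once the dispatch has returned and released the lock. This matches Matt's observation that re-registering "manually", outside the callback, works fine.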
Ahh, calling user callbacks while holding a lock. That would do it. :)

> To handle this scenario they have a 'spawn()' function in the Zookeeper
> thread handler that you can use:
>
>> self._zk.handler.spawn(self._re_establish_registrations)
>
> This spawns a thread and immediately returns, allowing the Zookeeper thread
> to finish its callbacks and release the lock. Thanks a ton to Ben B. for
> helping me track that down.

Yeah, Ben is awesome.


Regards,
Alan

> On Dec 10, 2012, at 11:18 AM, Alan Cabrera wrote:
>
>> Without really digging into this I'll toss in my initial observation.
>>
>> Calling zk while still being inside a zk callback seems a bit dangerous. I
>> would have a queue and event thread and have work from the callbacks feed
>> this queue which would be executed inside the event thread.
>>
>>
>> Regards,
>> Alan
>>
>> On Dec 9, 2012, at 2:30 PM, Matt Wise wrote:
>>
>>> Just to clarify, if you go and change test() into:
>>>
>>>> def test(self):
>>>>     # now register a node
>>>>     self.register_node('/abc/a')
>>>>     self._zk.stop()
>>>>     self._zk.start()
>>>>     self.register_node('/abc/a')
>>>>     self._zk.get_children('/abc')
>>>>
>>>
>>> and then remove these lines from the state_handler() method:
>>>>
>>>>> for node in nodes.iteritems():
>>>>>     self.register_node(node[0], data=node[1])
>>>>
>>>
>>> then it works perfectly: no hang, nothing. It seems that register_node
>>> cannot be called from within the state handler class. Why?
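Alan's suggestion is that callbacks only enqueue work and a separate event thread performs the ZooKeeper calls. It can be sketched with the standard library. `ToyClient` and `Reregistrar` are hypothetical stand-ins, not Kazoo classes. The `self._zk.handler.spawn(...)` fix reported in this thread has the same effect: the listener returns at once and the re-registration runs elsewhere.

```python
import queue
import threading


class ToyClient:
    """Hypothetical client that holds a lock while running state listeners."""

    def __init__(self):
        self._lock = threading.Lock()
        self._listeners = []
        self.nodes = set()

    def add_listener(self, listener):
        self._listeners.append(listener)

    def create(self, path):
        with self._lock:  # blocks while listeners are being dispatched
            self.nodes.add(path)

    def fire_state_change(self, state):
        with self._lock:
            for listener in self._listeners:
                listener(state)


class Reregistrar:
    """Callbacks feed a queue; a dedicated event thread does the real work."""

    def __init__(self, client, paths):
        self._client = client
        self._paths = list(paths)
        self._work = queue.Queue()
        threading.Thread(target=self._run, daemon=True).start()

    def state_listener(self, state):
        # Runs on the client's thread: enqueue and return immediately,
        # so the client can release its lock.
        if state == "CONNECTED":
            self._work.put(self._reregister)

    def _reregister(self):
        for path in self._paths:
            self._client.create(path)

    def _run(self):
        while True:
            job = self._work.get()
            try:
                job()
            finally:
                self._work.task_done()

    def drain(self):
        """Block until every queued job has run (useful in tests)."""
        self._work.join()


client = ToyClient()
reregistrar = Reregistrar(client, ["/abc/a"])
client.add_listener(reregistrar.state_listener)
client.fire_state_change("CONNECTED")
reregistrar.drain()
print(client.nodes)  # {'/abc/a'}
```

The key design choice is that the listener does nothing that can block. It puts a job on the queue and returns, so the lock is released before the worker's `create()` needs it.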
>>>
>>> On Dec 9, 2012, at 2:26 PM, Matt Wise wrote:
>>>
>>>> Hrmm, here's a cleaner way to reproduce the issue:
>>>>
>>>> test.py:
>>>>> from kazoo.client import KazooClient
>>>>> from kazoo.client import KazooState
>>>>> from kazoo.handlers.threading import TimeoutError
>>>>> from kazoo.handlers.gevent import SequentialGeventHandler
>>>>> import logging
>>>>>
>>>>>
>>>>> class Test(object):
>>>>>     def __init__(self):
>>>>>         self.log = logging.getLogger()
>>>>>         format = 'zk_watcher[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>>>>>         self.log.setLevel(logging.INFO)
>>>>>         formatter = logging.Formatter(format)
>>>>>         handler = logging.StreamHandler()
>>>>>         handler.setFormatter(formatter)
>>>>>         self.log.addHandler(handler)
>>>>>
>>>>>         self.registered_nodes = {}
>>>>>
>>>>>         self.log.setLevel(logging.DEBUG)
>>>>>
>>>>>         self._zk = KazooClient(hosts='localhost:2182',
>>>>>                                handler=SequentialGeventHandler())
>>>>>         self._zk.start()
>>>>>         self._zk.add_listener(self._state_listener)
>>>>>         self._state_listener(self._zk.state)
>>>>>
>>>>>     def test(self):
>>>>>         # now register a node
>>>>>         self.register_node('/abc/a')
>>>>>         self._zk.stop()
>>>>>         self._zk.start()
>>>>>         self._zk.get_children('/abc')
>>>>>
>>>>>     def register_node(self, node, data=None):
>>>>>         if node in self.registered_nodes:
>>>>>             if data == self.registered_nodes[node]:
>>>>>                 self.log.debug('Already registered [%s] in data provider.' % node)
>>>>>                 return
>>>>>         self.log.debug('Registering [%s] in data provider.' % node)
>>>>>         self._zk.create(node, ephemeral=True, makepath=True)
>>>>>         self.registered_nodes[node] = data
>>>>>
>>>>>
>>>>>     def _state_listener(self, state):
>>>>>         self.log.warning('Zookeeper connection state changed: %s' % state)
>>>>>         if state == KazooState.SUSPENDED:
>>>>>             self.CONNECTION_STATE = False
>>>>>         elif state == KazooState.LOST:
>>>>>             self.CONNECTION_STATE = False
>>>>>         else:
>>>>>             self.CONNECTION_STATE = True
>>>>>             nodes = {}
>>>>>             print self.registered_nodes
>>>>>             try:
>>>>>                 nodes = self.registered_nodes
>>>>>             except:
>>>>>                 pass
>>>>>             self.registered_nodes = {}
>>>>>             for node in nodes.iteritems():
>>>>>                 self.register_node(node[0], data=node[1])
>>>>
>>>> python
>>>>>>> import test
>>>>>>> k = test.Test()
>>>>>>> k.test()
>>>>
>>>> (watch it hang ... )
>>>>
>>>>
>>>> On Dec 9, 2012, at 1:22 PM, Matt Wise wrote:
>>>>
>>>>> I've got a weird connection issue playing around with Kazoo... If I do
>>>>> something simple like:
>>>>>
>>>>>> k = KazooClient()
>>>>>> k.start()
>>>>>> k.create('/foo')
>>>>>> k.stop()
>>>>>> k.start()
>>>>>> k.create('/foo')
>>>>>
>>>>> it works fine... the node is re-created, all is happy.
>>>>>
>>>>> However, if I try to use a state_listener callback to automatically
>>>>> re-register any paths that we had registered on our first connection, it
>>>>> fails. In fact, it doesn't really fail... it hangs. This only happens if
>>>>> we try to re-register the paths from within the state listener. If we
>>>>> do it outside of that callback (manually), it works fine.
>>>>> Silly code snippet that will cause the problem:
>>>>>
>>>>>> from kazoo.client import KazooClient
>>>>>> from kazoo.client import KazooState
>>>>>> import logging
>>>>>> log = logging.getLogger()
>>>>>> format = '[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>>>>>> log.setLevel(logging.DEBUG)
>>>>>> formatter = logging.Formatter(format)
>>>>>> handler = logging.StreamHandler()
>>>>>> handler.setFormatter(formatter)
>>>>>> log.addHandler(handler)
>>>>>>
>>>>>> registered_nodes = {}
>>>>>>
>>>>>> def register_node(node, data=None):
>>>>>>     if node in registered_nodes:
>>>>>>         if data == registered_nodes[node]:
>>>>>>             log.debug('Already registered [%s] in data provider.' % node)
>>>>>>             return
>>>>>>     log.debug('Registering [%s] in data provider.' % node)
>>>>>>     _zk.create(node, ephemeral=True, makepath=True)
>>>>>>     registered_nodes[node] = data
>>>>>>
>>>>>> def _re_register_nodes(nodes):
>>>>>>     for node in nodes.iteritems():
>>>>>>         register_node(node[0], data=node[1])
>>>>>>
>>>>>> def _state_listener(state):
>>>>>>     global registered_nodes
>>>>>>     log.warning('Zookeeper connection state changed: %s' % state)
>>>>>>     if state == KazooState.SUSPENDED:
>>>>>>         CONNECTION_STATE = False
>>>>>>     elif state == KazooState.LOST:
>>>>>>         CONNECTION_STATE = False
>>>>>>     else:
>>>>>>         CONNECTION_STATE = True
>>>>>>         try:
>>>>>>             nodes = registered_nodes
>>>>>>         except:
>>>>>>             # no local nodes
>>>>>>             registered_nodes = {}
>>>>>>         _re_register_nodes(nodes)
>>>>>>
>>>>>> log.setLevel(logging.DEBUG)
>>>>>>
>>>>>> registered_nodes = {}
>>>>>>
>>>>>> _zk = KazooClient(hosts='localhost:2182')
>>>>>> _zk.start()
>>>>>> _zk.add_listener(_state_listener)
>>>>>> _state_listener(_zk.state)
>>>>>>
>>>>>> # now register a node
>>>>>> register_node('/abc/a')
>>>>>> _zk.stop()
>>>>>> _zk.start()
>>>>>
>>>>>
>>>>> If you run this in a Python shell, after the _zk.start() your path will
>>>>> not re-register... instead, the _state_listener() method will basically
>>>>> hang when it calls _re_register_nodes().
>>>>> The _re_register_nodes() method hangs on the first attempt to call
>>>>> register_node(), which will hang at the _zk.create. Obviously the above
>>>>> code is a bastardized, stripped-down version of what we're working with,
>>>>> but it replicates the problem. This could just be a problem with my
>>>>> understanding of how the add_listener callback works, but I'm a bit
>>>>> confused here.
>>>>>
>>>>> Long term, the goal is to have our object able to handle a disconnect and
>>>>> gracefully re-generate any paths that had been disconnected during the
>>>>> connection loss. Ironically, Kazoo already handles this with its 'watcher'
>>>>> recipe. It just doesn't have the same kind of thing for any paths we
>>>>> create with KazooClient.create().
>>>>>
>>>>> --Matt
>>>>
>>>
>>
>
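Matt's long-term goal is to re-create the paths his process registered once the connection comes back. That depends on one piece of ZooKeeper behaviour the thread does not spell out: ephemeral nodes are deleted only when the session expires (Kazoo reports this as LOST), not on a temporary SUSPENDED disconnect. Below is a minimal sketch under that assumption. `EphemeralRegistry` and its injected `create(path, data)` and `spawn(fn)` callables are hypothetical. With Kazoo one might pass a wrapper around `zk.create(path, data, ephemeral=True, makepath=True)` together with `zk.handler.spawn`. The plain strings stand in for the `KazooState` constants, which real code should compare against instead.

```python
import threading


class EphemeralRegistry:
    """Hypothetical helper: remembers the ephemeral nodes this process created
    so they can be re-created after the ZooKeeper session is lost."""

    def __init__(self, create, spawn):
        self._create = create           # create(path, data): make an ephemeral node
        self._spawn = spawn             # spawn(fn): run fn off the client's event thread
        self._nodes = {}                # path -> data we last registered
        self._lock = threading.Lock()   # our own lock, never the client's
        self._session_lost = False

    def register(self, path, data=b""):
        with self._lock:
            if path in self._nodes and self._nodes[path] == data:
                return False            # already registered with this data
            self._create(path, data)
            self._nodes[path] = data
            return True

    def state_listener(self, state):
        # Runs on the client's event thread: only record state and hand off,
        # never call the client from here (stand-ins for KazooState values).
        if state == "LOST":
            # Session expired: the server has deleted our ephemeral nodes.
            self._session_lost = True
        elif state == "CONNECTED" and self._session_lost:
            self._session_lost = False
            self._spawn(self._recreate_all)
        # "SUSPENDED": the session may still be alive, so the nodes may still exist.

    def _recreate_all(self):
        with self._lock:
            for path, data in self._nodes.items():
                self._create(path, data)


created = []
registry = EphemeralRegistry(create=lambda path, data: created.append(path),
                             spawn=lambda fn: fn())  # run inline for the demo
registry.register("/abc/a")
registry.register("/abc/a")            # same data: skipped
registry.state_listener("SUSPENDED")
registry.state_listener("CONNECTED")   # session survived: nothing to redo
registry.state_listener("LOST")
registry.state_listener("CONNECTED")   # new session: node re-created
print(created)                         # ['/abc/a', '/abc/a']
```

Re-creating only after LOST avoids a second `create()` of a node that still exists after a brief SUSPENDED blip. Handing the work to `spawn` follows the fix this thread settled on.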
