Hrmm, here's a cleaner way to reproduce the issue:
test.py:
> from kazoo.client import KazooClient
> from kazoo.client import KazooState
> from kazoo.handlers.threading import TimeoutError
> from kazoo.handlers.gevent import SequentialGeventHandler
> import logging
>
>
> class Test(object):
> def __init__(self):
> self.log = logging.getLogger()
> format = 'zk_watcher[%(name)s-%(thread)d-%(funcName)s:
> (%(levelname)s) %(message)s'
> self.log.setLevel(logging.INFO)
> formatter = logging.Formatter(format)
> handler = logging.StreamHandler()
> handler.setFormatter(formatter)
> self.log.addHandler(handler)
>
> self.registered_nodes = {}
>
> self.log.setLevel(logging.DEBUG)
>
> self._zk = KazooClient(hosts='localhost:2182',
> handler=SequentialGeventHandler())
> self._zk.start()
> self._zk.add_listener(self._state_listener)
> self._state_listener(self._zk.state)
>
> def test(self):
> # now register a node
> self.register_node('/abc/a')
> self._zk.stop()
> self._zk.start()
> self._zk.get_children('/abc')
>
> def register_node(self, node, data=None):
> if node in self.registered_nodes:
> if data == self.registered_nodes[node]:
> self.log.debug('Already registered [%s] in data provider.' %
> node)
> return
> self.log.debug('Registering [%s] in data provider.' % node)
> self._zk.create(node, ephemeral=True, makepath=True)
> self.registered_nodes[node] = data
>
>
> def _state_listener(self,state):
> self.log.warning('Zookeeper connection state changed: %s' % state)
> if state == KazooState.SUSPENDED:
> self.CONNECTION_STATE=False
> elif state == KazooState.LOST:
> self.CONNECTION_STATE=False
> else:
> self.CONNECTION_STATE=True
> nodes = {}
> print self.registered_nodes
> try:
> nodes = self.registered_nodes
> except:
> pass
> self.registered_nodes = {}
> for node in nodes.iteritems():
> self.register_node(node[0], data=node[1])
python
>>> import test
>>> k = test.Test()
>>> k.test()
(watch it hang ... )
On Dec 9, 2012, at 1:22 PM, Matt Wise <[email protected]> wrote:
> I've got a weird connection issue playing around with Kazoo... If I do
> something simple like:
>
>> k = KazooClient()
>> k.start()
>> k.create('/foo')
>> k.stop()
>> k.start()
>> k.create('/foo')
>
> it works fine... the node is re-created, all is happy.
>
> However, if I try to use a state_listener callback to automatically
> re-register any paths that we had registered on our first connection, it
> fails. In fact, it doesn't really fail... it hangs. This only happens if we
> try to re-register the paths from within the state listener. If we do it
> outside of that callback (manually), it works fine. Here is a silly code
> snippet that will cause the problem:
>
>> from kazoo.client import KazooClient
>> from kazoo.client import KazooState
>> import logging
>> log = logging.getLogger()
>> format = '[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>> log.setLevel(logging.DEBUG)
>> formatter = logging.Formatter(format)
>> handler = logging.StreamHandler()
>> handler.setFormatter(formatter)
>> log.addHandler(handler)
>>
>> registered_nodes = {}
>>
>> def register_node(node, data=None):
>> if node in registered_nodes:
>> if data == registered_nodes[node]:
>> log.debug('Already registered [%s] in data provider.' % node)
>> return
>> log.debug('Registering [%s] in data provider.' % node)
>> _zk.create(node, ephemeral=True, makepath=True)
>> registered_nodes[node] = data
>>
>> def _re_register_nodes(nodes):
>> for node in nodes.iteritems():
>> register_node(node[0], data=node[1])
>>
>> def _state_listener(state):
>> log.warning('Zookeeper connection state changed: %s' % state)
>> if state == KazooState.SUSPENDED:
>> CONNECTION_STATE=False
>> elif state == KazooState.LOST:
>> CONNECTION_STATE=False
>> else:
>> CONNECTION_STATE=True
>> try:
>> nodes = registered_nodes
>> except:
>> # no local nodes
>> registered_nodes = {}
>> _re_register_nodes(nodes)
>>
>> log.setLevel(logging.DEBUG)
>>
>> registered_nodes = {}
>>
>> _zk = KazooClient(hosts='localhost:2182')
>> _zk.start()
>> _zk.add_listener(_state_listener)
>> _state_listener(_zk.state)
>>
>> # now register a node
>> register_node('/abc/a')
>> _zk.stop()
>> _zk.start()
>
>
> If you run this in a Python shell, after the _zk.start(), your path will not
> re-register... instead, the _state_listener() method will basically hang when
> it calls _re_register_nodes(). The _re_register_nodes() method hangs on the
> first attempt to call register_node(), which will hang at the _zk.create()
> call. Obviously the above code is a bastardized, stripped-down version of
> what we're working with, but it replicates the problem. This could just be a
> problem with my understanding of how the add_listener callback works... but
> I'm a bit confused here.
>
> Long term, the goal is for our object to be able to handle a disconnect and
> gracefully re-generate any paths that were lost during the connection loss.
> Ironically, Kazoo already handles this with its 'watcher' recipe. It just
> doesn't have the same kind of thing for any paths we create with
> KazooClient.create().
>
> --Matt