Just to clarify, if you go and change test() into:
> def test(self):
> # now register a node
> self.register_node('/abc/a')
> self._zk.stop()
> self._zk.start()
> self.register_node('/abc/a')
> self._zk.get_children('/abc')
>
and then remove these lines from the _state_listener() method:
>
>> for node in nodes.iteritems():
>> self.register_node(node[0], data=node[1])
>
then it works perfectly: no hang, nothing. It seems that register_node()
cannot be called from within the state listener callback. Why?
On Dec 9, 2012, at 2:26 PM, Matt Wise <[email protected]> wrote:
> Hrmm here's a cleaner way to reproduce the issue:
>
> test.py:
>> from kazoo.client import KazooClient
>> from kazoo.client import KazooState
>> from kazoo.handlers.threading import TimeoutError
>> from kazoo.handlers.gevent import SequentialGeventHandler
>> import logging
>>
>>
>> class Test(object):
>> def __init__(self):
>> self.log = logging.getLogger()
>> format = 'zk_watcher[%(name)s-%(thread)d-%(funcName)s:
>> (%(levelname)s) %(message)s'
>> self.log.setLevel(logging.INFO)
>> formatter = logging.Formatter(format)
>> handler = logging.StreamHandler()
>> handler.setFormatter(formatter)
>> self.log.addHandler(handler)
>>
>> self.registered_nodes = {}
>>
>> self.log.setLevel(logging.DEBUG)
>>
>> self._zk = KazooClient(hosts='localhost:2182',
>> handler=SequentialGeventHandler())
>> self._zk.start()
>> self._zk.add_listener(self._state_listener)
>> self._state_listener(self._zk.state)
>>
>> def test(self):
>> # now register a node
>> self.register_node('/abc/a')
>> self._zk.stop()
>> self._zk.start()
>> self._zk.get_children('/abc')
>>
>> def register_node(self, node, data=None):
>> if node in self.registered_nodes:
>> if data == self.registered_nodes[node]:
>> self.log.debug('Already registered [%s] in data provider.' %
>> node)
>> return
>> self.log.debug('Registering [%s] in data provider.' % node)
>> self._zk.create(node, ephemeral=True, makepath=True)
>> self.registered_nodes[node] = data
>>
>>
>> def _state_listener(self,state):
>> self.log.warning('Zookeeper connection state changed: %s' % state)
>> if state == KazooState.SUSPENDED:
>> self.CONNECTION_STATE=False
>> elif state == KazooState.LOST:
>> self.CONNECTION_STATE=False
>> else:
>> self.CONNECTION_STATE=True
>> nodes = {}
>> print self.registered_nodes
>> try:
>> nodes = self.registered_nodes
>> except:
>> pass
>> self.registered_nodes = {}
>> for node in nodes.iteritems():
>> self.register_node(node[0], data=node[1])
>
> python
>>>> import test
>>>> k = test.Test()
>>>> k.test()
>
> (watch it hang ... )
>
>
> On Dec 9, 2012, at 1:22 PM, Matt Wise <[email protected]> wrote:
>
>> I've got a weird connection issue playing around with Kazoo... If I do
>> something simple like:
>>
>>> k = KazooClient()
>>> k.start()
>>> k.create('/foo')
>>> k.stop()
>>> k.start()
>>> k.create('/foo')
>>
>> it works fine... the node is re-created, all is happy.
>>
>> However, if I try to use a state_listener callback to automatically
>> re-register any paths that we had registered on our first connection, it
>> fails. In fact, it doesn't really fail; it hangs. This only happens if we
>> try to re-register the paths from within the state listener. If we do it
>> outside of that callback (manually), it works fine. Silly code snippet that
>> will cause the problem:
>>
>>> from kazoo.client import KazooClient
>>> from kazoo.client import KazooState
>>> import logging
>>> log = logging.getLogger()
>>> format = '[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>>> log.setLevel(logging.DEBUG)
>>> formatter = logging.Formatter(format)
>>> handler = logging.StreamHandler()
>>> handler.setFormatter(formatter)
>>> log.addHandler(handler)
>>>
>>> registered_nodes = {}
>>>
>>> def register_node(node, data=None):
>>> if node in registered_nodes:
>>> if data == registered_nodes[node]:
>>> log.debug('Already registered [%s] in data provider.' % node)
>>> return
>>> log.debug('Registering [%s] in data provider.' % node)
>>> _zk.create(node, ephemeral=True, makepath=True)
>>> registered_nodes[node] = data
>>>
>>> def _re_register_nodes(nodes):
>>> for node in nodes.iteritems():
>>> register_node(node[0], data=node[1])
>>>
>>> def _state_listener(state):
>>> log.warning('Zookeeper connection state changed: %s' % state)
>>> if state == KazooState.SUSPENDED:
>>> CONNECTION_STATE=False
>>> elif state == KazooState.LOST:
>>> CONNECTION_STATE=False
>>> else:
>>> CONNECTION_STATE=True
>>> try:
>>> nodes = registered_nodes
>>> except:
>>> # no local nodes
>>> registered_nodes = {}
>>> _re_register_nodes(nodes)
>>>
>>> log.setLevel(logging.DEBUG)
>>>
>>> registered_nodes = {}
>>>
>>> _zk = KazooClient(hosts='localhost:2182')
>>> _zk.start()
>>> _zk.add_listener(_state_listener)
>>> _state_listener(_zk.state)
>>>
>>> # now register a node
>>> register_node('/abc/a')
>>> _zk.stop()
>>> _zk.start()
>>
>>
>> If you run this in a python shell, after the _zk.start(), your path will not
>> reregister... instead, the _state_listener() method will basically hang when
>> it calls _re_register_nodes(). The _re_register_nodes() method hangs on the
>> first attempt to call register_node(), which will hang at the _zk.create.
>> Obviously the above code is a bastardized stripped down version of what
>> we're working with, but it replicates the problem. This could just be a
>> problem with my understanding of how the add_listener callback works.. but
>> I'm a bit confused here.
>>
>> Long term, the goal is to have our object able to handle a disconnect and
>> gracefully re-generate any paths that had been disconnected during the
>> connection loss. Ironically Kazoo handles this already with its 'watcher'
>> recipe. It just doesn't have the same kind of thing for any paths we create
>> with KazooClient.create().
>>
>> --Matt
>