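Without really digging into this, I'll toss in my initial observation.
Calling zk while still inside a zk callback seems a bit dangerous: the
callback is presumably running on the thread that delivers zk events, so a
blocking call made there may wait on a response that cannot arrive until the
callback returns. I would have a queue and an event thread, and have the
callbacks feed work into this queue to be executed inside the event thread.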
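
A minimal sketch of that idea, assuming plain threads (with the gevent
handler, a greenlet and a gevent queue would play the same role). The
CallbackWorker class, its method names, and the stand-in listener are made up
for illustration and are not part of Kazoo:

```python
import logging
import queue
import threading


class CallbackWorker(object):
    """Hypothetical helper: runs queued work on one dedicated thread."""

    def __init__(self):
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True
        self._thread.start()

    def submit(self, func, *args, **kwargs):
        # Safe to call from inside a callback: it only enqueues and returns.
        self._queue.put((func, args, kwargs))

    def stop(self):
        # The sentinel is queued behind pending work, so that work runs first.
        self._queue.put(None)
        self._thread.join()

    def _run(self):
        while True:
            item = self._queue.get()
            if item is None:
                break
            func, args, kwargs = item
            try:
                func(*args, **kwargs)
            except Exception:
                # Keep the worker alive if one job fails.
                logging.getLogger(__name__).exception('queued job failed')


worker = CallbackWorker()
registered = []  # stand-in for the real zk.create() calls


def state_listener(state):
    # Stand-in for the Kazoo listener: hand the work off, do not call zk here.
    if state == 'CONNECTED':
        worker.submit(registered.append, '/abc/a')


state_listener('CONNECTED')
worker.stop()
```

In the real code, submit() would be handed register_node and the saved
path/data instead of registered.append, so the create happens on the worker
thread after the listener has already returned.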
Regards,
Alan
On Dec 9, 2012, at 2:30 PM, Matt Wise wrote:
> Just to clarify, if you go and change test() into:
>
>> def test(self):
>>     # now register a node
>>     self.register_node('/abc/a')
>>     self._zk.stop()
>>     self._zk.start()
>>     self.register_node('/abc/a')
>>     self._zk.get_children('/abc')
>>
>
> and then remove these lines from the _state_listener() method:
>>
>>> for node in nodes.iteritems():
>>>     self.register_node(node[0], data=node[1])
>>
>
> then it works perfectly: no hang, nothing. It seems that register_node()
> cannot be called from within the state listener. Why?
>
> On Dec 9, 2012, at 2:26 PM, Matt Wise wrote:
>
>> Hrmm here's a cleaner way to reproduce the issue:
>>
>> test.py:
>>> from kazoo.client import KazooClient
>>> from kazoo.client import KazooState
>>> from kazoo.handlers.threading import TimeoutError
>>> from kazoo.handlers.gevent import SequentialGeventHandler
>>> import logging
>>>
>>>
>>> class Test(object):
>>>     def __init__(self):
>>>         self.log = logging.getLogger()
>>>         format = ('zk_watcher[%(name)s-%(thread)d-%(funcName)s: '
>>>                   '(%(levelname)s) %(message)s')
>>>         self.log.setLevel(logging.INFO)
>>>         formatter = logging.Formatter(format)
>>>         handler = logging.StreamHandler()
>>>         handler.setFormatter(formatter)
>>>         self.log.addHandler(handler)
>>>
>>>         self.registered_nodes = {}
>>>
>>>         self.log.setLevel(logging.DEBUG)
>>>
>>>         self._zk = KazooClient(hosts='localhost:2182',
>>>                                handler=SequentialGeventHandler())
>>>         self._zk.start()
>>>         self._zk.add_listener(self._state_listener)
>>>         self._state_listener(self._zk.state)
>>>
>>>     def test(self):
>>>         # now register a node
>>>         self.register_node('/abc/a')
>>>         self._zk.stop()
>>>         self._zk.start()
>>>         self._zk.get_children('/abc')
>>>
>>>     def register_node(self, node, data=None):
>>>         if node in self.registered_nodes:
>>>             if data == self.registered_nodes[node]:
>>>                 self.log.debug('Already registered [%s] in data provider.' %
>>>                                node)
>>>                 return
>>>         self.log.debug('Registering [%s] in data provider.' % node)
>>>         self._zk.create(node, ephemeral=True, makepath=True)
>>>         self.registered_nodes[node] = data
>>>
>>>
>>>     def _state_listener(self, state):
>>>         self.log.warning('Zookeeper connection state changed: %s' % state)
>>>         if state == KazooState.SUSPENDED:
>>>             self.CONNECTION_STATE = False
>>>         elif state == KazooState.LOST:
>>>             self.CONNECTION_STATE = False
>>>         else:
>>>             self.CONNECTION_STATE = True
>>>             nodes = {}
>>>             print self.registered_nodes
>>>             try:
>>>                 nodes = self.registered_nodes
>>>             except:
>>>                 pass
>>>             self.registered_nodes = {}
>>>             for node in nodes.iteritems():
>>>                 self.register_node(node[0], data=node[1])
>>
>> python
>>>>> import test
>>>>> k = test.Test()
>>>>> k.test()
>>
>> (watch it hang ... )
>>
>>
>> On Dec 9, 2012, at 1:22 PM, Matt Wise wrote:
>>
>>> I've got a weird connection issue playing around with Kazoo... If I do
>>> something simple like:
>>>
>>>> k = KazooClient()
>>>> k.start()
>>>> k.create('/foo')
>>>> k.stop()
>>>> k.start()
>>>> k.create('/foo')
>>>
>>> it works fine... the node is re-created, all is happy.
>>>
>>> However, if I try to use a state_listener callback to automatically
>>> re-register any paths that we had registered on our first connection, it
>>> fails. In fact, it doesn't really fail, it hangs. This only happens if we
>>> try to re-register the paths from within the state listener. If we do it
>>> outside of that callback (manually), it works fine. Silly code snippet that
>>> will cause the problem:
>>>
>>>> from kazoo.client import KazooClient
>>>> from kazoo.client import KazooState
>>>> import logging
>>>> log = logging.getLogger()
>>>> format = '[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>>>> log.setLevel(logging.DEBUG)
>>>> formatter = logging.Formatter(format)
>>>> handler = logging.StreamHandler()
>>>> handler.setFormatter(formatter)
>>>> log.addHandler(handler)
>>>>
>>>> registered_nodes = {}
>>>>
>>>> def register_node(node, data=None):
>>>>     if node in registered_nodes:
>>>>         if data == registered_nodes[node]:
>>>>             log.debug('Already registered [%s] in data provider.' % node)
>>>>             return
>>>>     log.debug('Registering [%s] in data provider.' % node)
>>>>     _zk.create(node, ephemeral=True, makepath=True)
>>>>     registered_nodes[node] = data
>>>>
>>>> def _re_register_nodes(nodes):
>>>>     for node in nodes.iteritems():
>>>>         register_node(node[0], data=node[1])
>>>>
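>>>> def _state_listener(state):
>>>>     # Both names are rebound below, so they must be declared global;
>>>>     # otherwise Python treats them as locals of this function.
>>>>     global CONNECTION_STATE, registered_nodes
>>>>     log.warning('Zookeeper connection state changed: %s' % state)
>>>>     if state == KazooState.SUSPENDED:
>>>>         CONNECTION_STATE = False
>>>>     elif state == KazooState.LOST:
>>>>         CONNECTION_STATE = False
>>>>     else:
>>>>         CONNECTION_STATE = True
>>>>         # Snapshot and reset the registry so register_node() does not
>>>>         # skip every path as "already registered".
>>>>         nodes = registered_nodes
>>>>         registered_nodes = {}
>>>>         _re_register_nodes(nodes)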
>>>>
>>>> log.setLevel(logging.DEBUG)
>>>>
>>>> registered_nodes = {}
>>>>
>>>> _zk = KazooClient(hosts='localhost:2182')
>>>> _zk.start()
>>>> _zk.add_listener(_state_listener)
>>>> _state_listener(_zk.state)
>>>>
>>>> # now register a node
>>>> register_node('/abc/a')
>>>> _zk.stop()
>>>> _zk.start()
>>>
>>>
>>> If you run this in a python shell, after the _zk.start(), your path will
>>> not reregister... instead, the _state_listener() method will basically hang
>>> when it calls _re_register_nodes(). The _re_register_nodes() method hangs
>>> on the first attempt to call register_node(), which will hang at the
>>> _zk.create. Obviously the above code is a bastardized stripped down version
>>> of what we're working with, but it replicates the problem. This could just
>>> be a problem with my understanding of how the add_listener callback works..
>>> but I'm a bit confused here.
>>>
>>> Long term, the goal is to have our object able to handle a disconnect and
>>> gracefully re-generate any paths that had been disconnected during the
>>> connection loss. Ironically Kazoo handles this already with its 'watcher'
>>> recipe. It just doesn't have the same kind of thing for any paths we create
>>> with KazooClient.create().
>>>
>>> --Matt
>>
>