Hrmm, here's a cleaner way to reproduce the issue:

test.py:
> from kazoo.client import KazooClient
> from kazoo.client import KazooState
> from kazoo.handlers.threading import TimeoutError
> from kazoo.handlers.gevent import SequentialGeventHandler
> import logging
> 
> 
> class Test(object):
>     def __init__(self):
>         self.log = logging.getLogger()
>         format = 'zk_watcher[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>         self.log.setLevel(logging.INFO)
>         formatter = logging.Formatter(format)
>         handler = logging.StreamHandler()
>         handler.setFormatter(formatter)
>         self.log.addHandler(handler)
> 
>         self.registered_nodes = {}
> 
>         self.log.setLevel(logging.DEBUG)
> 
>         self._zk = KazooClient(hosts='localhost:2182', handler=SequentialGeventHandler())
>         self._zk.start()
>         self._zk.add_listener(self._state_listener)
>         self._state_listener(self._zk.state)
> 
>     def test(self):
>         # now register a node
>         self.register_node('/abc/a')
>         self._zk.stop()
>         self._zk.start()
>         self._zk.get_children('/abc')
> 
>     def register_node(self, node, data=None):
>         if node in self.registered_nodes:
>             if data == self.registered_nodes[node]:
>                 self.log.debug('Already registered [%s] in data provider.' % node)
>                 return
>         self.log.debug('Registering [%s] in data provider.' % node)
>         self._zk.create(node, ephemeral=True, makepath=True)
>         self.registered_nodes[node] = data
> 
> 
>     def _state_listener(self,state):
>         self.log.warning('Zookeeper connection state changed: %s' % state)
>         if state == KazooState.SUSPENDED:
>             self.CONNECTION_STATE=False
>         elif state == KazooState.LOST:
>             self.CONNECTION_STATE=False
>         else:
>             self.CONNECTION_STATE=True
>             nodes = {}
>             print self.registered_nodes
>             try:
>                 nodes = self.registered_nodes
>             except:
>                 pass
>             self.registered_nodes = {}
>             for node in nodes.iteritems():
>                 self.register_node(node[0], data=node[1])

python
>>> import test
>>> k = test.Test()
>>> k.test()

(watch it hang ... )


On Dec 9, 2012, at 1:22 PM, Matt Wise <[email protected]> wrote:

> I've got a weird connection issue playing around with Kazoo... If I do 
> something simple like:
> 
>> k = KazooClient()
>> k.start()
>> k.create('/foo')
>> k.stop()
>> k.start()
>> k.create('/foo')
> 
> it works fine... the node is re-created, all is happy.
> 
> However, if I try to use a state_listener callback to automatically 
> re-register any paths that we had registered on our first connection, it 
> fails. In fact, it doesn't really fail... it hangs. This only happens if we 
> try to re-register the paths from within the state listener. If we do it 
> outside of that callback (manually), it works fine. Silly code snippet that 
> will cause the problem:
> 
>> from kazoo.client import KazooClient
>> from kazoo.client import KazooState
>> import logging
>> log = logging.getLogger()
>> format = '[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>> log.setLevel(logging.DEBUG)
>> formatter = logging.Formatter(format)
>> handler = logging.StreamHandler()
>> handler.setFormatter(formatter)
>> log.addHandler(handler)
>> 
>> registered_nodes = {}
>> 
>> def register_node(node, data=None):
>>    if node in registered_nodes:
>>        if data == registered_nodes[node]:
>>            log.debug('Already registered [%s] in data provider.' % node)
>>            return
>>    log.debug('Registering [%s] in data provider.' % node)
>>    _zk.create(node, ephemeral=True, makepath=True)
>>    registered_nodes[node] = data
>> 
>> def _re_register_nodes(nodes):
>>    for node in nodes.iteritems():
>>        register_node(node[0], data=node[1])
>> 
>> def _state_listener(state):
>>    log.warning('Zookeeper connection state changed: %s' % state)
>>    if state == KazooState.SUSPENDED:
>>        CONNECTION_STATE=False
>>    elif state == KazooState.LOST:
>>        CONNECTION_STATE=False
>>    else:
>>        CONNECTION_STATE=True
>>        try:
>>            nodes = registered_nodes
>>        except:
>>            # no local nodes
>>        registered_nodes = {}
>>        _re_register_nodes(nodes)
>> 
>> log.setLevel(logging.DEBUG) 
>> 
>> registered_nodes = {}
>> 
>> _zk = KazooClient(hosts='localhost:2182')
>> _zk.start()
>> _zk.add_listener(_state_listener)
>> _state_listener(_zk.state)
>> 
>> # now register a node
>> register_node('/abc/a')
>> _zk.stop()
>> _zk.start()
> 
> 
> If you run this in a Python shell, after the second _zk.start() your path will 
> not re-register... instead, the _state_listener() method will basically hang 
> when it calls _re_register_nodes(). The _re_register_nodes() method hangs on 
> the first call to register_node(), which in turn hangs at the _zk.create() call. 
> Obviously the above code is a bastardized, stripped-down version of what we're 
> working with, but it replicates the problem. This could just be a problem 
> with my understanding of how the add_listener callback works, but I'm a bit 
> confused here.
> 
> Long term, the goal is to have our object able to handle a disconnect and 
> gracefully re-create any paths that were lost along with the connection. 
> Ironically, Kazoo already handles this with its 'watcher' recipe. It just 
> doesn't have the same kind of thing for any paths we create with 
> KazooClient.create().
> 
> --Matt

Reply via email to