Without really digging into this I'll toss in my initial observation.

Calling zk while still being inside a zk callback seems a bit dangerous.  I 
would have a queue and event thread and have work from the callbacks feed this 
queue which would be executed inside the event thread.


Regards,
Alan

On Dec 9, 2012, at 2:30 PM, Matt Wise wrote:

> Just to clarify, if you go and change test() into:
> 
>>    def test(self):
>>        # now register a node
>>        self.register_node('/abc/a')
>>        self._zk.stop()
>>        self._zk.start()
>>        self.register_node('/abc/a')
>>        self._zk.get_children('/abc')
>> 
> 
> and then remove these lines from the state_handler() method:
>> 
>>>           for node in nodes.iteritems():
>>>               self.register_node(node[0], data=node[1])
>> 
> 
> then it works perfectly.. no hang, nothing. it seems that the register_node 
> cannot be called from within the state handler class. Why?
> 
> On Dec 9, 2012, at 2:26 PM, Matt Wise <[email protected]> wrote:
> 
>> Hrmm here's a cleaner way to reproduce the issue:
>> 
>> test.py:
>>> from kazoo.client import KazooClient
>>> from kazoo.client import KazooState
>>> from kazoo.handlers.threading import TimeoutError
>>> from kazoo.handlers.gevent import SequentialGeventHandler
>>> import logging
>>> 
>>> 
>>> class Test(object):
>>>   def __init__(self):
>>>       self.log = logging.getLogger()
>>>       format = 'zk_watcher[%(name)s-%(thread)d-%(funcName)s: 
>>> (%(levelname)s) %(message)s'
>>>       self.log.setLevel(logging.INFO)
>>>       formatter = logging.Formatter(format)
>>>       handler = logging.StreamHandler()
>>>       handler.setFormatter(formatter)
>>>       self.log.addHandler(handler)
>>> 
>>>       self.registered_nodes = {}
>>> 
>>>       self.log.setLevel(logging.DEBUG)
>>> 
>>>       self._zk = KazooClient(hosts='localhost:2182', 
>>> handler=SequentialGeventHandler())
>>>       self._zk.start()
>>>       self._zk.add_listener(self._state_listener)
>>>       self._state_listener(self._zk.state)
>>> 
>>>   def test(self):
>>>       # now register a node
>>>       self.register_node('/abc/a')
>>>       self._zk.stop()
>>>       self._zk.start()
>>>       self._zk.get_children('/abc')
>>> 
>>>   def register_node(self, node, data=None):
>>>       if node in self.registered_nodes:
>>>           if data == self.registered_nodes[node]:
>>>               self.log.debug('Already registered [%s] in data provider.' % 
>>> node)
>>>               return
>>>       self.log.debug('Registering [%s] in data provider.' % node)
>>>       self._zk.create(node, ephemeral=True, makepath=True)
>>>       self.registered_nodes[node] = data
>>> 
>>> 
>>>   def _state_listener(self,state):
>>>       self.log.warning('Zookeeper connection state changed: %s' % state)
>>>       if state == KazooState.SUSPENDED:
>>>           self.CONNECTION_STATE=False
>>>       elif state == KazooState.LOST:
>>>           self.CONNECTION_STATE=False
>>>       else:
>>>           self.CONNECTION_STATE=True
>>>           nodes = {}
>>>           print self.registered_nodes
>>>           try:
>>>               nodes = self.registered_nodes
>>>           except:
>>>               pass
>>>           self.registered_nodes = {}
>>>           for node in nodes.iteritems():
>>>               self.register_node(node[0], data=node[1])
>> 
>> python
>>>>> import test
>>>>> k = test.Test()
>>>>> k.test()
>> 
>> (watch it hang ... )
>> 
>> 
>> On Dec 9, 2012, at 1:22 PM, Matt Wise <[email protected]> wrote:
>> 
>>> I've got a weird connection issue playing around with Kazoo... If I do 
>>> something simple like:
>>> 
>>>> k = KazooClient()
>>>> k.start()
>>>> k.create('/foo')
>>>> k.stop()
>>>> k.start()
>>>> k.create('/foo')
>>> 
>>> it works fine... the node is re-created, all is happy.
>>> 
>>> However, if i try to use a state_listener callback to automatically 
>>> re-register any paths that we had registered on our first connection, it 
>>> fails. In fact, it doesn't really fail .. it hangs. This only happens if we 
>>> try to do re-register the paths from within the state listener. If we do it 
>>> outside of that callback (manually) it works fine. Silly code snippet that 
>>> will cause the problem:
>>> 
>>>> from kazoo.client import KazooClient
>>>> from kazoo.client import KazooState
>>>> import logging
>>>> log = logging.getLogger()
>>>> format = '[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>>>> log.setLevel(logging.DEBUG)
>>>> formatter = logging.Formatter(format)
>>>> handler = logging.StreamHandler()
>>>> handler.setFormatter(formatter)
>>>> log.addHandler(handler)
>>>> 
>>>> registered_nodes = {}
>>>> 
>>>> def register_node(node, data=None):
>>>>  if node in registered_nodes:
>>>>      if data == registered_nodes[node]:
>>>>          log.debug('Already registered [%s] in data provider.' % node)
>>>>          return
>>>>  log.debug('Registering [%s] in data provider.' % node)
>>>>  _zk.create(node, ephemeral=True, makepath=True)
>>>>  registered_nodes[node] = data
>>>> 
>>>> def _re_register_nodes(nodes):
>>>>  for node in nodes.iteritems():
>>>>      register_node(node[0], data=node[1])
>>>> 
>>>> def _state_listener(state):
>>>>  log.warning('Zookeeper connection state changed: %s' % state)
>>>>  if state == KazooState.SUSPENDED:
>>>>      CONNECTION_STATE=False
>>>>  elif state == KazooState.LOST:
>>>>      CONNECTION_STATE=False
>>>>  else:
>>>>      CONNECTION_STATE=True
>>>>      try:
>>>>          nodes = registered_nodes
>>>>      except:
>>>>          # no local nodes
>>>>      registered_nodes = {}
>>>>      _re_register_nodes(nodes)
>>>> 
>>>> log.setLevel(logging.DEBUG) 
>>>> 
>>>> registered_nodes = {}
>>>> 
>>>> _zk = KazooClient(hosts='localhost:2182')
>>>> _zk.start()
>>>> _zk.add_listener(_state_listener)
>>>> _state_listener(_zk.state)
>>>> 
>>>> # now register a node
>>>> register_node('/abc/a')
>>>> _zk.stop()
>>>> _zk.start()
>>> 
>>> 
>>> If you run this in a python shell, after the _zk.start(), your path will 
>>> not reregister... instead, the _state_listener() method will basically hang 
>>> when it calls _re_register_nodes(). The _re_register_nodes() method hangs 
>>> on the first attempt to call register_node(), which will hang at the 
>>> _zk.create. Obviously the above code is a bastardized stripped down version 
>>> of what we're working with, but it replicates the problem. This could just 
>>> be a problem with my understanding of how the add_listener callback works.. 
>>> but I'm a bit confused here.
>>> 
>>> Long term, the goal is to have our object able to handle a disconnect and 
>>> gracefully re-generate any paths that had been disconnected during the 
>>> connection loss. Ironically Kazoo handles this already with its 'watcher' 
>>> recipe. It just doesn't have the same kind of thing for any paths we create 
>>> with KazooClient.create().
>>> 
>>> --Matt
>> 
> 

Reply via email to