I just want to circle back on this issue. The root cause was that the Kazoo
thread that triggers the 'state listener' callbacks holds a lock that it
does not release until after the callbacks are finished. If your callback
then tries to make a ZooKeeper call on that thread (i.e., get, set, create,
delete), the call waits on that lock and immediately causes a deadlock.
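To make the mechanism concrete, here is a toy model in plain Python. It is not Kazoo's code: ToyConnection and its methods are hypothetical, and a single "connection" thread that is busy running listeners stands in for the lock described above. Because that one thread both delivers replies and runs state listeners, a listener that blocks waiting for a reply is waiting on itself. A timeout is added so the demo reports the problem instead of hanging forever.

```python
import queue
import threading


class ToyConnection(object):
    """Hypothetical stand-in for a client's connection thread (not Kazoo's
    implementation). One thread delivers replies AND runs state listeners,
    one event at a time."""

    def __init__(self):
        self._events = queue.Queue()
        self._listeners = []
        self._thread = threading.Thread(target=self._loop)
        self._thread.daemon = True
        self._thread.start()

    def add_listener(self, fn):
        self._listeners.append(fn)

    def fire_state_change(self, state):
        # Listeners will be run later, on the connection thread.
        self._events.put(("state", state, None))

    def call(self, payload, timeout=None):
        # A blocking "request": queue it, then wait for the connection
        # thread to post the reply.
        done = threading.Event()
        reply = {}
        self._events.put(("request", payload, (done, reply)))
        if not done.wait(timeout):
            raise RuntimeError("no reply: the connection thread is busy "
                               "running the caller")
        return reply["value"]

    def _loop(self):
        while True:
            kind, payload, extra = self._events.get()
            if kind == "state":
                # While a listener runs, no replies can be delivered.
                for fn in self._listeners:
                    fn(payload)
            else:
                done, reply = extra
                reply["value"] = payload.upper()  # the fake "server response"
                done.set()
```

Calling conn.call() directly inside a listener registered with add_listener() raises the RuntimeError once the timeout expires; calling it from a thread started inside the listener succeeds, because the listener returns first and the loop is then free to deliver the reply.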

To handle this scenario, the client's handler (self._zk.handler) has a
spawn() function that you can use:

    self._zk.handler.spawn(self._re_establish_registrations)

This spawns a new thread (or a greenlet, with the gevent handler) and
returns immediately, allowing the Kazoo thread to finish its callbacks and
release the lock. Thanks a ton to Ben B. for helping me track that down.
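A minimal sketch of the whole pattern, assuming a started KazooClient is passed in as zk. EphemeralRegistry, register_node() and _re_establish_registrations() are hypothetical names modelled on the snippets in this thread. The listener only records the state and calls zk.handler.spawn(), so nothing blocks on Kazoo's thread. It re-creates nodes only after LOST, because a SUSPENDED to CONNECTED transition keeps the same session, and the ephemeral nodes still exist in that case.

```python
# String values of kazoo's KazooState.LOST / KazooState.CONNECTED, written
# out so this sketch does not need kazoo installed to be read or tested.
LOST = "LOST"
CONNECTED = "CONNECTED"


class EphemeralRegistry(object):
    """Hypothetical helper: remembers the ephemeral nodes it created and
    re-creates them once a new session is established after LOST."""

    def __init__(self, zk):
        self._zk = zk              # a started KazooClient (or same API)
        self._nodes = {}           # path -> value (bytes)
        self._session_lost = False
        zk.add_listener(self._state_listener)

    def register_node(self, path, value=b""):
        # Normal (non-callback) context: a blocking create() is fine here.
        self._zk.create(path, value, ephemeral=True, makepath=True)
        self._nodes[path] = value

    def _re_establish_registrations(self):
        # Runs in whatever handler.spawn() started, not in the listener,
        # so these blocking calls cannot deadlock the connection thread.
        for path, value in list(self._nodes.items()):
            self._zk.create(path, value, ephemeral=True, makepath=True)

    def _state_listener(self, state):
        # Called by Kazoo on its connection thread: must not block.
        if state == LOST:
            # The session, and every ephemeral node in it, is gone.
            self._session_lost = True
        elif state == CONNECTED and self._session_lost:
            self._session_lost = False
            # Hand the work off and return immediately.
            self._zk.handler.spawn(self._re_establish_registrations)
```

With a real client this would be used roughly as zk = KazooClient(hosts='localhost:2182'); zk.start(); EphemeralRegistry(zk).register_node('/abc/a'). A later zk.stop() / zk.start() should fire LOST and then CONNECTED, which triggers the re-registration off the connection thread.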

--Matt

On Dec 10, 2012, at 11:18 AM, Alan Cabrera <[email protected]> wrote:

> Without really digging into this, I'll toss in my initial observation.
> 
> Calling zk while still inside a zk callback seems a bit dangerous. I
> would have a queue and an event thread, and have the callbacks feed work
> into this queue to be executed inside the event thread.
> 
> 
> Regards,
> Alan
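A minimal sketch of Alan's suggestion, using only the standard library (Python 3's queue module; it is Queue on Python 2) and assuming the default threading handler rather than gevent. EventWorker and submit() are hypothetical names: the state listener only enqueues work, which returns immediately, and one long-lived worker thread performs the blocking ZooKeeper calls in the order they were submitted.

```python
import logging
import queue  # "Queue" on Python 2
import threading


class EventWorker(object):
    """Hypothetical helper: one long-lived thread that runs work queued
    from ZooKeeper callbacks, so the callbacks themselves never block."""

    def __init__(self):
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, name="zk-event-worker")
        self._thread.daemon = True
        self._thread.start()

    def submit(self, func, *args, **kwargs):
        # Safe from a state listener: put() on an unbounded Queue
        # never blocks.
        self._queue.put((func, args, kwargs))

    def stop(self):
        # The None sentinel is processed after all earlier work.
        self._queue.put(None)
        self._thread.join()

    def _run(self):
        while True:
            item = self._queue.get()
            if item is None:
                break
            func, args, kwargs = item
            try:
                func(*args, **kwargs)
            except Exception:
                # Keep the worker alive if one task fails.
                logging.exception("queued ZooKeeper task failed")
```

Inside a state listener this would look like worker.submit(_re_register_nodes, nodes) instead of calling the _re_register_nodes() helper from the original snippet directly. Compared with handler.spawn(), which starts a new thread (or greenlet) per call, a single worker serializes the re-registrations.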
> 
> On Dec 9, 2012, at 2:30 PM, Matt Wise wrote:
> 
>> Just to clarify, if you go and change test() into:
>> 
>>>   def test(self):
>>>       # now register a node
>>>       self.register_node('/abc/a')
>>>       self._zk.stop()
>>>       self._zk.start()
>>>       self.register_node('/abc/a')
>>>       self._zk.get_children('/abc')
>>> 
>> 
>> and then remove these lines from the _state_listener() method:
>>> 
>>>>          for node, data in nodes.items():
>>>>              self.register_node(node, data=data)
>>> 
>> 
>> then it works perfectly: no hang, nothing. It seems that register_node()
>> cannot be called from within the state listener. Why?
>> 
>> On Dec 9, 2012, at 2:26 PM, Matt Wise <[email protected]> wrote:
>> 
>>> Hrmm, here's a cleaner way to reproduce the issue:
>>> 
>>> test.py:
>>>> from kazoo.client import KazooClient
>>>> from kazoo.client import KazooState
>>>> from kazoo.handlers.threading import TimeoutError
>>>> from kazoo.handlers.gevent import SequentialGeventHandler
>>>> import logging
>>>> 
>>>> 
>>>> class Test(object):
>>>>  def __init__(self):
>>>>      self.log = logging.getLogger()
>>>>      format = 'zk_watcher[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>>>>      self.log.setLevel(logging.INFO)
>>>>      formatter = logging.Formatter(format)
>>>>      handler = logging.StreamHandler()
>>>>      handler.setFormatter(formatter)
>>>>      self.log.addHandler(handler)
>>>> 
>>>>      self.registered_nodes = {}
>>>> 
>>>>      self.log.setLevel(logging.DEBUG)
>>>> 
>>>>      self._zk = KazooClient(hosts='localhost:2182', handler=SequentialGeventHandler())
>>>>      self._zk.start()
>>>>      self._zk.add_listener(self._state_listener)
>>>>      self._state_listener(self._zk.state)
>>>> 
>>>>  def test(self):
>>>>      # now register a node
>>>>      self.register_node('/abc/a')
>>>>      self._zk.stop()
>>>>      self._zk.start()
>>>>      self._zk.get_children('/abc')
>>>> 
>>>>  def register_node(self, node, data=None):
>>>>      if node in self.registered_nodes:
>>>>          if data == self.registered_nodes[node]:
>>>>              self.log.debug('Already registered [%s] in data provider.' % node)
>>>>              return
>>>>      self.log.debug('Registering [%s] in data provider.' % node)
>>>>      self._zk.create(node, ephemeral=True, makepath=True)
>>>>      self.registered_nodes[node] = data
>>>> 
>>>> 
>>>>  def _state_listener(self,state):
>>>>      self.log.warning('Zookeeper connection state changed: %s' % state)
>>>>      if state == KazooState.SUSPENDED:
>>>>          self.CONNECTION_STATE=False
>>>>      elif state == KazooState.LOST:
>>>>          self.CONNECTION_STATE=False
>>>>      else:
>>>>          self.CONNECTION_STATE=True
>>>>          nodes = {}
>>>>          print(self.registered_nodes)
>>>>          try:
>>>>              nodes = self.registered_nodes
>>>>          except:
>>>>              pass
>>>>          self.registered_nodes = {}
>>>>          for node, data in nodes.items():
>>>>              self.register_node(node, data=data)
>>> 
>>> python
>>>>>> import test
>>>>>> k = test.Test()
>>>>>> k.test()
>>> 
>>> (watch it hang ... )
>>> 
>>> 
>>> On Dec 9, 2012, at 1:22 PM, Matt Wise <[email protected]> wrote:
>>> 
>>>> I've got a weird connection issue playing around with Kazoo... If I do 
>>>> something simple like:
>>>> 
>>>>> k = KazooClient()
>>>>> k.start()
>>>>> k.create('/foo', ephemeral=True)
>>>>> k.stop()
>>>>> k.start()
>>>>> k.create('/foo', ephemeral=True)
>>>> 
>>>> it works fine... the node is re-created, all is happy.
>>>> 
>>>> However, if I try to use a state_listener callback to automatically 
>>>> re-register any paths that we had registered on our first connection, it 
>>>> fails. In fact, it doesn't really fail... it hangs. This only happens if 
>>>> we try to re-register the paths from within the state listener. If we 
>>>> do it outside of that callback (manually), it works fine. Silly code 
>>>> snippet that will cause the problem:
>>>> 
>>>>> from kazoo.client import KazooClient
>>>>> from kazoo.client import KazooState
>>>>> import logging
>>>>> log = logging.getLogger()
>>>>> format = '[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>>>>> log.setLevel(logging.DEBUG)
>>>>> formatter = logging.Formatter(format)
>>>>> handler = logging.StreamHandler()
>>>>> handler.setFormatter(formatter)
>>>>> log.addHandler(handler)
>>>>> 
>>>>> registered_nodes = {}
>>>>> 
>>>>> def register_node(node, data=None):
>>>>>     if node in registered_nodes:
>>>>>         if data == registered_nodes[node]:
>>>>>             log.debug('Already registered [%s] in data provider.' % node)
>>>>>             return
>>>>>     log.debug('Registering [%s] in data provider.' % node)
>>>>>     _zk.create(node, ephemeral=True, makepath=True)
>>>>>     registered_nodes[node] = data
>>>>> 
>>>>> def _re_register_nodes(nodes):
>>>>>     for node, data in nodes.items():
>>>>>         register_node(node, data=data)
>>>>> 
>>>>> def _state_listener(state):
>>>>>     # Both names are module-level; without this, the assignments below
>>>>>     # would make them local and reading registered_nodes would fail.
>>>>>     global CONNECTION_STATE, registered_nodes
>>>>>     log.warning('Zookeeper connection state changed: %s' % state)
>>>>>     if state == KazooState.SUSPENDED:
>>>>>         CONNECTION_STATE = False
>>>>>     elif state == KazooState.LOST:
>>>>>         CONNECTION_STATE = False
>>>>>     else:
>>>>>         CONNECTION_STATE = True
>>>>>         # Clear the registry so register_node() re-creates every path.
>>>>>         nodes = registered_nodes
>>>>>         registered_nodes = {}
>>>>>         _re_register_nodes(nodes)
>>>>> 
>>>>> log.setLevel(logging.DEBUG) 
>>>>> 
>>>>> registered_nodes = {}
>>>>> 
>>>>> _zk = KazooClient(hosts='localhost:2182')
>>>>> _zk.start()
>>>>> _zk.add_listener(_state_listener)
>>>>> _state_listener(_zk.state)
>>>>> 
>>>>> # now register a node
>>>>> register_node('/abc/a')
>>>>> _zk.stop()
>>>>> _zk.start()
>>>> 
>>>> 
>>>> If you run this in a Python shell, then after the second _zk.start() your 
>>>> path will not re-register. Instead, _state_listener() hangs when it 
>>>> calls _re_register_nodes(), which hangs on its first call to 
>>>> register_node(), which in turn hangs at _zk.create(). Obviously the code 
>>>> above is a bastardized, stripped-down version of what we're working 
>>>> with, but it replicates the problem. This could just be a problem with my 
>>>> understanding of how the add_listener callback works, but I'm a bit 
>>>> confused here.
>>>> 
>>>> Long term, the goal is for our object to handle a disconnect and 
>>>> gracefully re-create any paths that were removed during the connection 
>>>> loss. Ironically, Kazoo already handles this with its 'watcher' recipe; 
>>>> it just doesn't have the same kind of thing for paths we create with 
>>>> KazooClient.create().
>>>> 
>>>> --Matt
>>> 
>> 
> 
