On Dec 21, 2012, at 10:00 PM, Matt Wise wrote:

> I just want to circle back on this issue. The root cause was that the
> Kazoo thread triggering the 'state listener' callback does so while holding
> a lock that it does not release until after the callbacks are finished. If
> your callback then tries to use this thread to make a Zookeeper call (i.e.,
> get, set, create, delete), it waits on that lock and immediately deadlocks.

Ahh, calling user callbacks while holding a lock.  That would do it.  :)
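To make the failure mode concrete, here is a toy model in Python 3. `ToyClient`, `fire_state_change()` and `create()` are hypothetical stand-ins, not kazoo's actual internals. The model assumes only what Matt describes: the dispatching thread holds a non-reentrant lock until every listener returns, and each client call needs that same lock. A lock timeout replaces the real hang so the demo terminates.

```python
import threading


class ToyClient:
    """Hypothetical client: dispatches listeners while holding its lock."""

    def __init__(self):
        self._lock = threading.Lock()  # non-reentrant, like the one Matt describes
        self._listeners = []

    def add_listener(self, listener):
        self._listeners.append(listener)

    def create(self, path, timeout=0.5):
        # Every "ZooKeeper call" needs the lock. A real client would block
        # forever here; the timeout turns the deadlock into an exception.
        if not self._lock.acquire(timeout=timeout):
            raise RuntimeError('deadlock: lock is held by the listener dispatcher')
        try:
            return path
        finally:
            self._lock.release()

    def fire_state_change(self, state):
        # The lock stays held until every listener has returned.
        with self._lock:
            for listener in self._listeners:
                listener(state)


client = ToyClient()
errors = []


def re_register(state):
    # Calling back into the client from inside the listener: this is the bug.
    try:
        client.create('/abc/a')
    except RuntimeError as exc:
        errors.append(str(exc))


client.add_listener(re_register)
client.fire_state_change('CONNECTED')
print(errors)                   # the create() inside the listener timed out
print(client.create('/abc/a'))  # outside the callback the same call succeeds
```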

> To handle this scenario, Kazoo has a 'spawn()' function on the client's
> thread handler that you can use:
> 
>>            self._zk.handler.spawn(self._re_establish_registrations)
> 
> This spawns a thread and immediately returns, allowing the Zookeeper thread 
> to finish its callbacks and release the lock. Thanks a ton to Ben B. for 
> helping me track that down.

Yeah, Ben is awesome.
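The same toy model with the fix applied. `spawn()` below is a hypothetical stand-in that mimics what Matt describes for `self._zk.handler.spawn`: start the function on a new thread and return at once. Because the listener only schedules the work, the dispatcher can release its lock before `create()` runs.

```python
import threading


def spawn(func, *args):
    # Stand-in for handler.spawn(): run func on a new daemon thread, return immediately.
    thread = threading.Thread(target=func, args=args, daemon=True)
    thread.start()
    return thread


class ToyClient:
    """Hypothetical client that holds a lock while running listeners."""

    def __init__(self):
        self._lock = threading.Lock()
        self._listeners = []
        self.created = []

    def add_listener(self, listener):
        self._listeners.append(listener)

    def create(self, path, timeout=2.0):
        # Waits (up to the timeout) for the dispatcher to release the lock.
        if not self._lock.acquire(timeout=timeout):
            raise RuntimeError('deadlock')
        try:
            self.created.append(path)
        finally:
            self._lock.release()

    def fire_state_change(self, state):
        with self._lock:
            for listener in self._listeners:
                listener(state)


client = ToyClient()
workers = []


def re_register(state):
    # Hand the ZooKeeper work to another thread; return without touching the client.
    workers.append(spawn(client.create, '/abc/a'))


client.add_listener(re_register)
client.fire_state_change('CONNECTED')  # returns, releasing the lock
for worker in workers:
    worker.join()
print(client.created)
```

The spawned thread may start while the dispatcher still holds the lock; it simply waits until the listener loop finishes, which is exactly the ordering the fix relies on.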


Regards,
Alan

> On Dec 10, 2012, at 11:18 AM, Alan Cabrera wrote:
> 
>> Without really digging into this, I'll toss in my initial observation.
>> 
>> Calling zk while still inside a zk callback seems a bit dangerous. I
>> would use a queue and a dedicated event thread: the callbacks only feed
>> work into the queue, and the event thread executes it.
>> 
>> 
>> Regards,
>> Alan
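The queue-and-event-thread suggestion, sketched with the standard library. `EventWorker` and its `submit()`/`stop()` methods are hypothetical names; in real code the queued callables would be the kazoo calls (for example, re-creating nodes), and listeners would only call `submit()`, which never blocks on the client.

```python
import queue
import threading


class EventWorker:
    """Runs queued work items one at a time on a dedicated thread."""

    _STOP = object()  # sentinel telling the worker thread to exit

    def __init__(self):
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, func, *args, **kwargs):
        # Safe to call from a listener callback: it only enqueues.
        self._queue.put((func, args, kwargs))

    def stop(self):
        # Items queued before the sentinel are still processed.
        self._queue.put(self._STOP)
        self._thread.join()

    def _run(self):
        while True:
            item = self._queue.get()
            if item is self._STOP:
                break
            func, args, kwargs = item
            try:
                func(*args, **kwargs)
            except Exception as exc:  # keep the worker alive after a failure
                print('work item failed: %r' % (exc,))


done = []
worker = EventWorker()


def state_listener(state):
    # The listener makes no client calls itself; it just queues the work.
    if state == 'CONNECTED':
        worker.submit(done.append, '/abc/a')


state_listener('CONNECTED')
worker.stop()
print(done)
```

Because a single thread runs the work, items execute in submission order, so re-registrations cannot race each other.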
>> 
>> On Dec 9, 2012, at 2:30 PM, Matt Wise wrote:
>> 
>>> Just to clarify, if you go and change test() into:
>>> 
>>>>  def test(self):
>>>>      # now register a node
>>>>      self.register_node('/abc/a')
>>>>      self._zk.stop()
>>>>      self._zk.start()
>>>>      self.register_node('/abc/a')
>>>>      self._zk.get_children('/abc')
>>>> 
>>> 
>>> and then remove these lines from the _state_listener() method:
>>>> 
>>>>>         for node in nodes.iteritems():
>>>>>             self.register_node(node[0], data=node[1])
>>>> 
>>> 
>>> then it works perfectly: no hang, nothing. It seems that register_node()
>>> cannot be called from within the state listener. Why?
>>> 
>>> On Dec 9, 2012, at 2:26 PM, Matt Wise wrote:
>>> 
>>>> Hrmm, here's a cleaner way to reproduce the issue:
>>>> 
>>>> test.py:
>>>>> from kazoo.client import KazooClient
>>>>> from kazoo.client import KazooState
>>>>> from kazoo.handlers.threading import TimeoutError
>>>>> from kazoo.handlers.gevent import SequentialGeventHandler
>>>>> import logging
>>>>> 
>>>>> 
>>>>> class Test(object):
>>>>>     def __init__(self):
>>>>>         self.log = logging.getLogger()
>>>>>         format = ('zk_watcher[%(name)s-%(thread)d-%(funcName)s: '
>>>>>                   '(%(levelname)s) %(message)s')
>>>>>         self.log.setLevel(logging.INFO)
>>>>>         formatter = logging.Formatter(format)
>>>>>         handler = logging.StreamHandler()
>>>>>         handler.setFormatter(formatter)
>>>>>         self.log.addHandler(handler)
>>>>>
>>>>>         self.registered_nodes = {}
>>>>>
>>>>>         self.log.setLevel(logging.DEBUG)
>>>>>
>>>>>         self._zk = KazooClient(hosts='localhost:2182',
>>>>>                                handler=SequentialGeventHandler())
>>>>>         self._zk.start()
>>>>>         self._zk.add_listener(self._state_listener)
>>>>>         self._state_listener(self._zk.state)
>>>>>
>>>>>     def test(self):
>>>>>         # now register a node
>>>>>         self.register_node('/abc/a')
>>>>>         self._zk.stop()
>>>>>         self._zk.start()
>>>>>         self._zk.get_children('/abc')
>>>>>
>>>>>     def register_node(self, node, data=None):
>>>>>         if node in self.registered_nodes:
>>>>>             if data == self.registered_nodes[node]:
>>>>>                 self.log.debug('Already registered [%s] in data provider.'
>>>>>                                % node)
>>>>>                 return
>>>>>         self.log.debug('Registering [%s] in data provider.' % node)
>>>>>         self._zk.create(node, ephemeral=True, makepath=True)
>>>>>         self.registered_nodes[node] = data
>>>>>
>>>>>     def _state_listener(self, state):
>>>>>         self.log.warning('Zookeeper connection state changed: %s' % state)
>>>>>         if state == KazooState.SUSPENDED:
>>>>>             self.CONNECTION_STATE = False
>>>>>         elif state == KazooState.LOST:
>>>>>             self.CONNECTION_STATE = False
>>>>>         else:
>>>>>             self.CONNECTION_STATE = True
>>>>>             nodes = {}
>>>>>             print self.registered_nodes
>>>>>             try:
>>>>>                 nodes = self.registered_nodes
>>>>>             except:
>>>>>                 pass
>>>>>             self.registered_nodes = {}
>>>>>             for node in nodes.iteritems():
>>>>>                 self.register_node(node[0], data=node[1])
>>>> 
>>>> python
>>>>>>> import test
>>>>>>> k = test.Test()
>>>>>>> k.test()
>>>> 
>>>> (watch it hang ... )
>>>> 
>>>> 
>>>> On Dec 9, 2012, at 1:22 PM, Matt Wise wrote:
>>>> 
>>>>> I've got a weird connection issue playing around with Kazoo... If I do 
>>>>> something simple like:
>>>>> 
>>>>>> k = KazooClient()
>>>>>> k.start()
>>>>>> k.create('/foo')
>>>>>> k.stop()
>>>>>> k.start()
>>>>>> k.create('/foo')
>>>>> 
>>>>> it works fine... the node is re-created, all is happy.
>>>>> 
>>>>> However, if I try to use a state_listener callback to automatically
>>>>> re-register any paths that we had registered on our first connection, it
>>>>> fails. In fact, it doesn't really fail... it hangs. This only happens if
>>>>> we try to re-register the paths from within the state listener. If we
>>>>> do it outside of that callback (manually), it works fine. Silly code
>>>>> snippet that will cause the problem:
>>>>> 
>>>>>> from kazoo.client import KazooClient
>>>>>> from kazoo.client import KazooState
>>>>>> import logging
>>>>>> log = logging.getLogger()
>>>>>> format = '[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>>>>>> log.setLevel(logging.DEBUG)
>>>>>> formatter = logging.Formatter(format)
>>>>>> handler = logging.StreamHandler()
>>>>>> handler.setFormatter(formatter)
>>>>>> log.addHandler(handler)
>>>>>> 
>>>>>> registered_nodes = {}
>>>>>> 
>>>>>> def register_node(node, data=None):
>>>>>>     if node in registered_nodes:
>>>>>>         if data == registered_nodes[node]:
>>>>>>             log.debug('Already registered [%s] in data provider.' % node)
>>>>>>             return
>>>>>>     log.debug('Registering [%s] in data provider.' % node)
>>>>>>     _zk.create(node, ephemeral=True, makepath=True)
>>>>>>     registered_nodes[node] = data
>>>>>>
>>>>>> def _re_register_nodes(nodes):
>>>>>>     for node in nodes.iteritems():
>>>>>>         register_node(node[0], data=node[1])
>>>>>>
>>>>>> def _state_listener(state):
>>>>>>     # these module-level names are rebound below
>>>>>>     global registered_nodes, CONNECTION_STATE
>>>>>>     log.warning('Zookeeper connection state changed: %s' % state)
>>>>>>     if state == KazooState.SUSPENDED:
>>>>>>         CONNECTION_STATE = False
>>>>>>     elif state == KazooState.LOST:
>>>>>>         CONNECTION_STATE = False
>>>>>>     else:
>>>>>>         CONNECTION_STATE = True
>>>>>>         nodes = {}
>>>>>>         try:
>>>>>>             nodes = registered_nodes
>>>>>>         except:
>>>>>>             # no local nodes
>>>>>>             pass
>>>>>>         registered_nodes = {}
>>>>>>         _re_register_nodes(nodes)
>>>>>> 
>>>>>> log.setLevel(logging.DEBUG) 
>>>>>> 
>>>>>> registered_nodes = {}
>>>>>> 
>>>>>> _zk = KazooClient(hosts='localhost:2182')
>>>>>> _zk.start()
>>>>>> _zk.add_listener(_state_listener)
>>>>>> _state_listener(_zk.state)
>>>>>> 
>>>>>> # now register a node
>>>>>> register_node('/abc/a')
>>>>>> _zk.stop()
>>>>>> _zk.start()
>>>>> 
>>>>> 
>>>>> If you run this in a Python shell, after the _zk.start() your path will
>>>>> not re-register... instead, the _state_listener() method will basically
>>>>> hang when it calls _re_register_nodes(). The _re_register_nodes() method
>>>>> hangs on the first attempt to call register_node(), which will hang at
>>>>> the _zk.create(). Obviously the above code is a bastardized, stripped-down
>>>>> version of what we're working with, but it replicates the problem. This
>>>>> could just be a problem with my understanding of how the add_listener()
>>>>> callback works... but I'm a bit confused here.
>>>>> 
>>>>> Long term, the goal is to have our object able to handle a disconnect and
>>>>> gracefully re-create any paths that were removed during the connection
>>>>> loss. Ironically, Kazoo already handles this with its 'watcher'
>>>>> recipe. It just doesn't have the same kind of thing for any paths we
>>>>> create with KazooClient.create().
>>>>> 
>>>>> --Matt
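One way to get the 'watcher'-like behaviour Matt wants for plain `create()` calls, sketched under assumptions. `EphemeralRegistry` is a hypothetical helper: it expects a client whose `create(path, value, ephemeral=..., makepath=...)` matches KazooClient's keyword arguments, plus a `spawn` callable such as `zk.handler.spawn`. States are compared as the strings 'LOST' and 'CONNECTED', assumed to match kazoo's KazooState values. Nodes are re-created only after LOST, because after a SUSPENDED-to-CONNECTED transition the session, and therefore its ephemeral nodes, may still exist. A fake client stands in for ZooKeeper so the demo runs without a server.

```python
import threading


class EphemeralRegistry:
    """Remembers ephemeral nodes and re-creates them after the session is lost."""

    def __init__(self, client, spawn):
        self._client = client
        self._spawn = spawn          # e.g. zk.handler.spawn (assumed)
        self._nodes = {}             # path -> data
        self._lock = threading.Lock()
        self._session_lost = False

    def register(self, path, data=b''):
        self._client.create(path, data, ephemeral=True, makepath=True)
        with self._lock:
            self._nodes[path] = data

    def state_listener(self, state):
        # Runs on the client's callback thread: only record state and schedule work.
        if state == 'LOST':
            self._session_lost = True
        elif state == 'CONNECTED' and self._session_lost:
            self._session_lost = False
            self._spawn(self._recreate_all)

    def _recreate_all(self):
        # Runs on the spawned thread, outside the listener callback.
        with self._lock:
            nodes = dict(self._nodes)
        for path, data in nodes.items():
            self._client.create(path, data, ephemeral=True, makepath=True)


class FakeClient:
    """Records create() calls instead of talking to ZooKeeper."""

    def __init__(self):
        self.calls = []

    def create(self, path, value=b'', ephemeral=False, makepath=False):
        self.calls.append((path, value, ephemeral, makepath))


def spawn(func):
    thread = threading.Thread(target=func, daemon=True)
    thread.start()
    return thread


threads = []
fake = FakeClient()
registry = EphemeralRegistry(fake, lambda func: threads.append(spawn(func)))
registry.register('/abc/a', b'hello')
registry.state_listener('SUSPENDED')
registry.state_listener('CONNECTED')   # session survived: nothing to redo
registry.state_listener('LOST')
registry.state_listener('CONNECTED')   # session expired: re-create the node
for thread in threads:
    thread.join()
print(fake.calls)
```

With a real client, `registry.state_listener` would be passed to `add_listener()` and `register()` used in place of direct `create()` calls for ephemeral paths.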
>>>> 
>>> 
>> 
> 
