I had posted this message as a comment on ZOOKEEPER-822. I thought it would
be a good idea to give it wider attention so that it will be easier to …
I found a few problems in the FLE implementation while debugging
https://issues.apache.org/jira/browse/ZOOKEEPER-822. Following the email
below may require some background; if necessary, please browse the JIRA. I
have patches for 1a) and 2) and will send them out soon.
1. Blocking connects and accepts:
a) The first problem is in manager.toSend(). It invokes connectOne(),
which does a blocking connect. While testing, I changed the code so that
connectOne() starts a new thread called AsyncConnect. AsyncConnect.run()
does a socketChannel.connect(). After starting AsyncConnect, connectOne()
starts a timer. connectOne() continues with normal operations if the
connection is established before the timer expires; otherwise, when the
timer expires, it interrupts the AsyncConnect thread and returns. This way
I get an upper bound on how long we wait for a connect to succeed. Of
course, this was a quick fix for my testing. Ideally, we should use a
Selector to do non-blocking connects/accepts. I plan to do that later, once
we at least have a quick fix for the problem and consensus from others on
the real fix (this problem is a big blocker for us). Note that it is OK to
do blocking IO in the SenderWorker and RecvWorker threads, since each of
them only blocks on IO to its own peer.
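For comparison with the thread-plus-timer quick fix, here is a minimal sketch of the Selector-based bounded connect suggested above, using only standard java.nio calls. BoundedConnect and connect() are hypothetical names, not ZooKeeper code. On success the channel is switched back to blocking mode, on the assumption that the per-peer worker threads keep doing blocking IO.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical helper, not part of ZooKeeper's QuorumCnxManager.
final class BoundedConnect {
    /**
     * Connects to addr, waiting at most timeoutMs (must be > 0).
     * Returns a connected channel in blocking mode, or null on timeout.
     * Throws IOException if the connect fails (e.g. connection refused).
     */
    static SocketChannel connect(InetSocketAddress addr, long timeoutMs) throws IOException {
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException("timeoutMs must be positive");
        }
        SocketChannel ch = SocketChannel.open();
        try {
            ch.configureBlocking(false);
            // In non-blocking mode connect() returns true only if it completed immediately.
            boolean connected = ch.connect(addr);
            if (!connected) {
                try (Selector selector = Selector.open()) {
                    ch.register(selector, SelectionKey.OP_CONNECT);
                    // select() returns 0 if the channel is not ready before the timeout.
                    if (selector.select(timeoutMs) > 0) {
                        connected = ch.finishConnect(); // throws if the connect failed
                    }
                } // closing the selector deregisters the channel
            }
            if (connected) {
                ch.configureBlocking(true); // hand a blocking channel to the worker threads
                return ch;
            }
            ch.close(); // timed out
            return null;
        } catch (IOException e) {
            ch.close();
            throw e;
        }
    }
}
```

A simpler option is java.net.Socket.connect(SocketAddress, int timeout), which also bounds the wait but still blocks the calling thread for up to the timeout. The Selector version is the one that could later be folded into a single event loop that also handles accepts.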
b) The blocking IO problem is not restricted to connectOne(); it also
occurs in receiveConnection(). The Listener thread calls receiveConnection()
for each incoming connection request. receiveConnection() does blocking IO
to get the peer's info (s.read(msgBuffer)). Worse, it invokes connectOne()
back to the peer that sent the connection request. All of this happens on
the Listener thread. In short, if a peer fails after initiating a
connection, the Listener thread won't be able to accept connections from
other peers, because it will be stuck in read() or connectOne(). The code
also has an inherent cycle: initiateConnection() and receiveConnection()
have to be synchronized very carefully, otherwise we could run into
deadlocks. This code is going to be difficult to maintain and modify.
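The sketch below illustrates one way to keep the Listener from stalling: the accept loop only accepts, and the handshake read runs on a pool thread with a socket read timeout. HandOffListener, onPeer and the one-long handshake format are all hypothetical assumptions, not ZooKeeper's actual Listener or wire protocol.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Hypothetical sketch, not ZooKeeper's Listener: the accept loop never reads
// from an accepted socket, so one slow or dead peer cannot stall it.
final class HandOffListener implements Runnable {
    private final ServerSocket server;
    private final int handshakeTimeoutMs;
    private final BiConsumer<Long, Socket> onPeer; // receives (peer id, socket)
    private final ExecutorService handshakes = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "handshake");
        t.setDaemon(true);
        return t;
    });

    HandOffListener(ServerSocket server, int handshakeTimeoutMs, BiConsumer<Long, Socket> onPeer) {
        this.server = server;
        this.handshakeTimeoutMs = handshakeTimeoutMs;
        this.onPeer = onPeer;
    }

    @Override
    public void run() {
        while (!server.isClosed()) {
            try {
                Socket s = server.accept();
                handshakes.execute(() -> handshake(s)); // no IO on this thread
            } catch (IOException e) {
                // e.g. accept() throws once the server socket is closed; the loop then exits.
            }
        }
        handshakes.shutdown();
    }

    // Runs on a pool thread: a bounded read of the peer's id (assumed to be one long).
    private void handshake(Socket s) {
        try {
            s.setSoTimeout(handshakeTimeoutMs);
            long sid = new DataInputStream(s.getInputStream()).readLong();
            s.setSoTimeout(0); // later reads by per-peer workers may block indefinitely
            onPeer.accept(sid, s);
        } catch (IOException e) { // includes SocketTimeoutException
            try { s.close(); } catch (IOException ignored) { }
        }
    }
}
```

This bounds the damage of a silent peer but does not remove the initiateConnection()/receiveConnection() cycle; that still needs the synchronization discussed here.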
2. Buggy senderWorkerMap handling:
The code that manages senderWorkerMap is very buggy and causes multiple
election rounds. While debugging, I found that sometimes after FLE a node
has an empty senderWorkerMap even though it has SenderWorker and RecvWorker
threads for each peer.
a) The receiveConnection() method calls the finish() method, which removes
an entry from the map. Additionally, the thread itself calls finish(), which
could remove the newly added entry from the map. In short,
receiveConnection() causes the exact condition that you mentioned above.
b) Apart from the bug in finish(), receiveConnection() makes its entry in
senderWorkerMap at the wrong place. Here's the buggy code:
    SendWorker vsw = senderWorkerMap.get(sid);
    if(vsw != null)
It makes an entry for the new thread and then calls finish(), which causes
the new thread to be removed from the map. The old thread also gets
terminated, since finish() interrupts it.
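One way to make both a) and b) safe is sketched below, assuming senderWorkerMap is a ConcurrentHashMap keyed by peer id. The new worker is published with a single put(), which returns the previous entry, and the old worker is retired afterwards. finish() removes the map entry only if it still points at the worker being finished, using ConcurrentHashMap.remove(key, value). Worker and WorkerRegistry are hypothetical stand-ins, not the actual SendWorker or QuorumCnxManager classes.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not ZooKeeper code.
final class WorkerRegistry {
    // Stand-in for a per-peer sender thread.
    static final class Worker {
        final long sid;
        private final ConcurrentHashMap<Long, Worker> map;
        volatile boolean running = true;

        Worker(long sid, ConcurrentHashMap<Long, Worker> map) {
            this.sid = sid;
            this.map = map;
        }

        void finish() {
            running = false; // a real worker would also interrupt its thread and close its socket
            // Remove the entry only if it still maps to *this* worker, so a stale
            // worker (or its own thread calling finish() again) never evicts its replacement.
            map.remove(sid, this);
        }
    }

    final ConcurrentHashMap<Long, Worker> senderWorkerMap = new ConcurrentHashMap<>();

    // Publish the new worker first, then retire whichever worker it replaced.
    Worker register(long sid) {
        Worker fresh = new Worker(sid, senderWorkerMap);
        Worker old = senderWorkerMap.put(sid, fresh); // atomic swap, returns the previous entry
        if (old != null) {
            old.finish(); // leaves 'fresh' in the map
        }
        return fresh;
    }
}
```

With this shape the order of put() and finish() no longer matters for the map's contents, because a worker can only ever remove itself.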
3. Race condition in receiveConnection() and initiateConnection():
*In theory*, two peers can keep tearing down each other's connections:
    T0: Peer 0 initiates a connection to Peer 1 (request 1).
    T1: Peer 1 receives the connection from Peer 0.
    T2: Peer 1 calls receiveConnection().
    T2: Peer 0 closes its connection to Peer 1 because its ID is lower.
    T3: Peer 0 re-initiates a connection to Peer 1 from manager.toSend()
        (request 2).
    T3: Peer 1 terminates the older connection to Peer 0.
    T4: Peer 1 calls connectOne(), which starts new SendWorker threads for
        Peer 0.
    T5: Peer 1 kills the connection created in T3 because it receives
        another connect request (request 2) from Peer 0.
The problem here is that while Peer 0 is accepting a connection from Peer 1,
it can also be initiating a connection to Peer 1. If their timing lines up,
the two peers can get stuck in a connect/disconnect loop and cause multiple
rounds of leader election.
I think the cause here is, again, the blocking connect()/accept() calls. A
peer starts to take action (killing existing threads and starting new ones)
as soon as a connection is established at the TCP level. That is, it gives
us no control to synchronize connects and accepts. We could use
non-blocking connects and accepts instead. This would allow us to a) tell a
thread not to initiate a connection because the listener is about to accept
a connection from the remote peer (using the isAcceptable() and
isConnectable() methods of SelectionKey), and b) prevent a thread from
initiating multiple connect requests to the same peer. It would also
simplify synchronization.
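The following sketch shows only the bookkeeping behind point b) and the "back off" half of a). It uses a per-peer in-flight set in place of SelectionKey readiness checks and assumes both the connecting and the accepting path can identify the remote peer by its id before touching that peer's workers. PeerConnectGuard, tryBegin() and done() are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not ZooKeeper code. Both the connecting path
// (e.g. from toSend()) and the accepting path (once the remote peer's id is
// known) claim the peer's slot before touching its workers; whoever loses
// backs off instead of tearing down the other side's connection.
final class PeerConnectGuard {
    // Ids of peers with a connect or accept currently in progress.
    private final Set<Long> inProgress = ConcurrentHashMap.newKeySet();

    /** Returns true if the caller now owns the slot for sid; false means back off. */
    boolean tryBegin(long sid) {
        return inProgress.add(sid); // atomic: only one concurrent caller can succeed
    }

    /** Releases the slot once the attempt has finished, successfully or not. */
    void done(long sid) {
        inProgress.remove(sid);
    }
}
```

In a single-threaded Selector design this set would only be touched from the selector thread and would not need to be concurrent; the ConcurrentHashMap version is what the current multi-threaded code would need.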