Author: mahadev Date: Mon Aug 3 21:32:08 2009 New Revision: 800576 URL: http://svn.apache.org/viewvc?rev=800576&view=rev Log: ZOOKEEPER-481. Add lastMessageSent to QuorumCnxManager. (flavio via mahadev)
Added: hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Modified: hadoop/zookeeper/branches/branch-3.2/CHANGES.txt hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Modified: hadoop/zookeeper/branches/branch-3.2/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/CHANGES.txt?rev=800576&r1=800575&r2=800576&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.2/CHANGES.txt (original) +++ hadoop/zookeeper/branches/branch-3.2/CHANGES.txt Mon Aug 3 21:32:08 2009 @@ -24,6 +24,8 @@ ZOOKEEPER-457. Make ZookeeperMain public, support for HBase (and other) embedded clients (ryan rawson via phunt) + ZOOKEEPER-481. Add lastMessageSent to QuorumCnxManager. (flavio via mahadev) + IMPROVEMENTS: NEW FEATURES: Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=800576&r1=800575&r2=800576&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original) +++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Mon Aug 3 21:32:08 2009 @@ -78,11 +78,12 @@ */ ConcurrentHashMap<Long, SendWorker> senderWorkerMap; ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap; + ConcurrentHashMap<Long, ByteBuffer> lastMessageSent; /* * Reception queue */ - ArrayBlockingQueue<Message> recvQueue; + public ArrayBlockingQueue<Message> recvQueue; /* * Shutdown flag @@ -93,7 +94,7 @@ /* * Listener thread */ - Listener listener; + public Listener listener; static class Message { 
Message(ByteBuffer buffer, long sid) { @@ -109,6 +110,8 @@ this.recvQueue = new ArrayBlockingQueue<Message>(CAPACITY); this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>(); this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>(); + this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>(); + this.self = self; // Starts listener thread that waits for connection requests @@ -116,11 +119,25 @@ } /** + * Invokes initiateConnection for testing purposes + * + * @param sid + */ + public void testInitiateConnection(long sid) throws Exception { + SocketChannel channel; + LOG.debug("Opening channel to server " + sid); + channel = SocketChannel + .open(self.quorumPeers.get(sid).electionAddr); + channel.socket().setTcpNoDelay(true); + initiateConnection(channel, sid); + } + + /** * If this server has initiated the connection, then it gives up on the * connection if it loses challenge. Otherwise, it keeps the connection. */ - boolean initiateConnection(SocketChannel s, Long sid) { + public boolean initiateConnection(SocketChannel s, Long sid) { try { // Sending id and challenge byte[] msgBytes = new byte[8]; @@ -144,31 +161,26 @@ + "reopen connection: ", e); } // Otherwise proceed with the connection - } else { - SendWorker sw = new SendWorker(s, sid); - RecvWorker rw = new RecvWorker(s, sid); - sw.setRecv(rw); - - if (senderWorkerMap - .containsKey(sid)) { - SendWorker vsw = senderWorkerMap.get(sid); - if(vsw != null) - vsw.finish(); - else LOG.error("No SendWorker for this identifier (" + sid + ")"); - } else { - LOG.error("Cannot open channel to server " + sid); - } - - if (!queueSendMap.containsKey(sid)) { - queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>( - CAPACITY)); - } - - senderWorkerMap.put(sid, sw); - sw.start(); - rw.start(); + } else { + SendWorker sw = new SendWorker(s, sid); + RecvWorker rw = new RecvWorker(s, sid); + sw.setRecv(rw); + + SendWorker vsw = senderWorkerMap.get(sid); + senderWorkerMap.put(sid, 
sw); + + if(vsw != null) + vsw.finish(); - return true; + if (!queueSendMap.containsKey(sid)) { + queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>( + CAPACITY)); + } + + sw.start(); + rw.start(); + + return true; } return false; @@ -225,27 +237,25 @@ } //Otherwise start worker threads to receive data. } else { - SendWorker sw = new SendWorker(s, sid); - RecvWorker rw = new RecvWorker(s, sid); - sw.setRecv(rw); - - if (senderWorkerMap.containsKey(sid)) { - SendWorker vsw = senderWorkerMap.get(sid); - if(vsw != null) - vsw.finish(); - else LOG.error("No SendWorker for this identifier (" + sid + ")"); - } - - senderWorkerMap.put(sid, sw); - - if (!queueSendMap.containsKey(sid)) { - queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>( - CAPACITY)); - } - sw.start(); - rw.start(); + SendWorker sw = new SendWorker(s, sid); + RecvWorker rw = new RecvWorker(s, sid); + sw.setRecv(rw); - return true; + SendWorker vsw = senderWorkerMap.get(sid); + senderWorkerMap.put(sid, sw); + + if(vsw != null) + vsw.finish(); + + if (!queueSendMap.containsKey(sid)) { + queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>( + CAPACITY)); + } + + sw.start(); + rw.start(); + + return true; } return false; } @@ -254,7 +264,7 @@ * Processes invoke this message to queue a message to send. Currently, * only leader election uses it. */ - void toSend(Long sid, ByteBuffer b) { + public void toSend(Long sid, ByteBuffer b) { /* * If sending message to myself, then simply enqueue it (loopback). 
*/ @@ -296,7 +306,7 @@ } catch (InterruptedException e) { LOG.warn("Interrupted while waiting to put message in queue.", e); - } + } } /** @@ -306,9 +316,10 @@ */ synchronized void connectOne(long sid){ - if ((senderWorkerMap.get(sid) == null)) { - SocketChannel channel; + + if ((senderWorkerMap.get(sid) == null)){ try { + SocketChannel channel; LOG.debug("Opening channel to server " + sid); channel = SocketChannel .open(self.quorumPeers.get(sid).electionAddr); @@ -362,6 +373,9 @@ softHalt(); } + /** + * A soft halt simply finishes workers. + */ public void softHalt(){ for(SendWorker sw: senderWorkerMap.values()){ LOG.debug("Halting sender: " + sw); @@ -372,7 +386,7 @@ /** * Thread to listen on some port */ - class Listener extends Thread { + public class Listener extends Thread { volatile ServerSocketChannel ss = null; /** @@ -380,7 +394,6 @@ */ @Override public void run() { - //ss = null; try { ss = ServerSocketChannel.open(); int port = self.quorumPeers.get(self.getId()).electionAddr.getPort(); @@ -393,10 +406,10 @@ Socket sock = client.socket(); sock.setTcpNoDelay(true); - LOG.info("Connection request " + LOG.debug("Connection request " + sock.getRemoteSocketAddress()); - //synchronized(senderWorkerMap){ - LOG.info("Connection request: " + self.getId()); + + LOG.debug("Connection request: " + self.getId()); receiveConnection(client); } } catch (IOException e) { @@ -404,10 +417,13 @@ } } + /** + * Halts this listener thread. + */ void halt(){ try{ LOG.debug("Trying to close listener: " + ss); - if(ss != null)/* && (ss.isOpen()))*/{ + if(ss != null) { LOG.debug("Closing listener: " + self.getId()); ss.close(); } @@ -423,12 +439,18 @@ * one. */ class SendWorker extends Thread { - // Send msgs to peer Long sid; SocketChannel channel; RecvWorker recvWorker; volatile boolean running = true; + /** + * An instance of this thread receives messages to send + * through a queue and sends them to the server sid. 
+ * + * @param channel SocketChannel + * @param sid Server identifier + */ SendWorker(SocketChannel channel, Long sid) { this.sid = sid; this.channel = channel; @@ -437,25 +459,63 @@ LOG.debug("Address of remote peer: " + this.sid); } - void setRecv(RecvWorker recvWorker) { + synchronized void setRecv(RecvWorker recvWorker) { this.recvWorker = recvWorker; } + /** + * Returns RecvWorker that pairs up with this SendWorker. + * + * @return RecvWorker + */ + synchronized RecvWorker getRecvWorker(){ + return recvWorker; + } + synchronized boolean finish() { running = false; LOG.debug("Calling finish"); this.interrupt(); + try{ + channel.close(); + } catch (IOException e) { + LOG.warn("Exception while closing socket"); + } + //channel = null; + + this.interrupt(); if (recvWorker != null) recvWorker.finish(); senderWorkerMap.remove(sid); return running; } + + synchronized void send(ByteBuffer b) throws IOException { + byte[] msgBytes = new byte[b.capacity() + + (Integer.SIZE / 8)]; + ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes); + msgBuffer.putInt(b.capacity()); + + msgBuffer.put(b.array(), 0, b.capacity()); + msgBuffer.position(0); + if(channel != null) + channel.write(msgBuffer); + else + throw new IOException("SocketChannel is null"); + } @Override public void run() { - - while (running && !shutdown) { + try{ + ByteBuffer b = lastMessageSent.get(sid); + if(b != null) send(b); + } catch (IOException e) { + LOG.error("Failed to send last message. 
Shutting down thread."); + this.finish(); + } + + while (running && !shutdown && channel != null) { ByteBuffer b = null; try { @@ -471,37 +531,15 @@ e); continue; } - + + if(b != null) + lastMessageSent.put(sid, b); + try { - byte[] msgBytes = new byte[b.capacity() - + (Integer.SIZE / 8)]; - ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes); - msgBuffer.putInt(b.capacity()); - - msgBuffer.put(b.array(), 0, b.capacity()); - msgBuffer.position(0); - channel.write(msgBuffer); - - } catch (IOException e) { - /* - * If reconnection doesn't work, then put the - * message back to the beginning of the queue and leave. - */ + if(b != null) send(b); + } catch (Exception e) { LOG.warn("Exception when using channel: " + sid, e); - finish(); - recvWorker.finish(); - recvWorker = null; - - ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); - if(bq != null){ - if (bq.size() == 0) { - boolean ret = bq.offer(b); - if (!ret) { - // to appease findbugs - LOG.error("Not able to add to a quue of size 0"); - } - } - } else LOG.error("No queue for server " + sid); + this.finish(); } } LOG.warn("Send worker leaving thread"); @@ -521,7 +559,12 @@ this.sid = sid; this.channel = channel; } - + + /** + * Shuts down this worker + * + * @return boolean Value of variable running + */ synchronized boolean finish() { running = false; this.interrupt(); @@ -533,7 +576,7 @@ try { byte[] size = new byte[4]; ByteBuffer msgLength = ByteBuffer.wrap(size); - while (running && !shutdown && channel.isConnected()) { + while (running && !shutdown && channel != null) { /** * Reads the first int to determine the length of the * message Added: hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=800576&view=auto ============================================================================== --- 
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (added) +++ hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Mon Aug 3 21:32:08 2009 @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.Semaphore; + +import junit.framework.TestCase; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.server.quorum.FastLeaderElection; +import org.apache.zookeeper.server.quorum.QuorumCnxManager; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.Vote; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.junit.Test; + + +/** + * This test uses two mock servers, each running an instance of QuorumCnxManager. 
+ * It simulates the situation in which a peer P sends a message to another peer Q + * while Q is trying to open a connection to P. In this test, Q initiates a connection + * to P as soon as it receives a message from P, and verifies that it receives a + * copy of the message. + * + * This simple test verifies that the new mechanism that duplicates the last message + * sent upon a re-connection works. + * + */ +public class CnxManagerTest extends TestCase { + protected static final Logger LOG = Logger.getLogger(FLENewEpochTest.class); + + int baseport; + int baseLEport; + int count; + HashMap<Long,QuorumServer> peers; + File tmpdir[]; + int port[]; + + public void setUp() throws Exception { + this.baseport= 33003; + this.baseLEport = 43003; + this.count = 3; + this.peers = new HashMap<Long,QuorumServer>(count); + tmpdir = new File[count]; + port = new int[count]; + + for(int i = 0; i < count; i++) { + peers.put(Long.valueOf(i), + new QuorumServer(i, + new InetSocketAddress(baseport + i), + new InetSocketAddress(baseLEport + i))); + tmpdir[i] = ClientBase.createTmpDir(); + port[i] = baseport + i; + } + } + + public void tearDown() { + + } + + + ByteBuffer createMsg(int state, long leader, long zxid, long epoch){ + byte requestBytes[] = new byte[28]; + ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); + + /* + * Building notification packet to send + */ + + requestBuffer.clear(); + requestBuffer.putInt(state); + requestBuffer.putLong(leader); + requestBuffer.putLong(zxid); + requestBuffer.putLong(epoch); + + return requestBuffer; + } + + class CnxManagerThread extends Thread { + + CnxManagerThread(){} + + public void run(){ + try { + QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2); + QuorumCnxManager cnxManager = new QuorumCnxManager(peer); + QuorumCnxManager.Listener listener = cnxManager.listener; + if(listener != null){ + listener.start(); + } else { + LOG.error("Null listener when initializing cnx manager"); + } + + 
long sid = 1; + cnxManager.toSend(sid, createMsg(ServerState.LOOKING.ordinal(), 0, -1, 1)); + cnxManager.recvQueue.take(); + cnxManager.testInitiateConnection(sid); + + cnxManager.recvQueue.take(); + } catch (Exception e) { + LOG.error("Exception while running mock thread", e); + fail("Unexpected exception"); + } + } + } + + @Test + public void testCnxManager() throws Exception { + Thread thread = new CnxManagerThread(); + + thread.start(); + + QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2); + QuorumCnxManager cnxManager = new QuorumCnxManager(peer); + QuorumCnxManager.Listener listener = cnxManager.listener; + if(listener != null){ + listener.start(); + } else { + LOG.error("Null listener when initializing cnx manager"); + } + + cnxManager.toSend(new Long(0), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1)); + cnxManager.recvQueue.take(); + + thread.join(5000); + if (thread.isAlive()) { + fail("Threads didn't join"); + } + } + + + +} \ No newline at end of file