Author: mahadev Date: Fri Apr 3 22:01:18 2009 New Revision: 761816 URL: http://svn.apache.org/viewvc?rev=761816&view=rev Log: ZOOKEEPER-362. Issues with FLENewEpochTest. (fix bug in Fast leader election) (flavio via mahadev)
Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=761816&r1=761815&r2=761816&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Fri Apr 3 22:01:18 2009 @@ -35,7 +35,9 @@ tickTime from config is lost, cannot start quorum (phunt via mahadev) ZOOKEEPER-360. WeakHashMap in Bookie.java causes NPE (flavio via mahadev) - + + ZOOKEEPER-362. Issues with FLENewEpochTest. (fix bug in Fast leader election) +(flavio via mahadev) IMPROVEMENTS: ZOOKEEPER-308. improve the atomic broadcast performance 3x. Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=761816&r1=761815&r2=761816&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Fri Apr 3 22:01:18 2009 @@ -562,6 +562,7 @@ n.epoch + ", " + self.getId() + ", " + self.getPeerState() + ", " + n.state + ", " + n.sid); if (n.epoch > logicalclock) { + LOG.debug("Increasing logical clock: " + n.epoch); logicalclock = n.epoch; recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, self.getId(), self.getLastLoggedZxid())) Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=761816&r1=761815&r2=761816&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Fri Apr 3 22:01:18 2009 @@ -238,13 +238,8 @@ SendWorker sw = senderWorkerMap.get(sid); LOG.info("Create new connection to server: " + sid); - //sw.connect(); s.socket().close(); - if(sw != null) sw.finish(); - SocketChannel channel = SocketChannel.open(self.quorumPeers.get(sid).electionAddr); - if (channel.isConnected()) { - initiateConnection(channel, sid); - } + connectOne(sid); } catch (IOException e) { LOG.info("Error when closing socket or trying to reopen connection: " @@ -329,7 +324,7 @@ * @param sid server id */ - void connectOne(long sid){ + synchronized void connectOne(long sid){ if ((senderWorkerMap.get(sid) == null)) { SocketChannel channel; try { @@ -395,13 +390,13 @@ */ class Listener extends Thread { - ServerSocketChannel ss = null; + volatile ServerSocketChannel ss = null; /** * Sleeps on accept(). */ @Override public void run() { - ServerSocketChannel ss = null; + //ss = null; try { ss = ServerSocketChannel.open(); int port = self.quorumPeers.get(self.getId()).electionAddr.getPort(); @@ -421,13 +416,17 @@ receiveConnection(client); } } catch (IOException e) { - System.err.println("Listener.run: " + e.getMessage()); + LOG.error("Listener.run: " + e.getMessage()); } } void halt(){ try{ - if((ss != null) && (ss.isOpen())) ss.close(); + LOG.debug("Trying to close listener: " + ss); + if(ss != null)/* && (ss.isOpen()))*/{ + LOG.debug("Closing listener: " + self.getId()); + ss.close(); + } } catch (IOException e){ LOG.warn("Exception when shutting down listener: " + e); } Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java?rev=761816&r1=761815&r2=761816&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java Fri Apr 3 22:01:18 2009 @@ -47,7 +47,7 @@ volatile int [] round; Semaphore start0; - Semaphore finish3; + Semaphore finish3, finish0; @Override public void setUp() throws Exception { @@ -66,6 +66,7 @@ round[2] = 0; start0 = new Semaphore(0); + finish0 = new Semaphore(0); finish3 = new Semaphore(0); LOG.info("SetUp " + getName()); @@ -117,11 +118,18 @@ switch(i){ case 0: LOG.info("First peer, do nothing, just join"); - flag = false; + if(finish0.tryAcquire(1000, java.util.concurrent.TimeUnit.MILLISECONDS)){ + //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){ + LOG.info("Setting flag to false"); + flag = false; + } break; case 1: LOG.info("Second entering case"); - if(round[1] != 0) flag = false; + if(round[1] != 0){ + finish0.release(); + flag = false; + } else{ finish3.acquire(); start0.release(); @@ -167,7 +175,8 @@ thread.start(); threads.add(thread); } - start0.acquire(); + if(!start0.tryAcquire(4000, java.util.concurrent.TimeUnit.MILLISECONDS)) + fail("First leader election failed"); QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2); peer.startLeaderElection();