Author: mahadev
Date: Fri Apr  3 22:01:18 2009
New Revision: 761816

URL: http://svn.apache.org/viewvc?rev=761816&view=rev
Log:
ZOOKEEPER-362. Issues with FLENewEpochTest. (fix bug in Fast leader election) 
(flavio via mahadev)

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=761816&r1=761815&r2=761816&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Apr  3 22:01:18 2009
@@ -35,7 +35,9 @@
   tickTime from config is lost, cannot start quorum (phunt via mahadev)
 
   ZOOKEEPER-360. WeakHashMap in Bookie.java causes NPE (flavio via mahadev)
-
+  
+  ZOOKEEPER-362. Issues with FLENewEpochTest. (fix bug in Fast leader election)
+(flavio via mahadev)
 
 IMPROVEMENTS:
   ZOOKEEPER-308. improve the atomic broadcast performance 3x.

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=761816&r1=761815&r2=761816&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Fri Apr  3 22:01:18 2009
@@ -562,6 +562,7 @@
                             n.epoch + ", " + self.getId() + ", " + self.getPeerState() + 
                             ", " + n.state + ", " + n.sid);
                     if (n.epoch > logicalclock) {
+                        LOG.debug("Increasing logical clock: " + n.epoch);
                         logicalclock = n.epoch;
                         recvset.clear();
                         if(totalOrderPredicate(n.leader, n.zxid, self.getId(), self.getLastLoggedZxid()))

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=761816&r1=761815&r2=761816&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Fri Apr  3 22:01:18 2009
@@ -238,13 +238,8 @@
                 SendWorker sw = senderWorkerMap.get(sid);
 
                 LOG.info("Create new connection to server: " + sid);
-                //sw.connect();
                 s.socket().close();
-                if(sw != null) sw.finish();
-                SocketChannel channel = SocketChannel.open(self.quorumPeers.get(sid).electionAddr);
-                if (channel.isConnected()) {
-                    initiateConnection(channel, sid);
-                }
+                connectOne(sid);
                 
             } catch (IOException e) {
                 LOG.info("Error when closing socket or trying to reopen connection: "
@@ -329,7 +324,7 @@
      *  @param sid  server id
      */
     
-    void connectOne(long sid){
+    synchronized void connectOne(long sid){
         if ((senderWorkerMap.get(sid) == null)) {
             SocketChannel channel;
             try {
@@ -395,13 +390,13 @@
      */
     class Listener extends Thread {
 
-        ServerSocketChannel ss = null;
+        volatile ServerSocketChannel ss = null;
         /**
          * Sleeps on accept().
          */
         @Override
         public void run() {
-            ServerSocketChannel ss = null;
+            //ss = null;
             try {
                 ss = ServerSocketChannel.open();
                 int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
@@ -421,13 +416,17 @@
                     receiveConnection(client);
                 }
             } catch (IOException e) {
-                System.err.println("Listener.run: " + e.getMessage());
+                LOG.error("Listener.run: " + e.getMessage());
             }
         }
         
         void halt(){
             try{
-                if((ss != null) && (ss.isOpen())) ss.close();
+                LOG.debug("Trying to close listener: " + ss);
+                if(ss != null)/* && (ss.isOpen()))*/{
+                    LOG.debug("Closing listener: " + self.getId());
+                    ss.close();
+                }
             } catch (IOException e){
                 LOG.warn("Exception when shutting down listener: " + e);
             }

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java?rev=761816&r1=761815&r2=761816&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java Fri Apr  3 22:01:18 2009
@@ -47,7 +47,7 @@
     volatile int [] round;
     
     Semaphore start0;
-    Semaphore finish3;
+    Semaphore finish3, finish0;
     
     @Override
     public void setUp() throws Exception {
@@ -66,6 +66,7 @@
         round[2] = 0;
         
         start0 = new Semaphore(0);
+        finish0 = new Semaphore(0);
         finish3 = new Semaphore(0);
         
         LOG.info("SetUp " + getName());
@@ -117,11 +118,18 @@
                        switch(i){
                        case 0:
                                LOG.info("First peer, do nothing, just join");
-                               flag = false;
+                               if(finish0.tryAcquire(1000, java.util.concurrent.TimeUnit.MILLISECONDS)){
+                               //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){
+                                   LOG.info("Setting flag to false");
+                                   flag = false;
+                               }
                                break;
                        case 1:
                                LOG.info("Second entering case");
-                               if(round[1] != 0) flag = false;
+                               if(round[1] != 0){
+                                   finish0.release();
+                                   flag = false;
+                               }
                                else{
                                        finish3.acquire();
                                        start0.release();
@@ -167,7 +175,8 @@
               thread.start();
               threads.add(thread);
           }
-          start0.acquire();
+          if(!start0.tryAcquire(4000, java.util.concurrent.TimeUnit.MILLISECONDS))
+              fail("First leader election failed");
 
           QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2);
           peer.startLeaderElection();


Reply via email to