Author: slebresne
Date: Wed Aug 31 16:06:09 2011
New Revision: 1163677

URL: http://svn.apache.org/viewvc?rev=1163677&view=rev
Log:
Make repair report failure when a participating node dies
patch by slebresne; reviewed by jbellis for CASSANDRA-2433

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/FailureDetector.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1163677&r1=1163676&r2=1163677&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Aug 31 16:06:09 2011
@@ -46,6 +46,8 @@
  * Add validation that Keyspace names are case-insensitively unique 
(CASSANDRA-3066)
  * catch invalid key_validation_class before instantiating UpdateColumnFamily 
(CASSANDRA-3102)
  * make Range and Bounds objects client-safe (CASSANDRA-3108)
+ * make repair report failure when a node participating dies (instead of
+   hanging forever) (CASSANDRA-2433)
 
 
 0.8.4

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=1163677&r1=1163676&r2=1163677&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/FailureDetector.java Wed Aug 31 16:06:09 2011
@@ -152,7 +152,7 @@ public class FailureDetector implements 
         {     
             for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
             {
-                listener.convict(ep);
+                listener.convict(ep, phi);
             }
         }        
     }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1163677&r1=1163676&r2=1163677&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java Wed Aug 31 16:06:09 2011
@@ -244,7 +244,7 @@ public class Gossiper implements IFailur
      *
      * param @ endpoint end point that is convicted.
     */
-    public void convict(InetAddress endpoint)
+    public void convict(InetAddress endpoint, double phi)
     {
         EndpointState epState = endpointStateMap.get(endpoint);
         if (epState.isAlive())
@@ -717,12 +717,11 @@ public class Gossiper implements IFailur
         if (logger.isTraceEnabled())
             logger.trace("Adding endpoint state for " + ep);
         endpointStateMap.put(ep, epState);
-        if (epState.isAlive())
-        {
-            // the node restarted before we ever marked it down, so we'll 
report it as dead briefly so maintenance like resetting the connection pool can 
occur 
-            for (IEndpointStateChangeSubscriber subscriber : subscribers)
-                subscriber.onDead(ep, epState);
-        }
+
+        // the node restarted: it is up to the subscriber to take whatever 
action is necessary
+        for (IEndpointStateChangeSubscriber subscriber : subscribers)
+            subscriber.onRestart(ep, epState);
+
         if (epState.getApplicationState(ApplicationState.STATUS) != null && 
!isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
             markAlive(ep, epState);
         else

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java?rev=1163677&r1=1163676&r2=1163677&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java Wed Aug 31 16:06:09 2011
@@ -47,4 +47,12 @@ public interface IEndpointStateChangeSub
     public void onDead(InetAddress endpoint, EndpointState state);
 
     public void onRemove(InetAddress endpoint);
+
+    /**
+     * Called whenever a node is restarted.
+     * Note that there is no guarantee when that happens that the node was
+     * previously marked down. It will have only if {@code state.isAlive() == 
false}
+     * as {@code state} is from before the restarted node is marked up.
+     */
+    public void onRestart(InetAddress endpoint, EndpointState state);
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java?rev=1163677&r1=1163676&r2=1163677&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java Wed Aug 31 16:06:09 2011
@@ -31,7 +31,7 @@ public interface IFailureDetectionEventL
     /**
      * Convict the specified endpoint.
      * @param ep endpoint to be convicted
+     * @param phi the value of phi with with ep was convicted
      */
-    public void convict(InetAddress ep);
-    
+    public void convict(InetAddress ep, double phi);
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1163677&r1=1163676&r2=1163677&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java Wed Aug 31 16:06:09 2011
@@ -23,27 +23,26 @@ import java.net.InetAddress;
 import java.security.MessageDigest;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 
 import com.google.common.base.Objects;
 
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.gms.Gossiper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
@@ -95,19 +94,19 @@ public class AntiEntropyService
     // singleton enforcement
     public static final AntiEntropyService instance = new AntiEntropyService();
 
-    // timeout for outstanding requests (48 hours)
-    public final static long REQUEST_TIMEOUT = 48*60*60*1000;
-
-    /**
-     * Map of outstanding sessions to requests. Once both trees reach the 
rendezvous, the local node
-     * will queue a Differencer to compare them.
-     *
-     * This map is only accessed from Stage.ANTIENTROPY, so it is not 
synchronized.
-     */
-    private final ExpiringMap<String, Map<TreeRequest, TreePair>> requests;
+    private static final ThreadPoolExecutor executor;
+    static
+    {
+        executor = new JMXConfigurableThreadPoolExecutor(4,
+                                                         60,
+                                                         TimeUnit.SECONDS,
+                                                         new 
LinkedBlockingQueue<Runnable>(),
+                                                         new 
NamedThreadFactory("AntiEntropySessions"),
+                                                         "internal");
+    }
 
     /**
-     * A map of repair session ids to a Queue of TreeRequests that have been 
performed since the session was started.
+     * A map of active session.
      */
     private final ConcurrentMap<String, RepairSession> sessions;
 
@@ -116,22 +115,24 @@ public class AntiEntropyService
      */
     protected AntiEntropyService()
     {
-        requests = new ExpiringMap<String, Map<TreeRequest, 
TreePair>>(REQUEST_TIMEOUT);
         sessions = new ConcurrentHashMap<String, RepairSession>();
     }
 
     /**
      * Requests repairs for the given table and column families, and blocks 
until all repairs have been completed.
-     * TODO: Should add retries: if nodes go offline before they respond to 
the requests, this could block forever.
      */
-    public RepairSession getRepairSession(Range range, String tablename, 
String... cfnames)
+    public RepairFuture submitRepairSession(Range range, String tablename, 
String... cfnames)
     {
-        return new RepairSession(range, tablename, cfnames);
+        RepairFuture futureTask = new RepairSession(range, tablename, 
cfnames).getFuture();
+        executor.execute(futureTask);
+        return futureTask;
     }
 
-    RepairSession getArtificialRepairSession(TreeRequest req, String 
tablename, String... cfnames)
+    RepairFuture submitArtificialRepairSession(TreeRequest req, String 
tablename, String... cfnames)
     {
-        return new RepairSession(req, tablename, cfnames);
+        RepairFuture futureTask = new RepairSession(req, tablename, 
cfnames).getFuture();
+        executor.execute(futureTask);
+        return futureTask;
     }
 
     /**
@@ -591,24 +592,25 @@ public class AntiEntropyService
      * Triggers repairs with all neighbors for the given table, cfs and range.
      * Typical lifecycle is: start() then join(). Executed in client threads.
      */
-    class RepairSession extends Thread
+    class RepairSession extends WrappedRunnable implements 
IEndpointStateChangeSubscriber, IFailureDetectionEventListener
     {
+        private final String sessionName;
         private final String tablename;
         private final String[] cfnames;
-        private final ConcurrentHashMap<TreeRequest,Object> requests = new 
ConcurrentHashMap<TreeRequest,Object>();
         private final Range range;
-        private final Set<InetAddress> endpoints;
+        private volatile Exception exception;
+        private final AtomicBoolean isFailed = new AtomicBoolean(false);
 
-        private CountDownLatch completedLatch;
+        private final Set<InetAddress> endpoints;
         final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<RepairJob>();
+        final Map<String, RepairJob> activeJobs = new 
ConcurrentHashMap<String, RepairJob>();
 
+        private final SimpleCondition completed = new SimpleCondition();
         public final Condition differencingDone = new SimpleCondition();
 
         public RepairSession(TreeRequest req, String tablename, String... 
cfnames)
         {
             this(req.sessionid, req.range, tablename, cfnames);
-            requests.put(req, this);
-            completedLatch = new CountDownLatch(cfnames.length);
             AntiEntropyService.instance.sessions.put(getName(), this);
         }
 
@@ -619,7 +621,7 @@ public class AntiEntropyService
 
         private RepairSession(String id, Range range, String tablename, 
String[] cfnames)
         {
-            super(id);
+            this.sessionName = id;
             this.tablename = tablename;
             this.cfnames = cfnames;
             assert cfnames.length > 0 : "Repairing no column families seems 
pointless, doesn't it";
@@ -627,8 +629,18 @@ public class AntiEntropyService
             this.endpoints = AntiEntropyService.getNeighbors(tablename, range);
         }
 
-        @Override
-        public void run()
+        public String getName()
+        {
+            return sessionName;
+        }
+
+        RepairFuture getFuture()
+        {
+            return new RepairFuture(this);
+        }
+
+        // we don't care about the return value but care about it throwing 
exception
+        public void runMayThrow() throws Exception
         {
             if (endpoints.isEmpty())
             {
@@ -649,20 +661,25 @@ public class AntiEntropyService
             }
 
             AntiEntropyService.instance.sessions.put(getName(), this);
+            Gossiper.instance.register(this);
+            
FailureDetector.instance.registerFailureDetectionEventListener(this);
             try
             {
                 // Create and queue a RepairJob for each column family
                 for (String cfname : cfnames)
-                    jobs.offer(new RepairJob(cfname));
-
-                // We'll repair once by endpoints and column family
-                completedLatch = new CountDownLatch(endpoints.size() * 
cfnames.length);
+                {
+                    RepairJob job = new RepairJob(cfname);
+                    jobs.offer(job);
+                    activeJobs.put(cfname, job);
+                }
 
                 jobs.peek().sendTreeRequests();
 
                 // block whatever thread started this session until all 
requests have been returned:
                 // if this thread dies, the session will still complete in the 
background
-                completedLatch.await();
+                completed.await();
+                if (exception != null)
+                    throw exception;
             }
             catch (InterruptedException e)
             {
@@ -670,6 +687,8 @@ public class AntiEntropyService
             }
             finally
             {
+                
FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+                Gossiper.instance.unregister(this);
                 AntiEntropyService.instance.sessions.remove(getName());
             }
         }
@@ -677,20 +696,69 @@ public class AntiEntropyService
         void completed(InetAddress remote, String cfname)
         {
             logger.debug("Repair completed for {} on {}", remote, cfname);
-            completedLatch.countDown();
+            RepairJob job = activeJobs.get(cfname);
+            if (job.completedSynchronizationJob(remote))
+            {
+                activeJobs.remove(cfname);
+                if (activeJobs.isEmpty())
+                    completed.signalAll();
+            }
+        }
+
+        void failedNode(InetAddress remote)
+        {
+            String errorMsg = String.format("Problem during repair session %s, 
endpoint %s died", sessionName, remote);
+            logger.error(errorMsg);
+            exception = new IOException(errorMsg);
+            // If a node failed, we stop everything (though there could still 
be some activity in the background)
+            jobs.clear();
+            activeJobs.clear();
+            differencingDone.signalAll();
+            completed.signalAll();
+        }
+
+        public void onJoin(InetAddress endpoint, EndpointState epState) {}
+        public void onChange(InetAddress endpoint, ApplicationState state, 
VersionedValue value) {}
+        public void onAlive(InetAddress endpoint, EndpointState state) {}
+        public void onDead(InetAddress endpoint, EndpointState state) {}
+
+        public void onRemove(InetAddress endpoint)
+        {
+            convict(endpoint, Double.MAX_VALUE);
+        }
+
+        public void onRestart(InetAddress endpoint, EndpointState epState)
+        {
+            convict(endpoint, Double.MAX_VALUE);
+        }
+
+        public void convict(InetAddress endpoint, double phi)
+        {
+            if (!endpoints.contains(endpoint))
+                return;
+
+            // We want a higher confidence in the failure detection than usual 
because failing a repair wrongly has a high cost.
+            if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+                return;
+
+            // Though unlikely, it is possible to arrive here multiple time 
and we
+            // want to avoid print an error message twice
+            if (!isFailed.compareAndSet(false, true))
+                return;
+
+            failedNode(endpoint);
         }
 
         class RepairJob
         {
             private final String cfname;
-            private final AtomicInteger remaining;
-            private final Map<InetAddress, MerkleTree> trees;
+            private final Set<InetAddress> requestedEndpoints = new 
HashSet<InetAddress>();
+            private final Map<InetAddress, MerkleTree> trees = new 
HashMap<InetAddress, MerkleTree>();
+            private final Set<InetAddress> syncJobs = new 
HashSet<InetAddress>();
 
             public RepairJob(String cfname)
             {
                 this.cfname = cfname;
-                this.remaining = new AtomicInteger(endpoints.size() + 1); // 
all neighbor + local host
-                this.trees = new ConcurrentHashMap<InetAddress, MerkleTree>();
             }
 
             /**
@@ -698,22 +766,24 @@ public class AntiEntropyService
              */
             public void sendTreeRequests()
             {
-                // send requests to remote nodes and record them
-                for (InetAddress endpoint : endpoints)
-                    
requests.put(AntiEntropyService.instance.request(getName(), endpoint, range, 
tablename, cfname), RepairSession.this);
-                // send but don't record an outstanding request to the local 
node
-                AntiEntropyService.instance.request(getName(), 
FBUtilities.getLocalAddress(), range, tablename, cfname);
+                requestedEndpoints.addAll(endpoints);
+                requestedEndpoints.add(FBUtilities.getLocalAddress());
+
+                // send requests to all nodes
+                for (InetAddress endpoint : requestedEndpoints)
+                    AntiEntropyService.instance.request(getName(), endpoint, 
range, tablename, cfname);
             }
 
             /**
              * Add a new received tree and return the number of remaining tree 
to
              * be received for the job to be complete.
              */
-            public int addTree(TreeRequest request, MerkleTree tree)
+            public synchronized int addTree(TreeRequest request, MerkleTree 
tree)
             {
                 assert request.cf.right.equals(cfname);
                 trees.put(request.endpoint, tree);
-                return remaining.decrementAndGet();
+                requestedEndpoints.remove(request.endpoint);
+                return requestedEndpoints.size();
             }
 
             /**
@@ -722,7 +792,7 @@ public class AntiEntropyService
              */
             public void submitDifferencers()
             {
-                assert remaining.get() == 0;
+                assert requestedEndpoints.size() == 0;
 
                 // Right now, we only difference local host against each 
other. CASSANDRA-2610 will fix that.
                 // In the meantime ugly special casing will work good enough.
@@ -734,9 +804,17 @@ public class AntiEntropyService
                         continue;
 
                     Differencer differencer = new Differencer(cfname, 
entry.getKey(), entry.getValue(), localTree);
+                    syncJobs.add(entry.getKey());
                     logger.debug("Queueing comparison " + differencer);
                     
StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
                 }
+                trees.clear(); // allows gc to do its thing
+            }
+
+            synchronized boolean completedSynchronizationJob(InetAddress 
remote)
+            {
+                syncJobs.remove(remote);
+                return syncJobs.isEmpty();
             }
         }
 
@@ -842,11 +920,21 @@ public class AntiEntropyService
                         return;
 
                     // all calls finished successfully
-                    //
                     completed(remote, cfname);
-                    logger.info(String.format("Finished streaming repair with 
%s for %s: %d oustanding to complete session", remote, range, 
completedLatch.getCount()));
+                    logger.info(String.format("Finished streaming repair with 
%s for %s", remote, range));
                 }
             }
         }
     }
+
+    public static class RepairFuture extends FutureTask
+    {
+        public final RepairSession session;
+
+        RepairFuture(RepairSession session)
+        {
+            super(session, null);
+            this.session = session;
+        }
+    }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1163677&r1=1163676&r2=1163677&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java Wed Aug 31 16:06:09 2011
@@ -78,6 +78,8 @@ public class MigrationManager implements
 
     public void onDead(InetAddress endpoint, EndpointState state) { }
 
+    public void onRestart(InetAddress endpoint, EndpointState state) { }
+
     public void onRemove(InetAddress endpoint) { }
     
     /** 

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1163677&r1=1163676&r2=1163677&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Wed Aug 31 16:06:09 2011
@@ -197,6 +197,8 @@ public class StorageLoadBalancer impleme
 
     public void onDead(InetAddress endpoint, EndpointState state) {}
 
+    public void onRestart(InetAddress endpoint, EndpointState state) { }
+
     public void onRemove(InetAddress endpoint) {}
 
 /*

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1163677&r1=1163676&r2=1163677&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Wed Aug 31 16:06:09 2011
@@ -1277,6 +1277,13 @@ public class StorageService implements I
         MessagingService.instance().convict(endpoint);
     }
 
+    public void onRestart(InetAddress endpoint, EndpointState state)
+    {
+        // If we have restarted before the node was even marked down, we need 
to reset the connection pool
+        if (state.isAlive())
+            onDead(endpoint, state);
+    }
+
     /** raw load value */
     public double getLoad()
     {
@@ -1575,44 +1582,43 @@ public class StorageService implements I
         {
             return;
         }
-        
-        List<AntiEntropyService.RepairSession> sessions = new 
ArrayList<AntiEntropyService.RepairSession>();
+        List<AntiEntropyService.RepairFuture> futures = new 
ArrayList<AntiEntropyService.RepairFuture>();
         for (Range range : getLocalRanges(tableName))
         {
-            AntiEntropyService.RepairSession session = forceTableRepair(range, 
tableName, columnFamilies);
-            sessions.add(session);
+            AntiEntropyService.RepairFuture future = forceTableRepair(range, 
tableName, columnFamilies);
+            futures.add(future);
             // wait for a session to be done with its differencing before 
starting the next one
             try
             {
-                session.differencingDone.await();
+                future.session.differencingDone.await();
             }
             catch (InterruptedException e)
             {
-                logger_.error("Interrupted while waiting for the differencing 
of repair session " + session + " to be done. Repair may be imprecise.", e);
+                logger_.error("Interrupted while waiting for the differencing 
of repair session " + future.session + " to be done. Repair may be imprecise.", 
e);
             }
         }
 
         boolean failedSession = false;
 
         // block until all repair sessions have completed
-        for (AntiEntropyService.RepairSession sess : sessions)
+        for (AntiEntropyService.RepairFuture future : futures)
         {
             try
             {
-                sess.join();
+                future.get();
             }
-            catch (InterruptedException e)
+            catch (Exception e)
             {
-                logger_.error("Repair session " + sess + " failed.", e);
+                logger_.error("Repair session " + future.session + " failed.", 
e);
                 failedSession = true;
             }
         }
 
         if (failedSession)
-            throw new IOException("Some Repair session(s) failed.");
+            throw new IOException("Some repair session(s) failed (see log for 
details).");
     }
 
-    public AntiEntropyService.RepairSession forceTableRepair(final Range 
range, final String tableName, final String... columnFamilies) throws 
IOException
+    public AntiEntropyService.RepairFuture forceTableRepair(final Range range, 
final String tableName, final String... columnFamilies) throws IOException
     {
         ArrayList<String> names = new ArrayList<String>();
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, 
columnFamilies))
@@ -1620,9 +1626,7 @@ public class StorageService implements I
             names.add(cfStore.getColumnFamilyName());
         }
 
-        AntiEntropyService.RepairSession sess = 
AntiEntropyService.instance.getRepairSession(range, tableName, 
names.toArray(new String[names.size()]));
-        sess.start();
-        return sess;
+        return AntiEntropyService.instance.submitRepairSession(range, 
tableName, names.toArray(new String[names.size()]));
     }
 
     /* End of MBean interface methods */

Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1163677&r1=1163676&r2=1163677&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java (original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java Wed Aug 31 16:06:09 2011
@@ -20,9 +20,7 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.*;
 
 import org.junit.After;
 import org.junit.Before;
@@ -47,6 +45,7 @@ import org.apache.cassandra.utils.Merkle
 import static org.apache.cassandra.service.AntiEntropyService.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class AntiEntropyServiceTestAbstract extends CleanupHelper
@@ -162,18 +161,21 @@ public abstract class AntiEntropyService
     @Test
     public void testManualRepair() throws Throwable
     {
-        AntiEntropyService.RepairSession sess = 
AntiEntropyService.instance.getRepairSession(local_range, tablename, cfname);
-        sess.start();
+        RepairFuture sess = 
AntiEntropyService.instance.submitRepairSession(local_range, tablename, cfname);
 
         // ensure that the session doesn't end without a response from REMOTE
-        sess.join(500);
-        assert sess.isAlive();
+        try
+        {
+            sess.get(500, TimeUnit.MILLISECONDS);
+            fail("Repair session should not end without response from REMOTE");
+        }
+        catch (TimeoutException e) {}
 
         // deliver a fake response from REMOTE
-        sess.completed(REMOTE, request.cf.right);
+        sess.session.completed(REMOTE, request.cf.right);
 
         // block until the repair has completed
-        sess.join();
+        sess.get();
     }
 
     @Test
@@ -218,8 +220,8 @@ public abstract class AntiEntropyService
     public void testDifferencer() throws Throwable
     {
         // this next part does some housekeeping so that cleanup in the 
differencer doesn't error out.
-        AntiEntropyService.RepairSession sess = 
AntiEntropyService.instance.getArtificialRepairSession(request, tablename, 
cfname);
-        
+        AntiEntropyService.RepairFuture sess = 
AntiEntropyService.instance.submitArtificialRepairSession(request, tablename, 
cfname);
+
         // generate a tree
         Validator validator = new Validator(request);
         validator.prepare(store);
@@ -242,7 +244,7 @@ public abstract class AntiEntropyService
         interesting.add(changed);
 
         // difference the trees
-        AntiEntropyService.RepairSession.Differencer diff = sess.new 
Differencer(cfname, request.endpoint, ltree, rtree);
+        AntiEntropyService.RepairSession.Differencer diff = sess.session.new 
Differencer(cfname, request.endpoint, ltree, rtree);
         diff.run();
         
         // ensure that the changed range was recorded


Reply via email to