Author: slebresne Date: Wed Aug 31 16:06:09 2011 New Revision: 1163677 URL: http://svn.apache.org/viewvc?rev=1163677&view=rev Log: Make repair report failure when a participating node dies patch by slebresne; reviewed by jbellis for CASSANDRA-2433
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/FailureDetector.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageLoadBalancer.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1163677&r1=1163676&r2=1163677&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Aug 31 16:06:09 2011 @@ -46,6 +46,8 @@ * Add validation that Keyspace names are case-insensitively unique (CASSANDRA-3066) * catch invalid key_validation_class before instantiating UpdateColumnFamily (CASSANDRA-3102) * make Range and Bounds objects client-safe (CASSANDRA-3108) + * make repair report failure when a node participating dies (instead of + hanging forever) (CASSANDRA-2433) 0.8.4 Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/FailureDetector.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=1163677&r1=1163676&r2=1163677&view=diff 
============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/FailureDetector.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/FailureDetector.java Wed Aug 31 16:06:09 2011 @@ -152,7 +152,7 @@ public class FailureDetector implements { for ( IFailureDetectionEventListener listener : fdEvntListeners_ ) { - listener.convict(ep); + listener.convict(ep, phi); } } } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1163677&r1=1163676&r2=1163677&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java Wed Aug 31 16:06:09 2011 @@ -244,7 +244,7 @@ public class Gossiper implements IFailur * * param @ endpoint end point that is convicted. 
*/ - public void convict(InetAddress endpoint) + public void convict(InetAddress endpoint, double phi) { EndpointState epState = endpointStateMap.get(endpoint); if (epState.isAlive()) @@ -717,12 +717,11 @@ public class Gossiper implements IFailur if (logger.isTraceEnabled()) logger.trace("Adding endpoint state for " + ep); endpointStateMap.put(ep, epState); - if (epState.isAlive()) - { - // the node restarted before we ever marked it down, so we'll report it as dead briefly so maintenance like resetting the connection pool can occur - for (IEndpointStateChangeSubscriber subscriber : subscribers) - subscriber.onDead(ep, epState); - } + + // the node restarted: it is up to the subscriber to take whatever action is necessary + for (IEndpointStateChangeSubscriber subscriber : subscribers) + subscriber.onRestart(ep, epState); + if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value)) markAlive(ep, epState); else Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java?rev=1163677&r1=1163676&r2=1163677&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java Wed Aug 31 16:06:09 2011 @@ -47,4 +47,12 @@ public interface IEndpointStateChangeSub public void onDead(InetAddress endpoint, EndpointState state); public void onRemove(InetAddress endpoint); + + /** + * Called whenever a node is restarted. + * Note that there is no guarantee when that happens that the node was + * previously marked down. 
It will have only if {@code state.isAlive() == false} + * as {@code state} is from before the restarted node is marked up. + */ + public void onRestart(InetAddress endpoint, EndpointState state); } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java?rev=1163677&r1=1163676&r2=1163677&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java Wed Aug 31 16:06:09 2011 @@ -31,7 +31,7 @@ public interface IFailureDetectionEventL /** * Convict the specified endpoint. * @param ep endpoint to be convicted + * @param phi the value of phi with which ep was convicted */ - public void convict(InetAddress ep); - + public void convict(InetAddress ep, double phi); } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1163677&r1=1163676&r2=1163677&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java Wed Aug 31 16:06:09 2011 @@ -23,27 +23,26 @@ import java.net.InetAddress; import java.security.MessageDigest; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import 
com.google.common.base.Objects; -import org.apache.cassandra.db.compaction.AbstractCompactedRow; -import org.apache.cassandra.gms.Gossiper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.concurrent.*; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.compaction.AbstractCompactedRow; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Table; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.Range; -import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.gms.*; import org.apache.cassandra.io.ICompactSerializer; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.net.CompactEndpointSerializationHelper; @@ -95,19 +94,19 @@ public class AntiEntropyService // singleton enforcement public static final AntiEntropyService instance = new AntiEntropyService(); - // timeout for outstanding requests (48 hours) - public final static long REQUEST_TIMEOUT = 48*60*60*1000; - - /** - * Map of outstanding sessions to requests. Once both trees reach the rendezvous, the local node - * will queue a Differencer to compare them. - * - * This map is only accessed from Stage.ANTIENTROPY, so it is not synchronized. 
- */ - private final ExpiringMap<String, Map<TreeRequest, TreePair>> requests; + private static final ThreadPoolExecutor executor; + static + { + executor = new JMXConfigurableThreadPoolExecutor(4, + 60, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("AntiEntropySessions"), + "internal"); + } /** - * A map of repair session ids to a Queue of TreeRequests that have been performed since the session was started. + * A map of active session. */ private final ConcurrentMap<String, RepairSession> sessions; @@ -116,22 +115,24 @@ public class AntiEntropyService */ protected AntiEntropyService() { - requests = new ExpiringMap<String, Map<TreeRequest, TreePair>>(REQUEST_TIMEOUT); sessions = new ConcurrentHashMap<String, RepairSession>(); } /** * Requests repairs for the given table and column families, and blocks until all repairs have been completed. - * TODO: Should add retries: if nodes go offline before they respond to the requests, this could block forever. */ - public RepairSession getRepairSession(Range range, String tablename, String... cfnames) + public RepairFuture submitRepairSession(Range range, String tablename, String... cfnames) { - return new RepairSession(range, tablename, cfnames); + RepairFuture futureTask = new RepairSession(range, tablename, cfnames).getFuture(); + executor.execute(futureTask); + return futureTask; } - RepairSession getArtificialRepairSession(TreeRequest req, String tablename, String... cfnames) + RepairFuture submitArtificialRepairSession(TreeRequest req, String tablename, String... cfnames) { - return new RepairSession(req, tablename, cfnames); + RepairFuture futureTask = new RepairSession(req, tablename, cfnames).getFuture(); + executor.execute(futureTask); + return futureTask; } /** @@ -591,24 +592,25 @@ public class AntiEntropyService * Triggers repairs with all neighbors for the given table, cfs and range. * Typical lifecycle is: start() then join(). Executed in client threads. 
*/ - class RepairSession extends Thread + class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener { + private final String sessionName; private final String tablename; private final String[] cfnames; - private final ConcurrentHashMap<TreeRequest,Object> requests = new ConcurrentHashMap<TreeRequest,Object>(); private final Range range; - private final Set<InetAddress> endpoints; + private volatile Exception exception; + private final AtomicBoolean isFailed = new AtomicBoolean(false); - private CountDownLatch completedLatch; + private final Set<InetAddress> endpoints; final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<RepairJob>(); + final Map<String, RepairJob> activeJobs = new ConcurrentHashMap<String, RepairJob>(); + private final SimpleCondition completed = new SimpleCondition(); public final Condition differencingDone = new SimpleCondition(); public RepairSession(TreeRequest req, String tablename, String... cfnames) { this(req.sessionid, req.range, tablename, cfnames); - requests.put(req, this); - completedLatch = new CountDownLatch(cfnames.length); AntiEntropyService.instance.sessions.put(getName(), this); } @@ -619,7 +621,7 @@ public class AntiEntropyService private RepairSession(String id, Range range, String tablename, String[] cfnames) { - super(id); + this.sessionName = id; this.tablename = tablename; this.cfnames = cfnames; assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it"; @@ -627,8 +629,18 @@ public class AntiEntropyService this.endpoints = AntiEntropyService.getNeighbors(tablename, range); } - @Override - public void run() + public String getName() + { + return sessionName; + } + + RepairFuture getFuture() + { + return new RepairFuture(this); + } + + // we don't care about the return value but care about it throwing exception + public void runMayThrow() throws Exception { if (endpoints.isEmpty()) { @@ -649,20 +661,25 @@ public class 
AntiEntropyService } AntiEntropyService.instance.sessions.put(getName(), this); + Gossiper.instance.register(this); + FailureDetector.instance.registerFailureDetectionEventListener(this); try { // Create and queue a RepairJob for each column family for (String cfname : cfnames) - jobs.offer(new RepairJob(cfname)); - - // We'll repair once by endpoints and column family - completedLatch = new CountDownLatch(endpoints.size() * cfnames.length); + { + RepairJob job = new RepairJob(cfname); + jobs.offer(job); + activeJobs.put(cfname, job); + } jobs.peek().sendTreeRequests(); // block whatever thread started this session until all requests have been returned: // if this thread dies, the session will still complete in the background - completedLatch.await(); + completed.await(); + if (exception != null) + throw exception; } catch (InterruptedException e) { @@ -670,6 +687,8 @@ public class AntiEntropyService } finally { + FailureDetector.instance.unregisterFailureDetectionEventListener(this); + Gossiper.instance.unregister(this); AntiEntropyService.instance.sessions.remove(getName()); } } @@ -677,20 +696,69 @@ public class AntiEntropyService void completed(InetAddress remote, String cfname) { logger.debug("Repair completed for {} on {}", remote, cfname); - completedLatch.countDown(); + RepairJob job = activeJobs.get(cfname); + if (job.completedSynchronizationJob(remote)) + { + activeJobs.remove(cfname); + if (activeJobs.isEmpty()) + completed.signalAll(); + } + } + + void failedNode(InetAddress remote) + { + String errorMsg = String.format("Problem during repair session %s, endpoint %s died", sessionName, remote); + logger.error(errorMsg); + exception = new IOException(errorMsg); + // If a node failed, we stop everything (though there could still be some activity in the background) + jobs.clear(); + activeJobs.clear(); + differencingDone.signalAll(); + completed.signalAll(); + } + + public void onJoin(InetAddress endpoint, EndpointState epState) {} + public void 
onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} + public void onAlive(InetAddress endpoint, EndpointState state) {} + public void onDead(InetAddress endpoint, EndpointState state) {} + + public void onRemove(InetAddress endpoint) + { + convict(endpoint, Double.MAX_VALUE); + } + + public void onRestart(InetAddress endpoint, EndpointState epState) + { + convict(endpoint, Double.MAX_VALUE); + } + + public void convict(InetAddress endpoint, double phi) + { + if (!endpoints.contains(endpoint)) + return; + + // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost. + if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold()) + return; + + // Though unlikely, it is possible to arrive here multiple times and we + // want to avoid printing an error message twice + if (!isFailed.compareAndSet(false, true)) + return; + + failedNode(endpoint); } class RepairJob { private final String cfname; - private final AtomicInteger remaining; - private final Map<InetAddress, MerkleTree> trees; + private final Set<InetAddress> requestedEndpoints = new HashSet<InetAddress>(); + private final Map<InetAddress, MerkleTree> trees = new HashMap<InetAddress, MerkleTree>(); + private final Set<InetAddress> syncJobs = new HashSet<InetAddress>(); public RepairJob(String cfname) { this.cfname = cfname; - this.remaining = new AtomicInteger(endpoints.size() + 1); // all neighbor + local host - this.trees = new ConcurrentHashMap<InetAddress, MerkleTree>(); } /** @@ -698,22 +766,24 @@ public class AntiEntropyService */ public void sendTreeRequests() { - // send requests to remote nodes and record them - for (InetAddress endpoint : endpoints) - requests.put(AntiEntropyService.instance.request(getName(), endpoint, range, tablename, cfname), RepairSession.this); - // send but don't record an outstanding request to the local node - AntiEntropyService.instance.request(getName(), FBUtilities.getLocalAddress(), range, 
tablename, cfname); + requestedEndpoints.addAll(endpoints); + requestedEndpoints.add(FBUtilities.getLocalAddress()); + + // send requests to all nodes + for (InetAddress endpoint : requestedEndpoints) + AntiEntropyService.instance.request(getName(), endpoint, range, tablename, cfname); } /** * Add a new received tree and return the number of remaining tree to * be received for the job to be complete. */ - public int addTree(TreeRequest request, MerkleTree tree) + public synchronized int addTree(TreeRequest request, MerkleTree tree) { assert request.cf.right.equals(cfname); trees.put(request.endpoint, tree); - return remaining.decrementAndGet(); + requestedEndpoints.remove(request.endpoint); + return requestedEndpoints.size(); } /** @@ -722,7 +792,7 @@ public class AntiEntropyService */ public void submitDifferencers() { - assert remaining.get() == 0; + assert requestedEndpoints.size() == 0; // Right now, we only difference local host against each other. CASSANDRA-2610 will fix that. // In the meantime ugly special casing will work good enough. 
@@ -734,9 +804,17 @@ public class AntiEntropyService continue; Differencer differencer = new Differencer(cfname, entry.getKey(), entry.getValue(), localTree); + syncJobs.add(entry.getKey()); logger.debug("Queueing comparison " + differencer); StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer); } + trees.clear(); // allows gc to do its thing + } + + synchronized boolean completedSynchronizationJob(InetAddress remote) + { + syncJobs.remove(remote); + return syncJobs.isEmpty(); } } @@ -842,11 +920,21 @@ public class AntiEntropyService return; // all calls finished successfully - // completed(remote, cfname); - logger.info(String.format("Finished streaming repair with %s for %s: %d oustanding to complete session", remote, range, completedLatch.getCount())); + logger.info(String.format("Finished streaming repair with %s for %s", remote, range)); } } } } + + public static class RepairFuture extends FutureTask + { + public final RepairSession session; + + RepairFuture(RepairSession session) + { + super(session, null); + this.session = session; + } + } } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1163677&r1=1163676&r2=1163677&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java Wed Aug 31 16:06:09 2011 @@ -78,6 +78,8 @@ public class MigrationManager implements public void onDead(InetAddress endpoint, EndpointState state) { } + public void onRestart(InetAddress endpoint, EndpointState state) { } + public void onRemove(InetAddress endpoint) { } /** Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageLoadBalancer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1163677&r1=1163676&r2=1163677&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Wed Aug 31 16:06:09 2011 @@ -197,6 +197,8 @@ public class StorageLoadBalancer impleme public void onDead(InetAddress endpoint, EndpointState state) {} + public void onRestart(InetAddress endpoint, EndpointState state) { } + public void onRemove(InetAddress endpoint) {} /* Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1163677&r1=1163676&r2=1163677&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Wed Aug 31 16:06:09 2011 @@ -1277,6 +1277,13 @@ public class StorageService implements I MessagingService.instance().convict(endpoint); } + public void onRestart(InetAddress endpoint, EndpointState state) + { + // If we have restarted before the node was even marked down, we need to reset the connection pool + if (state.isAlive()) + onDead(endpoint, state); + } + /** raw load value */ public double getLoad() { @@ -1575,44 +1582,43 @@ public class StorageService implements I { return; } - - List<AntiEntropyService.RepairSession> sessions = new ArrayList<AntiEntropyService.RepairSession>(); + 
List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(); for (Range range : getLocalRanges(tableName)) { - AntiEntropyService.RepairSession session = forceTableRepair(range, tableName, columnFamilies); - sessions.add(session); + AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, columnFamilies); + futures.add(future); // wait for a session to be done with its differencing before starting the next one try { - session.differencingDone.await(); + future.session.differencingDone.await(); } catch (InterruptedException e) { - logger_.error("Interrupted while waiting for the differencing of repair session " + session + " to be done. Repair may be imprecise.", e); + logger_.error("Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise.", e); } } boolean failedSession = false; // block until all repair sessions have completed - for (AntiEntropyService.RepairSession sess : sessions) + for (AntiEntropyService.RepairFuture future : futures) { try { - sess.join(); + future.get(); } - catch (InterruptedException e) + catch (Exception e) { - logger_.error("Repair session " + sess + " failed.", e); + logger_.error("Repair session " + future.session + " failed.", e); failedSession = true; } } if (failedSession) - throw new IOException("Some Repair session(s) failed."); + throw new IOException("Some repair session(s) failed (see log for details)."); } - public AntiEntropyService.RepairSession forceTableRepair(final Range range, final String tableName, final String... columnFamilies) throws IOException + public AntiEntropyService.RepairFuture forceTableRepair(final Range range, final String tableName, final String... 
columnFamilies) throws IOException { ArrayList<String> names = new ArrayList<String>(); for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies)) @@ -1620,9 +1626,7 @@ public class StorageService implements I names.add(cfStore.getColumnFamilyName()); } - AntiEntropyService.RepairSession sess = AntiEntropyService.instance.getRepairSession(range, tableName, names.toArray(new String[names.size()])); - sess.start(); - return sess; + return AntiEntropyService.instance.submitRepairSession(range, tableName, names.toArray(new String[names.size()])); } /* End of MBean interface methods */ Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1163677&r1=1163676&r2=1163677&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java (original) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java Wed Aug 31 16:06:09 2011 @@ -20,9 +20,7 @@ package org.apache.cassandra.service; import java.net.InetAddress; import java.util.*; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.*; import org.junit.After; import org.junit.Before; @@ -47,6 +45,7 @@ import org.apache.cassandra.utils.Merkle import static org.apache.cassandra.service.AntiEntropyService.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import org.apache.cassandra.utils.ByteBufferUtil; public abstract class AntiEntropyServiceTestAbstract extends CleanupHelper @@ -162,18 +161,21 @@ public abstract class 
AntiEntropyService @Test public void testManualRepair() throws Throwable { - AntiEntropyService.RepairSession sess = AntiEntropyService.instance.getRepairSession(local_range, tablename, cfname); - sess.start(); + RepairFuture sess = AntiEntropyService.instance.submitRepairSession(local_range, tablename, cfname); // ensure that the session doesn't end without a response from REMOTE - sess.join(500); - assert sess.isAlive(); + try + { + sess.get(500, TimeUnit.MILLISECONDS); + fail("Repair session should not end without response from REMOTE"); + } + catch (TimeoutException e) {} // deliver a fake response from REMOTE - sess.completed(REMOTE, request.cf.right); + sess.session.completed(REMOTE, request.cf.right); // block until the repair has completed - sess.join(); + sess.get(); } @Test @@ -218,8 +220,8 @@ public abstract class AntiEntropyService public void testDifferencer() throws Throwable { // this next part does some housekeeping so that cleanup in the differencer doesn't error out. - AntiEntropyService.RepairSession sess = AntiEntropyService.instance.getArtificialRepairSession(request, tablename, cfname); - + AntiEntropyService.RepairFuture sess = AntiEntropyService.instance.submitArtificialRepairSession(request, tablename, cfname); + // generate a tree Validator validator = new Validator(request); validator.prepare(store); @@ -242,7 +244,7 @@ public abstract class AntiEntropyService interesting.add(changed); // difference the trees - AntiEntropyService.RepairSession.Differencer diff = sess.new Differencer(cfname, request.endpoint, ltree, rtree); + AntiEntropyService.RepairSession.Differencer diff = sess.session.new Differencer(cfname, request.endpoint, ltree, rtree); diff.run(); // ensure that the changed range was recorded