http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
new file mode 100644
index 0000000..8f53781
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
+import org.apache.cassandra.notifications.SSTableDeletingNotification;
+import org.apache.cassandra.notifications.SSTableListChangedNotification;
+import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
+import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Tests CompactionStrategyManager's handling of pending repair sstables
+ */
+public class CompactionStrategyManagerPendingRepairTest extends 
AbstractPendingRepairTest
+{
+
+    private static boolean 
strategiesContain(Collection<AbstractCompactionStrategy> strategies, 
SSTableReader sstable)
+    {
+        return Iterables.any(strategies, strategy -> 
strategy.getSSTables().contains(sstable));
+    }
+
+    private boolean pendingContains(UUID id, SSTableReader sstable)
+    {
+        return Iterables.any(csm.getPendingRepairManagers(), p -> p.get(id) != 
null && p.get(id).getSSTables().contains(sstable));
+    }
+
+    private boolean pendingContains(SSTableReader sstable)
+    {
+        return Iterables.any(csm.getPendingRepairManagers(), p -> 
strategiesContain(p.getStrategies(), sstable));
+    }
+
+    private boolean repairedContains(SSTableReader sstable)
+    {
+        return strategiesContain(csm.getRepaired(), sstable);
+    }
+
+    private boolean unrepairedContains(SSTableReader sstable)
+    {
+        return strategiesContain(csm.getUnrepaired(), sstable);
+    }
+
+    /**
+     * Pending repair strategy should be created when we encounter a new pending id
+     */
+    @Test
+    public void sstableAdded()
+    {
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
+        Assert.assertTrue(csm.pendingRepairs().isEmpty());
+
+        SSTableReader sstable = makeSSTable(true);
+        Assert.assertFalse(sstable.isRepaired());
+        Assert.assertFalse(sstable.isPendingRepair());
+
+        mutateRepaired(sstable, repairID);
+        Assert.assertFalse(sstable.isRepaired());
+        Assert.assertTrue(sstable.isPendingRepair());
+        csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+
+        // add the sstable
+        csm.handleNotification(new 
SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+        Assert.assertFalse(repairedContains(sstable));
+        Assert.assertFalse(unrepairedContains(sstable));
+        csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
+        Assert.assertTrue(pendingContains(repairID, sstable));
+    }
+
+    @Test
+    public void sstableListChangedAddAndRemove()
+    {
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
+
+        SSTableReader sstable1 = makeSSTable(true);
+        mutateRepaired(sstable1, repairID);
+
+        SSTableReader sstable2 = makeSSTable(true);
+        mutateRepaired(sstable2, repairID);
+
+        Assert.assertFalse(repairedContains(sstable1));
+        Assert.assertFalse(unrepairedContains(sstable1));
+        Assert.assertFalse(repairedContains(sstable2));
+        Assert.assertFalse(unrepairedContains(sstable2));
+        csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+
+        // add only
+        SSTableListChangedNotification notification;
+        notification = new 
SSTableListChangedNotification(Collections.singleton(sstable1),
+                                                          
Collections.emptyList(),
+                                                          
OperationType.COMPACTION);
+        csm.handleNotification(notification, cfs.getTracker());
+
+        csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
+        Assert.assertFalse(repairedContains(sstable1));
+        Assert.assertFalse(unrepairedContains(sstable1));
+        Assert.assertTrue(pendingContains(repairID, sstable1));
+        Assert.assertFalse(repairedContains(sstable2));
+        Assert.assertFalse(unrepairedContains(sstable2));
+        Assert.assertFalse(pendingContains(repairID, sstable2));
+
+        // remove and add
+        notification = new 
SSTableListChangedNotification(Collections.singleton(sstable2),
+                                                          
Collections.singleton(sstable1),
+                                                          
OperationType.COMPACTION);
+        csm.handleNotification(notification, cfs.getTracker());
+
+        Assert.assertFalse(repairedContains(sstable1));
+        Assert.assertFalse(unrepairedContains(sstable1));
+        Assert.assertFalse(pendingContains(repairID, sstable1));
+        Assert.assertFalse(repairedContains(sstable2));
+        Assert.assertFalse(unrepairedContains(sstable2));
+        Assert.assertTrue(pendingContains(repairID, sstable2));
+    }
+
+    @Test
+    public void sstableRepairStatusChanged()
+    {
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
+
+        // add as unrepaired
+        SSTableReader sstable = makeSSTable(false);
+        Assert.assertTrue(unrepairedContains(sstable));
+        Assert.assertFalse(repairedContains(sstable));
+        csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+
+        SSTableRepairStatusChanged notification;
+
+        // change to pending repaired
+        mutateRepaired(sstable, repairID);
+        notification = new 
SSTableRepairStatusChanged(Collections.singleton(sstable));
+        csm.handleNotification(notification, cfs.getTracker());
+        Assert.assertFalse(unrepairedContains(sstable));
+        Assert.assertFalse(repairedContains(sstable));
+        csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
+        Assert.assertTrue(pendingContains(repairID, sstable));
+
+        // change to repaired
+        mutateRepaired(sstable, System.currentTimeMillis());
+        notification = new 
SSTableRepairStatusChanged(Collections.singleton(sstable));
+        csm.handleNotification(notification, cfs.getTracker());
+        Assert.assertFalse(unrepairedContains(sstable));
+        Assert.assertTrue(repairedContains(sstable));
+        Assert.assertFalse(pendingContains(repairID, sstable));
+    }
+
+    @Test
+    public void sstableDeleted()
+    {
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
+
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID);
+        csm.handleNotification(new 
SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+        Assert.assertTrue(pendingContains(repairID, sstable));
+
+        // delete sstable
+        SSTableDeletingNotification notification = new 
SSTableDeletingNotification(sstable);
+        csm.handleNotification(notification, cfs.getTracker());
+        Assert.assertFalse(pendingContains(repairID, sstable));
+        Assert.assertFalse(unrepairedContains(sstable));
+        Assert.assertFalse(repairedContains(sstable));
+    }
+
+    /**
+     * CompactionStrategyManager.getStrategies should include
+     * pending repair strategies when appropriate
+     */
+    @Test
+    public void getStrategies()
+    {
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
+
+        List<List<AbstractCompactionStrategy>> strategies;
+
+        strategies = csm.getStrategies();
+        Assert.assertEquals(3, strategies.size());
+        Assert.assertTrue(strategies.get(2).isEmpty());
+
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID);
+        csm.handleNotification(new 
SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+
+        strategies = csm.getStrategies();
+        Assert.assertEquals(3, strategies.size());
+        Assert.assertFalse(strategies.get(2).isEmpty());
+    }
+
+    /**
+     * Tests that a finalized repair yields a RepairFinishedCompactionTask
+     * which, once executed, reclassifies the session's sstable as repaired
+     */
+    @Test
+    public void cleanupCompactionFinalized()
+    {
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID);
+        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+        LocalSessionAccessor.finalizeUnsafe(repairID);
+        csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
+        Assert.assertTrue(pendingContains(repairID, sstable));
+        Assert.assertTrue(sstable.isPendingRepair());
+        Assert.assertFalse(sstable.isRepaired());
+
+        AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds());
+        Assert.assertNotNull(compactionTask);
+        Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
+
+        // run the compaction
+        compactionTask.execute(null);
+
+        Assert.assertTrue(repairedContains(sstable));
+        Assert.assertFalse(unrepairedContains(sstable));
+        csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+
+        // sstable should have pendingRepair cleared, and repairedAt set correctly
+        long expectedRepairedAt = ActiveRepairService.instance.getParentRepairSession(repairID).getRepairedAt();
+        Assert.assertFalse(sstable.isPendingRepair());
+        Assert.assertTrue(sstable.isRepaired());
+        Assert.assertEquals(expectedRepairedAt, sstable.getSSTableMetadata().repairedAt);
+    }
+
+    /**
+     * Tests that a failed repair yields a RepairFinishedCompactionTask
+     * which, once executed, reclassifies the session's sstable as unrepaired
+     */
+    @Test
+    public void cleanupCompactionFailed()
+    {
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID);
+        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+        LocalSessionAccessor.failUnsafe(repairID);
+
+        csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
+        Assert.assertTrue(pendingContains(repairID, sstable));
+        Assert.assertTrue(sstable.isPendingRepair());
+        Assert.assertFalse(sstable.isRepaired());
+
+        AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds());
+        Assert.assertNotNull(compactionTask);
+        Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
+
+        // run the compaction
+        compactionTask.execute(null);
+
+        Assert.assertFalse(repairedContains(sstable));
+        Assert.assertTrue(unrepairedContains(sstable));
+        csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+
+        // sstable should have pendingRepair cleared, and repairedAt reset to unrepaired
+        Assert.assertFalse(sstable.isPendingRepair());
+        Assert.assertFalse(sstable.isRepaired());
+        Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 9a8371e..ce65f4e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -189,7 +189,7 @@ public class LeveledCompactionStrategyTest
         Range<Token> range = new Range<>(Util.token(""), Util.token(""));
         int gcBefore = 
keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
         UUID parentRepSession = UUID.randomUUID();
-        
ActiveRepairService.instance.registerParentRepairSession(parentRepSession, 
FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), 
false, System.currentTimeMillis(), true);
+        
ActiveRepairService.instance.registerParentRepairSession(parentRepSession, 
FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), 
false, ActiveRepairService.UNREPAIRED_SSTABLE, true);
         RepairJobDesc desc = new RepairJobDesc(parentRepSession, 
UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range));
         Validator validator = new Validator(desc, 
FBUtilities.getBroadcastAddress(), gcBefore);
         CompactionManager.instance.submitValidation(cfs, validator).get();
@@ -350,7 +350,7 @@ public class LeveledCompactionStrategyTest
         SSTableReader sstable1 = unrepaired.manifest.generations[2].get(0);
         SSTableReader sstable2 = unrepaired.manifest.generations[1].get(0);
 
-        
sstable1.descriptor.getMetadataSerializer().mutateRepairedAt(sstable1.descriptor,
 System.currentTimeMillis());
+        
sstable1.descriptor.getMetadataSerializer().mutateRepaired(sstable1.descriptor, 
System.currentTimeMillis(), null);
         sstable1.reloadSSTableMetadata();
         assertTrue(sstable1.isRepaired());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
new file mode 100644
index 0000000..1c2f02e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class PendingRepairManagerTest extends AbstractPendingRepairTest
+{
+    /**
+     * If a local session is ongoing, it should not be cleaned up
+     */
+    @Test
+    public void needsCleanupInProgress()
+    {
+        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID);
+        prm.addSSTable(sstable);
+        Assert.assertNotNull(prm.get(repairID));
+
+        Assert.assertFalse(prm.canCleanup(repairID));
+    }
+
+    /**
+     * If a local session is finalized, it should be cleaned up
+     */
+    @Test
+    public void needsCleanupFinalized()
+    {
+        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID);
+        prm.addSSTable(sstable);
+        Assert.assertNotNull(prm.get(repairID));
+        LocalSessionAccessor.finalizeUnsafe(repairID);
+
+        Assert.assertTrue(prm.canCleanup(repairID));
+    }
+
+    /**
+     * If a local session has failed, it should be cleaned up
+     */
+    @Test
+    public void needsCleanupFailed()
+    {
+        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID);
+        prm.addSSTable(sstable);
+        Assert.assertNotNull(prm.get(repairID));
+        LocalSessionAccessor.failUnsafe(repairID);
+
+        Assert.assertTrue(prm.canCleanup(repairID));
+    }
+
+    @Test
+    public void needsCleanupNoSession()
+    {
+        UUID fakeID = UUIDGen.getTimeUUID();
+        PendingRepairManager prm = new PendingRepairManager(cfs, null);
+        Assert.assertTrue(prm.canCleanup(fakeID));
+    }
+
+    @Test
+    public void estimateRemainingTasksInProgress()
+    {
+        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID);
+        prm.addSSTable(sstable);
+        Assert.assertNotNull(prm.get(repairID));
+
+        Assert.assertEquals(0, prm.getEstimatedRemainingTasks());
+        Assert.assertEquals(0, prm.getNumPendingRepairFinishedTasks());
+    }
+
+    @Test
+    public void estimateRemainingFinishedRepairTasks()
+    {
+        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID);
+        prm.addSSTable(sstable);
+        Assert.assertNotNull(prm.get(repairID));
+        // once the session is finalized it should be counted as one repair-finished task
+        LocalSessionAccessor.finalizeUnsafe(repairID);
+
+        Assert.assertEquals(0, prm.getEstimatedRemainingTasks());
+        Assert.assertEquals(1, prm.getNumPendingRepairFinishedTasks());
+    }
+
+    @Test
+    public void getNextBackgroundTask()
+    {
+        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID);
+        prm.addSSTable(sstable);
+
+        repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
+        sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID);
+        prm.addSSTable(sstable);
+        LocalSessionAccessor.finalizeUnsafe(repairID);
+
+        Assert.assertEquals(2, prm.getSessions().size());
+        
Assert.assertNull(prm.getNextBackgroundTask(FBUtilities.nowInSeconds()));
+        AbstractCompactionTask compactionTask = 
prm.getNextRepairFinishedTask();
+        Assert.assertNotNull(compactionTask);
+        
Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, 
compactionTask.getClass());
+        PendingRepairManager.RepairFinishedCompactionTask cleanupTask = 
(PendingRepairManager.RepairFinishedCompactionTask) compactionTask;
+        Assert.assertEquals(repairID, cleanupTask.getSessionID());
+    }
+
+    @Test
+    public void getNextBackgroundTaskNoSessions()
+    {
+        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+        
Assert.assertNull(prm.getNextBackgroundTask(FBUtilities.nowInSeconds()));
+    }
+
+    @Test
+    public void maximalTaskNeedsCleanup()
+    {
+        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+
+        UUID repairID = registerSession(cfs);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID);
+        prm.addSSTable(sstable);
+        Assert.assertNotNull(prm.get(repairID));
+        // after the session is finalized, getMaximalTasks should return exactly one task
+        LocalSessionAccessor.finalizeUnsafe(repairID);
+
+        Assert.assertEquals(1, prm.getMaximalTasks(FBUtilities.nowInSeconds(), false).size());
+    }
+
+    @Test
+    public void userDefinedTaskTest()
+    {
+        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+        UUID repairId = registerSession(cfs);
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairId);
+        prm.addSSTable(sstable);
+        List<AbstractCompactionTask> tasks = 
csm.getUserDefinedTasks(Collections.singleton(sstable), 100);
+        Assert.assertEquals(1, tasks.size());
+    }
+
+    @Test
+    public void mixedPendingSessionsTest()
+    {
+        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+        UUID repairId = registerSession(cfs);
+        UUID repairId2 = registerSession(cfs);
+        SSTableReader sstable = makeSSTable(true);
+        SSTableReader sstable2 = makeSSTable(true);
+
+        mutateRepaired(sstable, repairId);
+        mutateRepaired(sstable2, repairId2);
+        prm.addSSTable(sstable);
+        prm.addSSTable(sstable2);
+        List<AbstractCompactionTask> tasks = 
csm.getUserDefinedTasks(Lists.newArrayList(sstable, sstable2), 100);
+        Assert.assertEquals(2, tasks.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 2021538..536bdca 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@ -1183,7 +1183,7 @@ public class LogTransactionTest extends AbstractTransactionalTest
 
         SerializationHeader header = SerializationHeader.make(cfs.metadata(), 
Collections.emptyList());
         StatsMetadata metadata = (StatsMetadata) new 
MetadataCollector(cfs.metadata().comparator)
-                                                 
.finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 
0.01f, -1, header)
+                                                 
.finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 
0.01f, -1, null, header)
                                                  .get(MetadataType.STATS);
         SSTableReader reader = SSTableReader.internalOpen(descriptor,
                                                           components,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index f4443f1..bb8cc68 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -163,6 +163,7 @@ public class RealTransactionsTest extends SchemaLoader
                                                            desc,
                                                            0,
                                                            0,
+                                                           null,
                                                            0,
                                                            
SerializationHeader.make(cfs.metadata(), txn.originals()),
                                                            
cfs.indexManager.listIndexes(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index fd9b03e..cebceca 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -50,7 +50,7 @@ public class StreamStateStoreTest
         Range<Token> range = new Range<>(factory.fromString("0"), 
factory.fromString("100"));
 
         InetAddress local = FBUtilities.getBroadcastAddress();
-        StreamSession session = new StreamSession(local, local, new 
DefaultConnectionFactory(), 0, true, false);
+        StreamSession session = new StreamSession(local, local, new 
DefaultConnectionFactory(), 0, true, false, null);
         session.addStreamRequest("keyspace1", Collections.singleton(range), 
Collections.singleton("cf"), 0);
 
         StreamStateStore store = new StreamStateStore();
@@ -71,7 +71,7 @@ public class StreamStateStoreTest
 
         // add different range within the same keyspace
         Range<Token> range2 = new Range<>(factory.fromString("100"), 
factory.fromString("200"));
-        session = new StreamSession(local, local, new 
DefaultConnectionFactory(), 0, true, false);
+        session = new StreamSession(local, local, new 
DefaultConnectionFactory(), 0, true, false, null);
         session.addStreamRequest("keyspace1", Collections.singleton(range2), 
Collections.singleton("cf"), 0);
         session.state(StreamSession.State.COMPLETE);
         store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index 7e8c1fb..666c2e9 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -69,7 +69,7 @@ public class BigTableWriterTest extends AbstractTransactionalTest
 
         private TestableBTW(Descriptor desc)
         {
-            this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, new 
SerializationHeader(true, cfs.metadata(), 
cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)));
+            this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null, new 
SerializationHeader(true, cfs.metadata(), 
cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)));
         }
 
         private TestableBTW(Descriptor desc, SSTableTxnWriter sw)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index e3afaeb..41a6828 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -936,7 +936,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
             File dir = cfs.getDirectories().getDirectoryForNewSSTables();
             Descriptor desc = cfs.newSSTableDescriptor(dir);
 
-            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 
0, 0, new SerializationHeader(true, cfs.metadata(), 
cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
+            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 
0, 0, null, new SerializationHeader(true, cfs.metadata(), 
cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
             {
                 int end = f == fileCount - 1 ? partitionCount : ((f + 1) * 
partitionCount) / fileCount;
                 for ( ; i < end ; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 189782c..d3eef01 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -30,9 +30,10 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.service.ActiveRepairService;
 
 import org.apache.cassandra.Util;
+
+import static org.apache.cassandra.service.ActiveRepairService.*;
 import static org.junit.Assert.assertEquals;
 
 public class SSTableUtils
@@ -217,7 +218,7 @@ public class SSTableUtils
             TableMetadata metadata = Schema.instance.getTableMetadata(ksname, 
cfname);
             ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(metadata.id);
             SerializationHeader header = appender.header();
-            SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, 
Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, 
ActiveRepairService.UNREPAIRED_SSTABLE, 0, header);
+            SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, 
Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, 
UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, 0, header);
             while (appender.append(writer)) { /* pass */ }
             Collection<SSTableReader> readers = writer.finish(true);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
index cc92b2c..d42c49b 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
@@ -164,7 +164,7 @@ public class SSTableWriterTestBase extends SchemaLoader
     public static SSTableWriter getWriter(ColumnFamilyStore cfs, File 
directory, LifecycleTransaction txn)
     {
         Descriptor desc = cfs.newSSTableDescriptor(directory);
-        return SSTableWriter.create(desc, 0, 0, new SerializationHeader(true, 
cfs.metadata(), cfs.metadata().regularAndStaticColumns(), 
EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn);
+        return SSTableWriter.create(desc, 0, 0, null, new 
SerializationHeader(true, cfs.metadata(), 
cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS), 
cfs.indexManager.listIndexes(), txn);
     }
 
     public static ByteBuffer random(int i, int size)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
 
b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
index 18defdf..18bb6b1 100644
--- 
a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
+++ 
b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
@@ -96,7 +96,7 @@ public class SSTableFlushObserverTest
                                                                   KS_NAME, 
CF_NAME,
                                                                   0,
                                                                   
sstableFormat),
-                                                   10L, 0L, 
TableMetadataRef.forOfflineTools(cfm),
+                                                   10L, 0L, null, 
TableMetadataRef.forOfflineTools(cfm),
                                                    new 
MetadataCollector(cfm.comparator).sstableLevel(0),
                                                    new 
SerializationHeader(true, cfm, cfm.regularAndStaticColumns(), 
EncodingStats.NO_STATS),
                                                    
Collections.singletonList(observer),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
 
b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index b03f275..b918dfd 100644
--- 
a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -44,6 +44,8 @@ import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.RandomAccessReader;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class MetadataSerializerTest
 {
@@ -96,7 +98,7 @@ public class MetadataSerializerTest
 
         String partitioner = RandomPartitioner.class.getCanonicalName();
         double bfFpChance = 0.1;
-        Map<MetadataType, MetadataComponent> originalMetadata = 
collector.finalizeMetadata(partitioner, bfFpChance, 0, 
SerializationHeader.make(cfm, Collections.emptyList()));
+        Map<MetadataType, MetadataComponent> originalMetadata = 
collector.finalizeMetadata(partitioner, bfFpChance, 0, null, 
SerializationHeader.make(cfm, Collections.emptyList()));
         return originalMetadata;
     }
 
@@ -118,6 +120,12 @@ public class MetadataSerializerTest
         testOldReadsNew("mb", "mc");
     }
 
+    @Test
+    public void testMdReadMc() throws IOException
+    {
+        testOldReadsNew("mc", "md");
+    }
+
     public void testOldReadsNew(String oldV, String newV) throws IOException
     {
         Map<MetadataType, MetadataComponent> originalMetadata = 
constructMetadata();
@@ -146,4 +154,13 @@ public class MetadataSerializerTest
             }
         }
     }
+
+    @Test
+    public void pendingRepairCompatibility()
+    {
+        Version mc = BigFormat.instance.getVersion("mc");
+        assertFalse(mc.hasPendingRepair());
+        Version md = BigFormat.instance.getVersion("md");
+        assertTrue(md.hasPendingRepair());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java 
b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 0fceaf4..b799d66 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -76,7 +76,7 @@ public class LocalSyncTaskTest extends SchemaLoader
         // note: we reuse the same endpoint which is bogus in theory but fine 
here
         TreeResponse r1 = new TreeResponse(ep1, tree1);
         TreeResponse r2 = new TreeResponse(ep2, tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
ActiveRepairService.UNREPAIRED_SSTABLE, false);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
ActiveRepairService.UNREPAIRED_SSTABLE, null, false);
         task.run();
 
         assertEquals(0, task.get().numberOfDifferences);
@@ -90,7 +90,9 @@ public class LocalSyncTaskTest extends SchemaLoader
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
 
-        
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession,  
FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), 
false, System.currentTimeMillis(), false);
+        
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession,  
FBUtilities.getBroadcastAddress(),
+                                                                 
Arrays.asList(cfs), Arrays.asList(range), false,
+                                                                 
ActiveRepairService.UNREPAIRED_SSTABLE, false);
 
         RepairJobDesc desc = new RepairJobDesc(parentRepairSession, 
UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
 
@@ -111,7 +113,7 @@ public class LocalSyncTaskTest extends SchemaLoader
         // note: we reuse the same endpoint which is bogus in theory but fine 
here
         TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), 
tree1);
         TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), 
tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
ActiveRepairService.UNREPAIRED_SSTABLE, false);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
ActiveRepairService.UNREPAIRED_SSTABLE, null, false);
         task.run();
 
         // ensure that the changed range was recorded

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java 
b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index f65bedb..7be8cb5 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -62,7 +62,7 @@ public class RepairSessionTest
         IPartitioner p = Murmur3Partitioner.instance;
         Range<Token> repairRange = new 
Range<>(p.getToken(ByteBufferUtil.bytes(0)), 
p.getToken(ByteBufferUtil.bytes(100)));
         Set<InetAddress> endpoints = Sets.newHashSet(remote);
-        RepairSession session = new RepairSession(parentSessionId, sessionId, 
Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, 
endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, false, "Standard1");
+        RepairSession session = new RepairSession(parentSessionId, sessionId, 
Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, 
endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, false, false, "Standard1");
 
         // perform convict
         session.convict(remote, Double.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java 
b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index e2fb2c4..bbcdbb8 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -193,7 +193,7 @@ public class ValidatorTest
                                                                  false);
 
         final CompletableFuture<MessageOut> outgoingMessageSink = 
registerOutgoingMessageSink();
-        Validator validator = new Validator(desc, 
FBUtilities.getBroadcastAddress(), 0, true);
+        Validator validator = new Validator(desc, 
FBUtilities.getBroadcastAddress(), 0, true, false);
         CompactionManager.instance.submitValidation(cfs, validator);
 
         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, 
TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
 
b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
new file mode 100644
index 0000000..26168ad
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.junit.Ignore;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+
+@Ignore
+public abstract class AbstractConsistentSessionTest
+{
+    protected static final InetAddress COORDINATOR;
+    protected static final InetAddress PARTICIPANT1;
+    protected static final InetAddress PARTICIPANT2;
+    protected static final InetAddress PARTICIPANT3;
+
+    static
+    {
+        try
+        {
+            COORDINATOR = InetAddress.getByName("10.0.0.1");
+            PARTICIPANT1 = InetAddress.getByName("10.0.0.1");
+            PARTICIPANT2 = InetAddress.getByName("10.0.0.2");
+            PARTICIPANT3 = InetAddress.getByName("10.0.0.3");
+        }
+        catch (UnknownHostException e)
+        {
+
+            throw new AssertionError(e);
+        }
+
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    protected static final Set<InetAddress> PARTICIPANTS = 
ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3);
+
+    protected static Token t(int v)
+    {
+        return 
DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(v));
+    }
+
+    protected static final Range<Token> RANGE1 = new Range<>(t(1), t(2));
+    protected static final Range<Token> RANGE2 = new Range<>(t(2), t(3));
+    protected static final Range<Token> RANGE3 = new Range<>(t(4), t(5));
+
+
+    protected static UUID registerSession(ColumnFamilyStore cfs)
+    {
+        UUID sessionId = UUIDGen.getTimeUUID();
+
+        ActiveRepairService.instance.registerParentRepairSession(sessionId,
+                                                                 COORDINATOR,
+                                                                 
Lists.newArrayList(cfs),
+                                                                 
Sets.newHashSet(RANGE1, RANGE2, RANGE3),
+                                                                 true,
+                                                                 
System.currentTimeMillis(),
+                                                                 true);
+        return sessionId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
new file mode 100644
index 0000000..b570920
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.repair.RepairSessionResult;
+import org.apache.cassandra.repair.messages.FailSession;
+import org.apache.cassandra.repair.messages.FinalizeCommit;
+import org.apache.cassandra.repair.messages.FinalizePropose;
+import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
+
+public class CoordinatorSessionTest extends AbstractConsistentSessionTest
+{
+
+    static CoordinatorSession.Builder createBuilder()
+    {
+        CoordinatorSession.Builder builder = CoordinatorSession.builder();
+        builder.withState(PREPARING);
+        builder.withSessionID(UUIDGen.getTimeUUID());
+        builder.withCoordinator(COORDINATOR);
+        builder.withUUIDTableIds(Sets.newHashSet(UUIDGen.getTimeUUID(), 
UUIDGen.getTimeUUID()));
+        builder.withRepairedAt(System.currentTimeMillis());
+        builder.withRanges(Sets.newHashSet(RANGE1, RANGE2, RANGE3));
+        builder.withParticipants(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, 
PARTICIPANT3));
+        return builder;
+    }
+
+    static CoordinatorSession createSession()
+    {
+        return createBuilder().build();
+    }
+
+    static InstrumentedCoordinatorSession createInstrumentedSession()
+    {
+        return new InstrumentedCoordinatorSession(createBuilder());
+    }
+
+    private static RepairSessionResult createResult(CoordinatorSession 
coordinator)
+    {
+        return new RepairSessionResult(coordinator.sessionID, "ks", 
coordinator.ranges, null);
+    }
+
+    private static void assertMessageSent(InstrumentedCoordinatorSession 
coordinator, InetAddress participant, RepairMessage expected)
+    {
+        Assert.assertTrue(coordinator.sentMessages.containsKey(participant));
+        Assert.assertEquals(1, 
coordinator.sentMessages.get(participant).size());
+        Assert.assertEquals(expected, 
coordinator.sentMessages.get(participant).get(0));
+    }
+
+    private static class InstrumentedCoordinatorSession extends 
CoordinatorSession
+    {
+        public InstrumentedCoordinatorSession(Builder builder)
+        {
+            super(builder);
+        }
+
+        Map<InetAddress, List<RepairMessage>> sentMessages = new HashMap<>();
+
+        protected void sendMessage(InetAddress destination, RepairMessage 
message)
+        {
+            if (!sentMessages.containsKey(destination))
+            {
+                sentMessages.put(destination, new ArrayList<>());
+            }
+            sentMessages.get(destination).add(message);
+        }
+
+        Runnable onSetRepairing = null;
+        boolean setRepairingCalled = false;
+        public synchronized void setRepairing()
+        {
+            setRepairingCalled = true;
+            if (onSetRepairing != null)
+            {
+                onSetRepairing.run();
+            }
+            super.setRepairing();
+        }
+
+        Runnable onFinalizeCommit = null;
+        boolean finalizeCommitCalled = false;
+        public synchronized void finalizeCommit(Executor executor)
+        {
+            finalizeCommitCalled = true;
+            if (onFinalizeCommit != null)
+            {
+                onFinalizeCommit.run();
+            }
+            super.finalizeCommit(executor);
+        }
+
+        Runnable onFail = null;
+        boolean failCalled = false;
+        public synchronized void fail(Executor executor)
+        {
+            failCalled = true;
+            if (onFail != null)
+            {
+                onFail.run();
+            }
+            super.fail(executor);
+        }
+    }
+
+    /**
+     * Coordinator state should only switch after all participants are set
+     */
+    @Test
+    public void setPeerState()
+    {
+        CoordinatorSession session = createSession();
+        Assert.assertEquals(PREPARING, session.getState());
+
+        session.setParticipantState(PARTICIPANT1, PREPARED);
+        Assert.assertEquals(PREPARING, session.getState());
+
+        session.setParticipantState(PARTICIPANT2, PREPARED);
+        Assert.assertEquals(PREPARING, session.getState());
+
+        session.setParticipantState(PARTICIPANT3, PREPARED);
+        Assert.assertEquals(PREPARED, session.getState());
+    }
+
+    @Test
+    public void hasFailed()
+    {
+        CoordinatorSession session;
+
+        // participant failure
+        session = createSession();
+        Assert.assertFalse(session.hasFailed());
+        session.setParticipantState(PARTICIPANT1, FAILED);
+        Assert.assertTrue(session.hasFailed());
+
+        // coordinator failure
+        session = createSession();
+        Assert.assertFalse(session.hasFailed());
+        session.setState(FAILED);
+        Assert.assertTrue(session.hasFailed());
+    }
+
+    /**
+     * Coordinator should only send out failure messages once
+     */
+    @Test
+    public void multipleFailures()
+    {
+        InstrumentedCoordinatorSession coordinator = 
createInstrumentedSession();
+
+        Assert.assertEquals(PREPARING, coordinator.getState());
+        Assert.assertTrue(coordinator.sentMessages.isEmpty());
+
+        coordinator.fail();
+        Assert.assertEquals(FAILED, coordinator.getState());
+        for (InetAddress participant : PARTICIPANTS)
+        {
+            assertMessageSent(coordinator, participant, new 
FailSession(coordinator.sessionID));
+        }
+
+        coordinator.sentMessages.clear();
+        coordinator.fail();
+        Assert.assertEquals(FAILED, coordinator.getState());
+        Assert.assertTrue(coordinator.sentMessages.isEmpty());
+    }
+
+    /**
+     * Tests the complete coordinator side consistent repair cycle
+     */
+    @Test
+    public void successCase()
+    {
+        InstrumentedCoordinatorSession coordinator = 
createInstrumentedSession();
+        Executor executor = MoreExecutors.directExecutor();
+        AtomicBoolean repairSubmitted = new AtomicBoolean(false);
+        SettableFuture<List<RepairSessionResult>> repairFuture = 
SettableFuture.create();
+        Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier 
= () ->
+        {
+            repairSubmitted.set(true);
+            return repairFuture;
+        };
+
+        // coordinator sends prepare requests to create local session and 
perform anticompaction
+        AtomicBoolean hasFailures = new AtomicBoolean(false);
+        Assert.assertFalse(repairSubmitted.get());
+        Assert.assertTrue(coordinator.sentMessages.isEmpty());
+        ListenableFuture sessionResult = coordinator.execute(executor, 
sessionSupplier, hasFailures);
+
+        for (InetAddress participant : PARTICIPANTS)
+        {
+
+            RepairMessage expected = new 
PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new 
HashSet<>(PARTICIPANTS));
+            assertMessageSent(coordinator, participant, expected);
+        }
+
+        // participants respond to coordinator, and repair begins once all 
participants have responded with success
+        Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
+
+        coordinator.handlePrepareResponse(PARTICIPANT1, true);
+        Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
+
+        coordinator.handlePrepareResponse(PARTICIPANT2, true);
+        Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
+
+        // set the setRepairing callback to verify the correct state when it's 
called
+        Assert.assertFalse(coordinator.setRepairingCalled);
+        coordinator.onSetRepairing = () -> Assert.assertEquals(PREPARED, 
coordinator.getState());
+        coordinator.handlePrepareResponse(PARTICIPANT3, true);
+        Assert.assertTrue(coordinator.setRepairingCalled);
+        Assert.assertTrue(repairSubmitted.get());
+
+        Assert.assertEquals(ConsistentSession.State.REPAIRING, 
coordinator.getState());
+
+        ArrayList<RepairSessionResult> results = 
Lists.newArrayList(createResult(coordinator),
+                                                                    
createResult(coordinator),
+                                                                    
createResult(coordinator));
+
+        coordinator.sentMessages.clear();
+        repairFuture.set(results);
+
+        // propose messages should have been sent once all repair sessions 
completed successfully
+        for (InetAddress participant : PARTICIPANTS)
+        {
+            RepairMessage expected = new 
FinalizePropose(coordinator.sessionID);
+            assertMessageSent(coordinator, participant, expected);
+        }
+
+        // finalize commit messages will be sent once all participants respond 
with a promise to finalize
+        coordinator.sentMessages.clear();
+        Assert.assertEquals(ConsistentSession.State.REPAIRING, 
coordinator.getState());
+
+        coordinator.handleFinalizePromise(PARTICIPANT1, true);
+        Assert.assertEquals(ConsistentSession.State.REPAIRING, 
coordinator.getState());
+
+        coordinator.handleFinalizePromise(PARTICIPANT2, true);
+        Assert.assertEquals(ConsistentSession.State.REPAIRING, 
coordinator.getState());
+
+        // set the finalizeCommit callback so we can verify the state when 
it's called
+        Assert.assertFalse(coordinator.finalizeCommitCalled);
+        coordinator.onFinalizeCommit = () -> 
Assert.assertEquals(FINALIZE_PROMISED, coordinator.getState());
+        coordinator.handleFinalizePromise(PARTICIPANT3, true);
+        Assert.assertTrue(coordinator.finalizeCommitCalled);
+
+        Assert.assertEquals(ConsistentSession.State.FINALIZED, 
coordinator.getState());
+        for (InetAddress participant : PARTICIPANTS)
+        {
+            RepairMessage expected = new FinalizeCommit(coordinator.sessionID);
+            assertMessageSent(coordinator, participant, expected);
+        }
+
+        Assert.assertTrue(sessionResult.isDone());
+        Assert.assertFalse(hasFailures.get());
+    }
+
+    @Test
+    public void failedRepairs()
+    {
+        InstrumentedCoordinatorSession coordinator = 
createInstrumentedSession();
+        Executor executor = MoreExecutors.directExecutor();
+        AtomicBoolean repairSubmitted = new AtomicBoolean(false);
+        SettableFuture<List<RepairSessionResult>> repairFuture = 
SettableFuture.create();
+        Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier 
= () ->
+        {
+            repairSubmitted.set(true);
+            return repairFuture;
+        };
+
+        // coordinator sends prepare requests to create local session and 
perform anticompaction
+        AtomicBoolean hasFailures = new AtomicBoolean(false);
+        Assert.assertFalse(repairSubmitted.get());
+        Assert.assertTrue(coordinator.sentMessages.isEmpty());
+        ListenableFuture sessionResult = coordinator.execute(executor, 
sessionSupplier, hasFailures);
+        for (InetAddress participant : PARTICIPANTS)
+        {
+            PrepareConsistentRequest expected = new 
PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new 
HashSet<>(PARTICIPANTS));
+            assertMessageSent(coordinator, participant, expected);
+        }
+
+        // participants respond to coordinator, and repair begins once all 
participants have responded with success
+        Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
+
+        coordinator.handlePrepareResponse(PARTICIPANT1, true);
+        Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
+
+        coordinator.handlePrepareResponse(PARTICIPANT2, true);
+        Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
+
+        // set the setRepairing callback to verify the correct state when it's 
called
+        Assert.assertFalse(coordinator.setRepairingCalled);
+        coordinator.onSetRepairing = () -> Assert.assertEquals(PREPARED, 
coordinator.getState());
+        coordinator.handlePrepareResponse(PARTICIPANT3, true);
+        Assert.assertTrue(coordinator.setRepairingCalled);
+        Assert.assertTrue(repairSubmitted.get());
+
+        Assert.assertEquals(ConsistentSession.State.REPAIRING, 
coordinator.getState());
+
+        ArrayList<RepairSessionResult> results = 
Lists.newArrayList(createResult(coordinator),
+                                                                    null,
+                                                                    
createResult(coordinator));
+
+        coordinator.sentMessages.clear();
+        Assert.assertFalse(coordinator.failCalled);
+        coordinator.onFail = () -> Assert.assertEquals(REPAIRING, 
coordinator.getState());
+        repairFuture.set(results);
+        Assert.assertTrue(coordinator.failCalled);
+
+        // all participants should have been notified of session failure
+        for (InetAddress participant : PARTICIPANTS)
+        {
+            RepairMessage expected = new FailSession(coordinator.sessionID);
+            assertMessageSent(coordinator, participant, expected);
+        }
+
+        Assert.assertTrue(sessionResult.isDone());
+        Assert.assertTrue(hasFailures.get());
+    }
+
+    /**
+     * Verifies that a single failed prepare response fails the whole session.
+     * <p>
+     * After the prepare requests have been sent, participant 1 accepts and
+     * participant 2 rejects. The test checks that the session moves to FAILED,
+     * that a late success from participant 3 neither calls setRepairing nor
+     * submits the repair, that every participant is sent a FailSession message,
+     * and that the returned future is done with the failure flag set.
+     */
+    @Test
+    public void failedPrepare()
+    {
+        InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
+        // direct executor, so callbacks run on the test thread before the assertions that follow
+        Executor executor = MoreExecutors.directExecutor();
+        AtomicBoolean repairSubmitted = new AtomicBoolean(false);
+        SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create();
+        // records whether the coordinator ever asked for the repair to be submitted
+        Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () ->
+        {
+            repairSubmitted.set(true);
+            return repairFuture;
+        };
+
+        // coordinator sends prepare requests to create local session and perform anticompaction
+        AtomicBoolean hasFailures = new AtomicBoolean(false);
+        Assert.assertFalse(repairSubmitted.get());
+        Assert.assertTrue(coordinator.sentMessages.isEmpty());
+        ListenableFuture<?> sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures);
+        for (InetAddress participant : PARTICIPANTS)
+        {
+            PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
+            assertMessageSent(coordinator, participant, expected);
+        }
+
+        // start from a clean slate so the FailSession check below only sees new messages
+        coordinator.sentMessages.clear();
+
+        // participants respond to coordinator; the session stays in PREPARING while responses are successful
+        Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState());
+
+        coordinator.handlePrepareResponse(PARTICIPANT1, true);
+        Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState());
+
+        // participant 2 fails to prepare for consistent repair
+        Assert.assertFalse(coordinator.failCalled);
+        coordinator.handlePrepareResponse(PARTICIPANT2, false);
+        Assert.assertEquals(ConsistentSession.State.FAILED, coordinator.getState());
+        Assert.assertTrue(coordinator.failCalled);
+
+        // additional success messages should be ignored
+        Assert.assertFalse(coordinator.setRepairingCalled);
+        coordinator.onSetRepairing = Assert::fail;
+        coordinator.handlePrepareResponse(PARTICIPANT3, true);
+        Assert.assertFalse(coordinator.setRepairingCalled);
+        Assert.assertFalse(repairSubmitted.get());
+
+        // all participants should have been notified of session failure
+        for (InetAddress participant : PARTICIPANTS)
+        {
+            RepairMessage expected = new FailSession(coordinator.sessionID);
+            assertMessageSent(coordinator, participant, expected);
+        }
+
+        Assert.assertTrue(sessionResult.isDone());
+        Assert.assertTrue(hasFailures.get());
+    }
+
+    /**
+     * Verifies that a refused finalize promise fails a session that had already
+     * prepared and repaired successfully.
+     * <p>
+     * All three participants prepare successfully and the repair future completes,
+     * which sends the FinalizePropose messages. Participant 1 then promises and
+     * participant 2 refuses. The test checks that the session moves to FAILED,
+     * that a late promise from participant 3 does not trigger the finalize commit,
+     * that every participant is sent a FailSession message, and that the returned
+     * future is done with the failure flag set.
+     */
+    @Test
+    public void failedPropose()
+    {
+        InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
+        // direct executor, so callbacks run on the test thread before the assertions that follow
+        Executor executor = MoreExecutors.directExecutor();
+        AtomicBoolean repairSubmitted = new AtomicBoolean(false);
+        SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create();
+        // records whether the coordinator ever asked for the repair to be submitted
+        Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () ->
+        {
+            repairSubmitted.set(true);
+            return repairFuture;
+        };
+
+        // coordinator sends prepare requests to create local session and perform anticompaction
+        AtomicBoolean hasFailures = new AtomicBoolean(false);
+        Assert.assertFalse(repairSubmitted.get());
+        Assert.assertTrue(coordinator.sentMessages.isEmpty());
+        ListenableFuture<?> sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures);
+
+        for (InetAddress participant : PARTICIPANTS)
+        {
+            RepairMessage expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
+            assertMessageSent(coordinator, participant, expected);
+        }
+
+        // participants respond to coordinator, and repair begins once all participants have responded with success
+        Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState());
+
+        coordinator.handlePrepareResponse(PARTICIPANT1, true);
+        Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState());
+
+        coordinator.handlePrepareResponse(PARTICIPANT2, true);
+        Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState());
+
+        // set the setRepairing callback to verify the correct state when it's called
+        Assert.assertFalse(coordinator.setRepairingCalled);
+        coordinator.onSetRepairing = () -> Assert.assertEquals(PREPARED, coordinator.getState());
+        coordinator.handlePrepareResponse(PARTICIPANT3, true);
+        Assert.assertTrue(coordinator.setRepairingCalled);
+        Assert.assertTrue(repairSubmitted.get());
+
+        Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState());
+
+        List<RepairSessionResult> results = Lists.newArrayList(createResult(coordinator),
+                                                               createResult(coordinator),
+                                                               createResult(coordinator));
+
+        // drop the prepare messages so only the propose messages are checked below
+        coordinator.sentMessages.clear();
+        repairFuture.set(results);
+
+        // propose messages should have been sent once all repair sessions completed successfully
+        for (InetAddress participant : PARTICIPANTS)
+        {
+            RepairMessage expected = new FinalizePropose(coordinator.sessionID);
+            assertMessageSent(coordinator, participant, expected);
+        }
+
+        // finalize commit messages would only be sent once all participants respond with a promise to finalize;
+        // in this test participant 2 refuses, so the commit must never happen
+        coordinator.sentMessages.clear();
+        Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState());
+
+        coordinator.handleFinalizePromise(PARTICIPANT1, true);
+        Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState());
+
+        Assert.assertFalse(coordinator.failCalled);
+        coordinator.handleFinalizePromise(PARTICIPANT2, false);
+        Assert.assertEquals(ConsistentSession.State.FAILED, coordinator.getState());
+        Assert.assertTrue(coordinator.failCalled);
+
+        // additional success messages should be ignored
+        Assert.assertFalse(coordinator.finalizeCommitCalled);
+        coordinator.onFinalizeCommit = Assert::fail;
+        coordinator.handleFinalizePromise(PARTICIPANT3, true);
+        Assert.assertFalse(coordinator.finalizeCommitCalled);
+        Assert.assertEquals(ConsistentSession.State.FAILED, coordinator.getState());
+
+        // failure messages should have been sent to all participants
+        for (InetAddress participant : PARTICIPANTS)
+        {
+            RepairMessage expected = new FailSession(coordinator.sessionID);
+            assertMessageSent(coordinator, participant, expected);
+        }
+
+        Assert.assertTrue(sessionResult.isDone());
+        Assert.assertTrue(hasFailures.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
new file mode 100644
index 0000000..dfa9bc5
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.net.InetAddress;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.repair.messages.FailSession;
+import org.apache.cassandra.repair.messages.FinalizePromise;
+import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * Tests that {@link CoordinatorSessions} registers sessions and hands incoming
+ * messages to the {@link CoordinatorSession} registered under the session id
+ * carried by the message.
+ */
+public class CoordinatorSessionsTest extends AbstractConsistentSessionTest
+{
+    private static TableMetadata cfm;
+    private static ColumnFamilyStore cfs;
+
+    /**
+     * Session that only records how its handlers were called, to check
+     * CoordinatorSessions is passing the messages to the coordinator session correctly.
+     * None of the overrides call through to the superclass.
+     */
+    private static class InstrumentedCoordinatorSession extends CoordinatorSession
+    {
+
+        public InstrumentedCoordinatorSession(Builder builder)
+        {
+            super(builder);
+        }
+
+        // number of handlePrepareResponse calls and the arguments of the most recent one
+        int prepareResponseCalls = 0;
+        InetAddress preparePeer = null;
+        boolean prepareSuccess = false;
+
+        @Override
+        public synchronized void handlePrepareResponse(InetAddress participant, boolean success)
+        {
+            prepareResponseCalls++;
+            preparePeer = participant;
+            prepareSuccess = success;
+        }
+
+        // number of handleFinalizePromise calls and the arguments of the most recent one
+        int finalizePromiseCalls = 0;
+        InetAddress promisePeer = null;
+        boolean promiseSuccess = false;
+
+        @Override
+        public synchronized void handleFinalizePromise(InetAddress participant, boolean success)
+        {
+            finalizePromiseCalls++;
+            promisePeer = participant;
+            promiseSuccess = success;
+        }
+
+        // number of fail calls
+        int failCalls = 0;
+
+        @Override
+        public synchronized void fail(Executor executor)
+        {
+            failCalls++;
+        }
+    }
+
+    /**
+     * CoordinatorSessions that builds {@link InstrumentedCoordinatorSession}s and
+     * narrows the return types of the lookup and registration methods accordingly.
+     */
+    private static class InstrumentedCoordinatorSessions extends CoordinatorSessions
+    {
+        @Override
+        protected CoordinatorSession buildSession(CoordinatorSession.Builder builder)
+        {
+            return new InstrumentedCoordinatorSession(builder);
+        }
+
+        @Override
+        public InstrumentedCoordinatorSession getSession(UUID sessionId)
+        {
+            return (InstrumentedCoordinatorSession) super.getSession(sessionId);
+        }
+
+        @Override
+        public InstrumentedCoordinatorSession registerSession(UUID sessionId, Set<InetAddress> peers)
+        {
+            return (InstrumentedCoordinatorSession) super.registerSession(sessionId, peers);
+        }
+    }
+
+    /**
+     * Prepares the server, creates the keyspace and table used by the tests and
+     * looks up the table's ColumnFamilyStore once for all of them.
+     */
+    @BeforeClass
+    public static void setupClass()
+    {
+        SchemaLoader.prepareServer();
+        cfm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "coordinatorsessiontest").build();
+        SchemaLoader.createKeyspace("coordinatorsessiontest", KeyspaceParams.simple(1), cfm);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+    }
+
+    /** Registers a session against the test table and returns its id. */
+    private static UUID registerSession()
+    {
+        return registerSession(cfs);
+    }
+
+    /**
+     * A newly registered session starts in PREPARING, mirrors the data of the
+     * parent repair session and can be looked up again by its id.
+     */
+    @Test
+    public void registerSessionTest()
+    {
+        CoordinatorSessions sessions = new CoordinatorSessions();
+        UUID sessionID = registerSession();
+        CoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS);
+
+        Assert.assertEquals(ConsistentSession.State.PREPARING, session.getState());
+        Assert.assertEquals(sessionID, session.sessionID);
+        Assert.assertEquals(COORDINATOR, session.coordinator);
+        Assert.assertEquals(Sets.newHashSet(cfm.id), session.tableIds);
+
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
+        Assert.assertEquals(prs.repairedAt, session.repairedAt);
+        Assert.assertEquals(prs.getRanges(), session.ranges);
+        Assert.assertEquals(PARTICIPANTS, session.participants);
+
+        Assert.assertSame(session, sessions.getSession(sessionID));
+    }
+
+    /** A prepare response is passed to the registered session with its peer and success flag. */
+    @Test
+    public void handlePrepareResponse()
+    {
+        InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
+        UUID sessionID = registerSession();
+
+        InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS);
+        Assert.assertEquals(0, session.prepareResponseCalls);
+
+        sessions.handlePrepareResponse(new PrepareConsistentResponse(sessionID, PARTICIPANT1, true));
+        Assert.assertEquals(1, session.prepareResponseCalls);
+        Assert.assertEquals(PARTICIPANT1, session.preparePeer);
+        Assert.assertTrue(session.prepareSuccess);
+    }
+
+    /** A prepare response for an unknown session id is accepted and does not create a session. */
+    @Test
+    public void handlePrepareResponseNoSession()
+    {
+        InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
+        UUID fakeID = UUIDGen.getTimeUUID();
+
+        sessions.handlePrepareResponse(new PrepareConsistentResponse(fakeID, PARTICIPANT1, true));
+        Assert.assertNull(sessions.getSession(fakeID));
+    }
+
+    /** A finalize promise is passed to the registered session with its peer and success flag. */
+    @Test
+    public void handlePromiseResponse()
+    {
+        InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
+        UUID sessionID = registerSession();
+
+        InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS);
+        Assert.assertEquals(0, session.finalizePromiseCalls);
+
+        sessions.handleFinalizePromiseMessage(new FinalizePromise(sessionID, PARTICIPANT1, true));
+        Assert.assertEquals(1, session.finalizePromiseCalls);
+        Assert.assertEquals(PARTICIPANT1, session.promisePeer);
+        Assert.assertTrue(session.promiseSuccess);
+    }
+
+    /** A finalize promise for an unknown session id is accepted and does not create a session. */
+    @Test
+    public void handlePromiseResponseNoSession()
+    {
+        InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
+        UUID fakeID = UUIDGen.getTimeUUID();
+
+        sessions.handleFinalizePromiseMessage(new FinalizePromise(fakeID, PARTICIPANT1, true));
+        Assert.assertNull(sessions.getSession(fakeID));
+    }
+
+    /** A fail-session message calls fail on the registered session exactly once. */
+    @Test
+    public void handleFailureMessage()
+    {
+        InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
+        UUID sessionID = registerSession();
+
+        InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS);
+        Assert.assertEquals(0, session.failCalls);
+
+        sessions.handleFailSessionMessage(new FailSession(sessionID));
+        Assert.assertEquals(1, session.failCalls);
+    }
+
+    /** A fail-session message for an unknown session id is accepted and does not create a session. */
+    @Test
+    public void handleFailureMessageNoSession()
+    {
+        InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
+        UUID fakeID = UUIDGen.getTimeUUID();
+
+        sessions.handleFailSessionMessage(new FailSession(fakeID));
+        Assert.assertNull(sessions.getSession(fakeID));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java 
b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
new file mode 100644
index 0000000..6808efe
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.net.InetAddress;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.cassandra.service.ActiveRepairService;
+
+/**
+ * Makes package-private hacks available to compaction tests.
+ * <p>
+ * All methods act on the local sessions reachable through
+ * {@code ActiveRepairService.instance.consistent.local}.
+ */
+public class LocalSessionAccessor
+{
+    private static final ActiveRepairService ARS = ActiveRepairService.instance;
+
+    /** Calls {@code start()} on the local sessions service. */
+    public static void startup()
+    {
+        ARS.consistent.local.start();
+    }
+
+    /**
+     * Creates a local session from an already registered parent repair session
+     * and stores it directly in the local sessions service.
+     *
+     * @param sessionID   id of the parent repair session; it must already be registered
+     * @param coordinator currently unused
+     * @param peers       peers handed to the created session
+     */
+    public static void prepareUnsafe(UUID sessionID, InetAddress coordinator, Set<InetAddress> peers)
+    {
+        ActiveRepairService.ParentRepairSession prs = ARS.getParentRepairSession(sessionID);
+        assert prs != null : "no parent repair session registered for " + sessionID;
+        LocalSession session = ARS.consistent.local.createSessionUnsafe(sessionID, prs, peers);
+        ARS.consistent.local.putSessionUnsafe(session);
+    }
+
+    /**
+     * Forces the local session into the FINALIZED state and saves it.
+     *
+     * @param sessionID id of an existing local session
+     */
+    public static void finalizeUnsafe(UUID sessionID)
+    {
+        setStateUnsafe(sessionID, ConsistentSession.State.FINALIZED);
+    }
+
+    /**
+     * Forces the local session into the FAILED state and saves it.
+     *
+     * @param sessionID id of an existing local session
+     */
+    public static void failUnsafe(UUID sessionID)
+    {
+        setStateUnsafe(sessionID, ConsistentSession.State.FAILED);
+    }
+
+    /** Looks up the local session, sets the given state on it and passes it to save(). */
+    private static void setStateUnsafe(UUID sessionID, ConsistentSession.State state)
+    {
+        LocalSession session = ARS.consistent.local.getSession(sessionID);
+        assert session != null : "no local session found for " + sessionID;
+        session.setState(state);
+        ARS.consistent.local.save(session);
+    }
+}

Reply via email to