http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/CleanupTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java index a096c78..46c0afd 100644 --- a/test/unit/org/apache/cassandra/db/CleanupTest.java +++ b/test/unit/org/apache/cassandra/db/CleanupTest.java @@ -107,7 +107,6 @@ public class CleanupTest SchemaLoader.compositeIndexCFMD(KEYSPACE2, CF_INDEXED2, true)); } - /* @Test public void testCleanup() throws ExecutionException, InterruptedException { @@ -116,7 +115,6 @@ public class CleanupTest Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); - UnfilteredPartitionIterator iter; // insert data and verify we get it back w/ range query fillCF(cfs, "val", LOOPS); @@ -124,8 +122,7 @@ public class CleanupTest // record max timestamps of the sstables pre-cleanup List<Long> expectedMaxTimestamps = getMaxTimestampList(cfs); - iter = Util.getRangeSlice(cfs); - assertEquals(LOOPS, Iterators.size(iter)); + assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size()); // with one token in the ring, owned by the local node, cleanup should be a no-op CompactionManager.instance.performCleanup(cfs, 2); @@ -134,10 +131,8 @@ public class CleanupTest assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs)); // check data is still there - iter = Util.getRangeSlice(cfs); - assertEquals(LOOPS, Iterators.size(iter)); + assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size()); } - */ @Test public void testCleanupWithIndexes() throws IOException, ExecutionException, InterruptedException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/CleanupTransientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CleanupTransientTest.java b/test/unit/org/apache/cassandra/db/CleanupTransientTest.java new file mode 100644 index 0000000..9789183 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/CleanupTransientTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; + +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.partitions.FilteredPartition; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.RandomPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.AbstractNetworkTopologySnitch; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.PendingRangeCalculatorService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class CleanupTransientTest +{ + private static final IPartitioner partitioner = RandomPartitioner.instance; + private static IPartitioner oldPartitioner; + + public static final int LOOPS = 200; + public static final String KEYSPACE1 = "CleanupTest1"; + public static final String CF_INDEXED1 = "Indexed1"; + public static final String CF_STANDARD1 = "Standard1"; + + public static final String KEYSPACE2 = "CleanupTestMultiDc"; + public static final String CF_INDEXED2 = "Indexed2"; + public static final String CF_STANDARD2 = "Standard2"; + + public static final ByteBuffer COLUMN = ByteBufferUtil.bytes("birthdate"); + public static final ByteBuffer VALUE = ByteBuffer.allocate(8); + static + { + VALUE.putLong(20101229); + VALUE.flip(); + } + + @BeforeClass + public static void setup() throws Exception + { + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); + oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner); + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple("2/1"), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1), + SchemaLoader.compositeIndexCFMD(KEYSPACE1, CF_INDEXED1, true)); + + StorageService ss = StorageService.instance; + final int RING_SIZE = 2; + + TokenMetadata tmd = ss.getTokenMetadata(); + tmd.clearUnsafe(); + ArrayList<Token> endpointTokens = new ArrayList<>(); + ArrayList<Token> keyTokens = new ArrayList<>(); + List<InetAddressAndPort> hosts = new ArrayList<>(); + List<UUID> hostIds = new ArrayList<>(); + + endpointTokens.add(RandomPartitioner.MINIMUM); + endpointTokens.add(RandomPartitioner.instance.midpoint(RandomPartitioner.MINIMUM, new RandomPartitioner.BigIntegerToken(RandomPartitioner.MAXIMUM))); + + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); + PendingRangeCalculatorService.instance.blockUntilFinished(); + + + DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch() + { + @Override + public String getRack(InetAddressAndPort endpoint) + { + return "RC1"; + } + + @Override + public String getDatacenter(InetAddressAndPort endpoint) + { + return "DC1"; + } + }); + } + + @Test + public void testCleanup() throws Exception + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); + + + // insert data and verify we get it back w/ range query + fillCF(cfs, "val", LOOPS); + + // record max timestamps of the sstables pre-cleanup + List<Long> expectedMaxTimestamps = getMaxTimestampList(cfs); + + assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size()); + + // with two tokens RF=2/1 and the sstable not repaired this should do nothing + CompactionManager.instance.performCleanup(cfs, 2); + + // ensure max timestamp of the sstables are retained post-cleanup + assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs)); + + // check data is still there + assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size()); + + //Get an exact count of how many partitions are in the fully replicated range and should + //be retained + int fullCount = 0; + RangesAtEndpoint localRanges = StorageService.instance.getLocalReplicas(keyspace.getName()).filter(Replica::isFull); + for (FilteredPartition partition : Util.getAll(Util.cmd(cfs).build())) + { + Token token = partition.partitionKey().getToken(); + for (Replica r : localRanges) + { + if (r.range().contains(token)) + fullCount++; + } + } + + SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 1, null, false); + sstable.reloadSSTableMetadata(); + + // This should remove approximately 50% of the data, specifically whatever was transiently replicated + CompactionManager.instance.performCleanup(cfs, 2); + + // ensure max timestamp of the sstables are retained post-cleanup + assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs)); + + // check less data is there, all transient data should be gone since the table was repaired + assertEquals(fullCount, Util.getAll(Util.cmd(cfs).build()).size()); + } + + protected void fillCF(ColumnFamilyStore cfs, String colName, int rowsPerSSTable) + { + CompactionManager.instance.disableAutoCompaction(); + + for (int i = 0; i < rowsPerSSTable; i++) + { + String key = String.valueOf(i); + // create a row and update the birthdate value, test that the index query fetches the new version + new RowUpdateBuilder(cfs.metadata(), System.currentTimeMillis(), ByteBufferUtil.bytes(key)) + .clustering(COLUMN) + .add(colName, VALUE) + .build() + .applyUnsafe(); + } + + cfs.forceBlockingFlush(); + } + + protected List<Long> getMaxTimestampList(ColumnFamilyStore cfs) + { + List<Long> list = new LinkedList<Long>(); + for (SSTableReader sstable : cfs.getLiveSSTables()) + list.add(sstable.getMaxTimestamp()); + return list; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/ImportTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ImportTest.java b/test/unit/org/apache/cassandra/db/ImportTest.java index 66bbff3..5ceb233 100644 --- a/test/unit/org/apache/cassandra/db/ImportTest.java +++ b/test/unit/org/apache/cassandra/db/ImportTest.java @@ -174,7 +174,7 @@ public class ImportTest extends CQLTester Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); getCurrentColumnFamilyStore().clearUnsafe(); for (SSTableReader sstable : sstables) - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 111, null); + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 111, null, false); File backupdir = moveToBackupDir(sstables); assertEquals(0, execute("select * from %s").size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java index e01088d..a864786 100644 --- a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java +++ b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java @@ -308,7 +308,7 @@ public class RepairedDataTombstonesTest extends CQLTester public static void repair(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException { - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1, null); + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 1, null, false); sstable.reloadSSTableMetadata(); cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java index 706b274..1ac5440 100644 --- a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java +++ b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java @@ -70,6 +70,12 @@ public class RowUpdateBuilder this.updateBuilder.nowInSec(localDeletionTime); } + public RowUpdateBuilder timestamp(long ts) + { + updateBuilder.timestamp(ts); + return this; + } + private Row.SimpleBuilder rowBuilder() { // Normally, rowBuilder is created by the call to clustering(), but we allow skipping that call for an empty http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index 36af54f..b58909b 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -615,7 +615,7 @@ public class ScrubTest { SerializationHeader header = new SerializationHeader(true, metadata.get(), metadata.get().regularAndStaticColumns(), EncodingStats.NO_STATS); MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(0); - return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, null, metadata, collector, header, txn), txn); + return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, null, false, metadata, collector, header, txn), txn); } private static class TestMultiWriter extends SimpleSSTableMultiWriter @@ -631,10 +631,10 @@ public class ScrubTest */ private static class TestWriter extends BigTableWriter { - TestWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, TableMetadataRef metadata, + TestWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, TableMetadataRef metadata, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn) { - super(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, Collections.emptySet(), txn); + super(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, Collections.emptySet(), txn); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java index 1c051f5..a14db00 100644 --- a/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java +++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java @@ -34,6 +34,8 @@ import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.utils.UUIDGen; import static org.junit.Assert.assertEquals; @@ -189,4 +191,28 @@ public class SystemKeyspaceMigrator40Test extends CQLTester } assertEquals(1, rowCount); } + + @Test + public void testMigrateAvailableRanges() throws Throwable + { + Range<Token> testRange = new Range<>(DatabaseDescriptor.getPartitioner().getRandomToken(), DatabaseDescriptor.getPartitioner().getRandomToken()); + String insert = String.format("INSERT INTO %s (" + + "keyspace_name, " + + "ranges) " + + " values ( ?, ? )", + SystemKeyspaceMigrator40.legacyAvailableRangesName); + execute(insert, + "foo", + ImmutableSet.of(SystemKeyspace.rangeToBytes(testRange))); + SystemKeyspaceMigrator40.migrate(); + + int rowCount = 0; + for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.availableRangesName))) + { + rowCount++; + assertEquals("foo", row.getString("keyspace_name")); + assertEquals(ImmutableSet.of(testRange), SystemKeyspace.rawRangesToRangeSet(row.getSet("full_ranges", BytesType.instance), DatabaseDescriptor.getPartitioner())); + } + assertEquals(1, rowCount); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java b/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java index 32fa4e4..fff567b 100644 --- a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java +++ b/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java @@ -26,6 +26,7 @@ import java.util.*; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; +import org.apache.cassandra.service.reads.NeverSpeculativeRetryPolicy; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -255,6 +256,7 @@ public class TableCQLHelperTest extends CQLTester .maxIndexInterval(7) .memtableFlushPeriod(8) .speculativeRetry(AlwaysSpeculativeRetryPolicy.INSTANCE) + .speculativeWriteThreshold(NeverSpeculativeRetryPolicy.INSTANCE) .extensions(ImmutableMap.of("ext1", ByteBuffer.wrap("val1".getBytes()))) .recordColumnDrop(ColumnMetadata.regularColumn(keyspace, table, "reg1", AsciiType.instance), FBUtilities.timestampMicros()); @@ -272,6 +274,7 @@ public class TableCQLHelperTest extends CQLTester "\tAND max_index_interval = 7\n" + "\tAND memtable_flush_period_in_ms = 8\n" + "\tAND speculative_retry = 'ALWAYS'\n" + + "\tAND speculative_write_threshold = 'NEVER'\n" + "\tAND comment = 'comment'\n" + "\tAND caching = { 'keys': 'ALL', 'rows_per_partition': 'NONE' }\n" + "\tAND compaction = { 'max_threshold': '32', 'min_threshold': '4', 'sstable_size_in_mb': '1', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' }\n" + http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/VerifyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java index 0632274..df2acb4 100644 --- a/test/unit/org/apache/cassandra/db/VerifyTest.java +++ b/test/unit/org/apache/cassandra/db/VerifyTest.java @@ -421,7 +421,7 @@ public class VerifyTest // make the sstable repaired: SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, System.currentTimeMillis(), sstable.getSSTableMetadata().pendingRepair); + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, System.currentTimeMillis(), sstable.getPendingRepair(), sstable.isTransient()); sstable.reloadSSTableMetadata(); // break the sstable: @@ -487,7 +487,7 @@ public class VerifyTest fillCF(cfs, 2); SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1, sstable.getSSTableMetadata().pendingRepair); + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 1, sstable.getPendingRepair(), sstable.isTransient()); sstable.reloadSSTableMetadata(); cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable)); assertTrue(sstable.isRepaired()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java index a320248..4d62894 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java @@ -109,17 +109,16 @@ public class AbstractPendingRepairTest extends AbstractRepairTest SSTableReader sstable = diff.iterator().next(); if (orphan) { - Iterables.any(csm.getUnrepaired(), s -> s.getSSTables().contains(sstable)); - csm.getUnrepaired().forEach(s -> s.removeSSTable(sstable)); + csm.getUnrepairedUnsafe().allStrategies().forEach(acs -> acs.removeSSTable(sstable)); } return sstable; } - protected static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) + protected static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient) { try { - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair); + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient); sstable.reloadSSTableMetadata(); } catch (IOException e) @@ -130,11 +129,11 @@ public class AbstractPendingRepairTest extends AbstractRepairTest protected static void mutateRepaired(SSTableReader sstable, long repairedAt) { - mutateRepaired(sstable, repairedAt, ActiveRepairService.NO_PENDING_REPAIR); + mutateRepaired(sstable, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, false); } - protected static void mutateRepaired(SSTableReader sstable, UUID pendingRepair) + protected static void mutateRepaired(SSTableReader sstable, UUID pendingRepair, boolean isTransient) { - mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair); + mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, isTransient); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 366c18e..f514ea6 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.function.Predicate; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -39,6 +40,8 @@ import org.junit.Test; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.MockSchema; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; @@ -50,7 +53,6 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.KeyspaceParams; @@ -58,6 +60,7 @@ import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Refs; import org.apache.cassandra.UpdateBuilder; @@ -77,16 +80,21 @@ public class AntiCompactionTest { private static final String KEYSPACE1 = "AntiCompactionTest"; private static final String CF = "AntiCompactionTest"; + private static final Collection<Range<Token>> NO_RANGES = Collections.emptyList(); + private static TableMetadata metadata; private static ColumnFamilyStore cfs; + private static InetAddressAndPort local; + @BeforeClass - public static void defineSchema() throws ConfigurationException + public static void defineSchema() throws Throwable { SchemaLoader.prepareServer(); metadata = SchemaLoader.standardCFMD(KEYSPACE1, CF).build(); SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), metadata); cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id); + local = InetAddressAndPort.getByName("127.0.0.1"); } @After @@ -97,56 +105,86 @@ public class AntiCompactionTest store.truncateBlocking(); } - private void registerParentRepairSession(UUID sessionID, Collection<Range<Token>> ranges, long repairedAt, UUID pendingRepair) throws IOException + private void registerParentRepairSession(UUID sessionID, Iterable<Range<Token>> ranges, long repairedAt, UUID pendingRepair) throws IOException { ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddressAndPort.getByName("10.0.0.1"), - Lists.newArrayList(cfs), ranges, + Lists.newArrayList(cfs), ImmutableSet.copyOf(ranges), pendingRepair != null || repairedAt != UNREPAIRED_SSTABLE, repairedAt, true, PreviewKind.NONE); } - private void antiCompactOne(long repairedAt, UUID pendingRepair) throws Exception + private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans) { - assert repairedAt != UNREPAIRED_SSTABLE || pendingRepair != null; + RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local); + for (Range<Token> range : full) + builder.add(new Replica(local, range, true)); - ColumnFamilyStore store = prepareColumnFamilyStore(); - Collection<SSTableReader> sstables = getUnrepairedSSTables(store); - assertEquals(store.getLiveSSTables().size(), sstables.size()); - Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes())); - List<Range<Token>> ranges = Arrays.asList(range); + for (Range<Token> range : trans) + builder.add(new Replica(local, range, false)); + + return builder.build(); + } + + private static Collection<Range<Token>> range(int l, int r) + { + return Collections.singleton(new Range<>(new BytesToken(Integer.toString(l).getBytes()), new BytesToken(Integer.toString(r).getBytes()))); + } - int repairedKeys = 0; + private static class SSTableStats + { + int numLiveSSTables = 0; int pendingKeys = 0; - int nonRepairedKeys = 0; + int transKeys = 0; + int unrepairedKeys = 0; + } + + private SSTableStats antiCompactRanges(ColumnFamilyStore store, RangesAtEndpoint ranges) throws IOException + { + UUID sessionID = UUID.randomUUID(); + Collection<SSTableReader> sstables = getUnrepairedSSTables(store); try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { if (txn == null) throw new IllegalStateException(); - UUID parentRepairSession = pendingRepair == null ? UUID.randomUUID() : pendingRepair; - registerParentRepairSession(parentRepairSession, ranges, repairedAt, pendingRepair); - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, pendingRepair, parentRepairSession); + registerParentRepairSession(sessionID, ranges.ranges(), FBUtilities.nowInSeconds(), sessionID); + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, sessionID); } - assertEquals(2, store.getLiveSSTables().size()); + SSTableStats stats = new SSTableStats(); + stats.numLiveSSTables = store.getLiveSSTables().size(); + + Predicate<Token> fullContains = t -> Iterables.any(ranges.fullRanges(), r -> r.contains(t)); + Predicate<Token> transContains = t -> Iterables.any(ranges.transientRanges(), r -> r.contains(t)); for (SSTableReader sstable : store.getLiveSSTables()) { + assertFalse(sstable.isRepaired()); + assertEquals(sstable.isPendingRepair() ? sessionID : NO_PENDING_REPAIR, sstable.getPendingRepair()); try (ISSTableScanner scanner = sstable.getScanner()) { while (scanner.hasNext()) { UnfilteredRowIterator row = scanner.next(); - if (sstable.isRepaired() || sstable.isPendingRepair()) + Token token = row.partitionKey().getToken(); + if (sstable.isPendingRepair() && !sstable.isTransient()) { - assertTrue(range.contains(row.partitionKey().getToken())); - repairedKeys += sstable.isRepaired() ? 1 : 0; - pendingKeys += sstable.isPendingRepair() ? 1 : 0; + assertTrue(fullContains.test(token)); + assertFalse(transContains.test(token)); + stats.pendingKeys++; + } + else if (sstable.isPendingRepair() && sstable.isTransient()) + { + + assertTrue(transContains.test(token)); + assertFalse(fullContains.test(token)); + stats.transKeys++; } else { - assertFalse(range.contains(row.partitionKey().getToken())); - nonRepairedKeys++; + assertFalse(fullContains.test(token)); + assertFalse(transContains.test(token)); + stats.unrepairedKeys++; } } } @@ -157,21 +195,40 @@ public class AntiCompactionTest assertEquals(1, sstable.selfRef().globalCount()); } assertEquals(0, store.getTracker().getCompacting().size()); - assertEquals(repairedKeys, repairedAt != UNREPAIRED_SSTABLE ? 4 : 0); - assertEquals(pendingKeys, pendingRepair != NO_PENDING_REPAIR ? 4 : 0); - assertEquals(nonRepairedKeys, 6); + return stats; } @Test - public void antiCompactOneRepairedAt() throws Exception + public void antiCompactOneFull() throws Exception { - antiCompactOne(1000, NO_PENDING_REPAIR); + ColumnFamilyStore store = prepareColumnFamilyStore(); + SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), NO_RANGES)); + assertEquals(2, stats.numLiveSSTables); + assertEquals(stats.pendingKeys, 4); + assertEquals(stats.transKeys, 0); + assertEquals(stats.unrepairedKeys, 6); + } + + @Test + public void antiCompactOneMixed() throws Exception + { + ColumnFamilyStore store = prepareColumnFamilyStore(); + SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), range(4, 8))); + assertEquals(3, stats.numLiveSSTables); + assertEquals(stats.pendingKeys, 4); + assertEquals(stats.transKeys, 4); + assertEquals(stats.unrepairedKeys, 2); } @Test - public void antiCompactOnePendingRepair() throws Exception + public void antiCompactOneTransOnly() throws Exception { - antiCompactOne(UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID()); + ColumnFamilyStore store = prepareColumnFamilyStore(); + SSTableStats stats = antiCompactRanges(store, atEndpoint(NO_RANGES, range(0, 4))); + assertEquals(2, stats.numLiveSSTables); + assertEquals(stats.pendingKeys, 0); + assertEquals(stats.transKeys, 4); + assertEquals(stats.unrepairedKeys, 6); } @Test @@ -190,7 +247,7 @@ public class AntiCompactionTest try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { - CompactionManager.instance.performAnticompaction(cfs, ranges, refs, txn, 12345, NO_PENDING_REPAIR, parentRepairSession); + CompactionManager.instance.performAnticompaction(cfs, atEndpoint(ranges, NO_RANGES), refs, txn, parentRepairSession); } long sum = 0; long rows = 0; @@ -208,7 +265,7 @@ public class AntiCompactionTest File dir = cfs.getDirectories().getDirectoryForNewSSTables(); Descriptor desc = cfs.newSSTableDescriptor(dir); - try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, NO_PENDING_REPAIR, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))) + try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, NO_PENDING_REPAIR, false, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))) { for (int i = 0; i < count; i++) { @@ -240,7 +297,7 @@ public class AntiCompactionTest } @Test - public void antiCompactTen() throws InterruptedException, IOException + public void antiCompactTenFull() throws InterruptedException, IOException { Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); @@ -250,56 +307,59 @@ public class AntiCompactionTest { generateSStable(store,Integer.toString(table)); } - Collection<SSTableReader> sstables = getUnrepairedSSTables(store); - assertEquals(store.getLiveSSTables().size(), sstables.size()); + SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), NO_RANGES)); + /* + Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time + so there will be no net change in the number of sstables + */ + assertEquals(10, stats.numLiveSSTables); + assertEquals(stats.pendingKeys, 40); + assertEquals(stats.transKeys, 0); + assertEquals(stats.unrepairedKeys, 60); + } - Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes())); - List<Range<Token>> ranges = Arrays.asList(range); + @Test + public void antiCompactTenTrans() throws InterruptedException, IOException + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); + store.disableAutoCompaction(); - long repairedAt = 1000; - UUID parentRepairSession = UUID.randomUUID(); - registerParentRepairSession(parentRepairSession, ranges, repairedAt, null); - try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); - Refs<SSTableReader> refs = Refs.ref(sstables)) + for (int table = 0; table < 10; table++) { - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, NO_PENDING_REPAIR, parentRepairSession); + generateSStable(store,Integer.toString(table)); } + SSTableStats stats = antiCompactRanges(store, atEndpoint(NO_RANGES, range(0, 4))); /* Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time so there will be no net change in the number of sstables */ - assertEquals(10, store.getLiveSSTables().size()); - int repairedKeys = 0; - int nonRepairedKeys = 0; - for (SSTableReader sstable : store.getLiveSSTables()) + assertEquals(10, stats.numLiveSSTables); + assertEquals(stats.pendingKeys, 0); + assertEquals(stats.transKeys, 40); + assertEquals(stats.unrepairedKeys, 60); + } + + @Test + public void antiCompactTenMixed() throws InterruptedException, IOException + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); + store.disableAutoCompaction(); + + for (int table = 0; table < 10; table++) { - try (ISSTableScanner scanner = sstable.getScanner()) - { - while (scanner.hasNext()) - { - try (UnfilteredRowIterator row = scanner.next()) - { - if (sstable.isRepaired()) - { - assertTrue(range.contains(row.partitionKey().getToken())); - assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt); - repairedKeys++; - } - else - { - assertFalse(range.contains(row.partitionKey().getToken())); - assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt); - nonRepairedKeys++; - } - } - } - } + generateSStable(store,Integer.toString(table)); } - assertEquals(repairedKeys, 40); - assertEquals(nonRepairedKeys, 60); + SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), range(4, 8))); + assertEquals(15, stats.numLiveSSTables); + assertEquals(stats.pendingKeys, 40); + assertEquals(stats.transKeys, 40); + assertEquals(stats.unrepairedKeys, 20); } - private void shouldMutate(long repairedAt, UUID pendingRepair) throws InterruptedException, IOException + @Test + public void shouldMutatePendingRepair() throws InterruptedException, IOException { ColumnFamilyStore store = prepareColumnFamilyStore(); Collection<SSTableReader> sstables = getUnrepairedSSTables(store); @@ -307,35 +367,23 @@ public class AntiCompactionTest // the sstables start at "0".getBytes() = 48, we need to include that first token, with "/".getBytes() = 47 Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("9999".getBytes())); List<Range<Token>> ranges = Arrays.asList(range); - UUID parentRepairSession = pendingRepair == null ? UUID.randomUUID() : pendingRepair; - registerParentRepairSession(parentRepairSession, ranges, repairedAt, pendingRepair); + UUID pendingRepair = UUID.randomUUID(); + registerParentRepairSession(pendingRepair, ranges, UNREPAIRED_SSTABLE, pendingRepair); try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, pendingRepair, parentRepairSession); + CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, pendingRepair); } assertThat(store.getLiveSSTables().size(), is(1)); - assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(repairedAt != UNREPAIRED_SSTABLE)); - assertThat(Iterables.get(store.getLiveSSTables(), 0).isPendingRepair(), is(pendingRepair != NO_PENDING_REPAIR)); + assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(false)); + assertThat(Iterables.get(store.getLiveSSTables(), 0).isPendingRepair(), is(true)); assertThat(Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount(), is(1)); assertThat(store.getTracker().getCompacting().size(), is(0)); } @Test - public void shouldMutateRepairedAt() throws InterruptedException, IOException - { - shouldMutate(1, NO_PENDING_REPAIR); - } - - @Test - public void shouldMutatePendingRepair() throws InterruptedException, IOException - { - shouldMutate(UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID()); - } - - @Test public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, IOException { Keyspace keyspace = Keyspace.open(KEYSPACE1); @@ -358,7 +406,7 @@ public class AntiCompactionTest try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, NO_PENDING_REPAIR, parentRepairSession); + CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, parentRepairSession); } catch (IllegalStateException e) { @@ -428,7 +476,7 @@ public class AntiCompactionTest Assert.assertFalse(refs.isEmpty()); try { - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, missingRepairSession, missingRepairSession); + CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, missingRepairSession); Assert.fail("expected RuntimeException"); } catch (RuntimeException e) @@ -484,8 +532,7 @@ public class AntiCompactionTest Range<Token> r = new Range<>(t(9), t(100)); // sstable is not intersecting and should not be included - Iterator<SSTableReader> sstableIterator = sstables.iterator(); - CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID()); + CompactionManager.validateSSTableBoundsForAnticompaction(UUID.randomUUID(), sstables, atEndpoint(Collections.singletonList(r), NO_RANGES)); } @Test(expected = IllegalStateException.class) @@ -500,8 +547,7 @@ public class AntiCompactionTest Range<Token> r = new Range<>(t(10), t(11)); // no sstable included, throw - Iterator<SSTableReader> sstableIterator = sstables.iterator(); - CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID()); + CompactionManager.validateSSTableBoundsForAnticompaction(UUID.randomUUID(), sstables, atEndpoint(Collections.singletonList(r), NO_RANGES)); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java index c7f1ae8..267c2e4 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db.compaction; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -42,29 +41,34 @@ import org.apache.cassandra.utils.FBUtilities; public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingRepairTest { - private static boolean strategiesContain(Collection<AbstractCompactionStrategy> strategies, SSTableReader sstable) + private boolean transientContains(SSTableReader sstable) { - return Iterables.any(strategies, strategy -> strategy.getSSTables().contains(sstable)); - } - - private boolean pendingContains(UUID id, SSTableReader sstable) - { - return Iterables.any(csm.getPendingRepairManagers(), p -> p.get(id) != null && p.get(id).getSSTables().contains(sstable)); + return csm.getTransientRepairsUnsafe().containsSSTable(sstable); } private boolean pendingContains(SSTableReader sstable) { - return Iterables.any(csm.getPendingRepairManagers(), p -> strategiesContain(p.getStrategies(), sstable)); + return csm.getPendingRepairsUnsafe().containsSSTable(sstable); } private boolean repairedContains(SSTableReader sstable) { - return strategiesContain(csm.getRepaired(), sstable); + return csm.getRepairedUnsafe().containsSSTable(sstable); } private boolean unrepairedContains(SSTableReader sstable) { - return strategiesContain(csm.getUnrepaired(), sstable); + return csm.getUnrepairedUnsafe().containsSSTable(sstable); + } + + private boolean hasPendingStrategiesFor(UUID sessionID) + { + return !Iterables.isEmpty(csm.getPendingRepairsUnsafe().getStrategiesFor(sessionID)); + } + + private boolean hasTransientStrategiesFor(UUID sessionID) + { + return !Iterables.isEmpty(csm.getTransientRepairsUnsafe().getStrategiesFor(sessionID)); } /** @@ -75,23 +79,25 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR { UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); - Assert.assertTrue(csm.pendingRepairs().isEmpty()); + Assert.assertTrue(Iterables.isEmpty(csm.getPendingRepairsUnsafe().allStrategies())); SSTableReader sstable = makeSSTable(true); Assert.assertFalse(sstable.isRepaired()); Assert.assertFalse(sstable.isPendingRepair()); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); Assert.assertFalse(sstable.isRepaired()); Assert.assertTrue(sstable.isPendingRepair()); - csm.getForPendingRepair(repairID).forEach(Assert::assertNull); + Assert.assertFalse(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); // add the sstable csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker()); Assert.assertFalse(repairedContains(sstable)); Assert.assertFalse(unrepairedContains(sstable)); - csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull); - Assert.assertTrue(pendingContains(repairID, sstable)); + Assert.assertTrue(pendingContains(sstable)); + Assert.assertTrue(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); } @Test @@ -101,16 +107,17 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable1 = makeSSTable(true); - mutateRepaired(sstable1, repairID); + mutateRepaired(sstable1, repairID, false); SSTableReader sstable2 = makeSSTable(true); - mutateRepaired(sstable2, repairID); + mutateRepaired(sstable2, repairID, false); Assert.assertFalse(repairedContains(sstable1)); Assert.assertFalse(unrepairedContains(sstable1)); Assert.assertFalse(repairedContains(sstable2)); Assert.assertFalse(unrepairedContains(sstable2)); - csm.getForPendingRepair(repairID).forEach(Assert::assertNull); + Assert.assertFalse(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); // add only SSTableListChangedNotification notification; @@ -119,13 +126,14 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR OperationType.COMPACTION); csm.handleNotification(notification, cfs.getTracker()); - csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull); Assert.assertFalse(repairedContains(sstable1)); Assert.assertFalse(unrepairedContains(sstable1)); - Assert.assertTrue(pendingContains(repairID, sstable1)); + Assert.assertTrue(pendingContains(sstable1)); Assert.assertFalse(repairedContains(sstable2)); Assert.assertFalse(unrepairedContains(sstable2)); - Assert.assertFalse(pendingContains(repairID, sstable2)); + Assert.assertFalse(pendingContains(sstable2)); + Assert.assertTrue(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); // remove and add notification = new SSTableListChangedNotification(Collections.singleton(sstable2), @@ -135,10 +143,10 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR Assert.assertFalse(repairedContains(sstable1)); Assert.assertFalse(unrepairedContains(sstable1)); - Assert.assertFalse(pendingContains(repairID, sstable1)); + Assert.assertFalse(pendingContains(sstable1)); Assert.assertFalse(repairedContains(sstable2)); Assert.assertFalse(unrepairedContains(sstable2)); - Assert.assertTrue(pendingContains(repairID, sstable2)); + Assert.assertTrue(pendingContains(sstable2)); } @Test @@ -151,18 +159,20 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR SSTableReader sstable = makeSSTable(false); Assert.assertTrue(unrepairedContains(sstable)); Assert.assertFalse(repairedContains(sstable)); - csm.getForPendingRepair(repairID).forEach(Assert::assertNull); + Assert.assertFalse(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); SSTableRepairStatusChanged notification; // change to pending repaired - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); notification = new SSTableRepairStatusChanged(Collections.singleton(sstable)); csm.handleNotification(notification, cfs.getTracker()); Assert.assertFalse(unrepairedContains(sstable)); Assert.assertFalse(repairedContains(sstable)); - csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull); - Assert.assertTrue(pendingContains(repairID, sstable)); + Assert.assertTrue(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); + Assert.assertTrue(pendingContains(sstable)); // change to repaired mutateRepaired(sstable, System.currentTimeMillis()); @@ -170,7 +180,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR csm.handleNotification(notification, cfs.getTracker()); Assert.assertFalse(unrepairedContains(sstable)); Assert.assertTrue(repairedContains(sstable)); - Assert.assertFalse(pendingContains(repairID, sstable)); + Assert.assertFalse(pendingContains(sstable)); } @Test @@ -180,14 +190,14 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker()); - Assert.assertTrue(pendingContains(repairID, sstable)); + Assert.assertTrue(pendingContains(sstable)); // delete sstable SSTableDeletingNotification notification = new SSTableDeletingNotification(sstable); csm.handleNotification(notification, cfs.getTracker()); - Assert.assertFalse(pendingContains(repairID, sstable)); + Assert.assertFalse(pendingContains(sstable)); Assert.assertFalse(unrepairedContains(sstable)); Assert.assertFalse(repairedContains(sstable)); } @@ -209,7 +219,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR Assert.assertTrue(strategies.get(2).isEmpty()); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker()); strategies = csm.getStrategies(); @@ -227,11 +237,12 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker()); LocalSessionAccessor.finalizeUnsafe(repairID); - csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull); - Assert.assertNotNull(pendingContains(repairID, sstable)); + Assert.assertTrue(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); + Assert.assertTrue(pendingContains(sstable)); Assert.assertTrue(sstable.isPendingRepair()); Assert.assertFalse(sstable.isRepaired()); @@ -245,7 +256,9 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR Assert.assertTrue(repairedContains(sstable)); Assert.assertFalse(unrepairedContains(sstable)); - csm.getForPendingRepair(repairID).forEach(Assert::assertNull); + Assert.assertFalse(pendingContains(sstable)); + Assert.assertFalse(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); // sstable should have pendingRepair cleared, and repairedAt set correctly long expectedRepairedAt = ActiveRepairService.instance.getParentRepairSession(repairID).repairedAt; @@ -264,12 +277,13 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker()); LocalSessionAccessor.failUnsafe(repairID); - csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull); - Assert.assertNotNull(pendingContains(repairID, sstable)); + Assert.assertTrue(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); + Assert.assertTrue(pendingContains(sstable)); Assert.assertTrue(sstable.isPendingRepair()); Assert.assertFalse(sstable.isRepaired()); @@ -283,11 +297,78 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR Assert.assertFalse(repairedContains(sstable)); Assert.assertTrue(unrepairedContains(sstable)); - csm.getForPendingRepair(repairID).forEach(Assert::assertNull); + Assert.assertFalse(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); // sstable should have pendingRepair cleared, and repairedAt set correctly Assert.assertFalse(sstable.isPendingRepair()); Assert.assertFalse(sstable.isRepaired()); Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt); } + + @Test + public void finalizedSessionTransientCleanup() + { + Assert.assertTrue(cfs.getLiveSSTables().isEmpty()); + UUID repairID = registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID, true); + csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker()); + LocalSessionAccessor.finalizeUnsafe(repairID); + + Assert.assertFalse(hasPendingStrategiesFor(repairID)); + Assert.assertTrue(hasTransientStrategiesFor(repairID)); + Assert.assertTrue(transientContains(sstable)); + Assert.assertFalse(pendingContains(sstable)); + Assert.assertFalse(repairedContains(sstable)); + Assert.assertFalse(unrepairedContains(sstable)); + + cfs.getCompactionStrategyManager().enable(); // enable compaction to fetch next background task + AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds()); + Assert.assertNotNull(compactionTask); + Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass()); + + // run the compaction + compactionTask.execute(null); + + Assert.assertTrue(cfs.getLiveSSTables().isEmpty()); + Assert.assertFalse(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); + } + + @Test + public void failedSessionTransientCleanup() + { + Assert.assertTrue(cfs.getLiveSSTables().isEmpty()); + UUID repairID = registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID, true); + csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker()); + LocalSessionAccessor.failUnsafe(repairID); + + Assert.assertFalse(hasPendingStrategiesFor(repairID)); + Assert.assertTrue(hasTransientStrategiesFor(repairID)); + Assert.assertTrue(transientContains(sstable)); + Assert.assertFalse(pendingContains(sstable)); + Assert.assertFalse(repairedContains(sstable)); + Assert.assertFalse(unrepairedContains(sstable)); + + cfs.getCompactionStrategyManager().enable(); // enable compaction to fetch next background task + AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds()); + Assert.assertNotNull(compactionTask); + Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass()); + + // run the compaction + compactionTask.execute(null); + + Assert.assertFalse(cfs.getLiveSSTables().isEmpty()); + Assert.assertFalse(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); + Assert.assertFalse(transientContains(sstable)); + Assert.assertFalse(pendingContains(sstable)); + Assert.assertFalse(repairedContains(sstable)); + Assert.assertTrue(unrepairedContains(sstable)); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java index eeaaf5b..73e6852 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java @@ -129,12 +129,12 @@ public class CompactionStrategyManagerTest if (i % 3 == 0) { //make 1 third of sstables repaired - cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, System.currentTimeMillis(), null); + cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, System.currentTimeMillis(), null, false); } else if (i % 3 == 1) { //make 1 third of sstables pending repair - cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, 0, UUIDGen.getTimeUUID()); + cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, 0, UUIDGen.getTimeUUID(), false); } previousSSTables = currentSSTables; } @@ -272,19 +272,19 @@ public class CompactionStrategyManagerTest DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false); } - private static void assertHolderExclusivity(boolean isRepaired, boolean isPendingRepair, Class<? extends AbstractStrategyHolder> expectedType) + private static void assertHolderExclusivity(boolean isRepaired, boolean isPendingRepair, boolean isTransient, Class<? extends AbstractStrategyHolder> expectedType) { ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); CompactionStrategyManager csm = cfs.getCompactionStrategyManager(); - AbstractStrategyHolder holder = csm.getHolder(isRepaired, isPendingRepair); + AbstractStrategyHolder holder = csm.getHolder(isRepaired, isPendingRepair, isTransient); assertNotNull(holder); assertSame(expectedType, holder.getClass()); int matches = 0; for (AbstractStrategyHolder other : csm.getHolders()) { - if (other.managesRepairedGroup(isRepaired, isPendingRepair)) + if (other.managesRepairedGroup(isRepaired, isPendingRepair, isTransient)) { assertSame("holder assignment should be mutually exclusive", holder, other); matches++; @@ -293,13 +293,13 @@ public class CompactionStrategyManagerTest assertEquals(1, matches); } - private static void assertInvalieHolderConfig(boolean isRepaired, boolean isPendingRepair) + private static void assertInvalieHolderConfig(boolean isRepaired, boolean isPendingRepair, boolean isTransient) { ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); CompactionStrategyManager csm = cfs.getCompactionStrategyManager(); try { - csm.getHolder(isRepaired, isPendingRepair); + csm.getHolder(isRepaired, isPendingRepair, isTransient); fail("Expected IllegalArgumentException"); } catch (IllegalArgumentException e) @@ -315,10 +315,14 @@ public class CompactionStrategyManagerTest @Test public void testMutualExclusiveHolderClassification() throws Exception { - assertHolderExclusivity(false, false, CompactionStrategyHolder.class); - assertHolderExclusivity(true, false, CompactionStrategyHolder.class); - assertHolderExclusivity(false, true, PendingRepairHolder.class); - assertInvalieHolderConfig(true, true); + assertHolderExclusivity(false, false, false, CompactionStrategyHolder.class); + assertHolderExclusivity(true, false, false, CompactionStrategyHolder.class); + assertHolderExclusivity(false, true, false, PendingRepairHolder.class); + assertHolderExclusivity(false, true, true, PendingRepairHolder.class); + assertInvalieHolderConfig(true, true, false); + assertInvalieHolderConfig(true, true, true); + assertInvalieHolderConfig(false, false, true); + assertInvalieHolderConfig(true, false, true); } PartitionPosition forKey(int key) @@ -337,20 +341,23 @@ public class CompactionStrategyManagerTest ColumnFamilyStore cfs = createJBODMockCFS(numDir); Keyspace.open(cfs.keyspace.getName()).getColumnFamilyStore(cfs.name).disableAutoCompaction(); assertTrue(cfs.getLiveSSTables().isEmpty()); - List<SSTableReader> unrepaired = new ArrayList<>(); + List<SSTableReader> transientRepairs = new ArrayList<>(); List<SSTableReader> pendingRepair = new ArrayList<>(); + List<SSTableReader> unrepaired = new ArrayList<>(); List<SSTableReader> repaired = new ArrayList<>(); for (int i = 0; i < numDir; i++) { int key = 100 * i; - unrepaired.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++)); + transientRepairs.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++)); pendingRepair.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++)); + unrepaired.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++)); repaired.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++)); } - cfs.getCompactionStrategyManager().mutateRepaired(pendingRepair, 0, UUID.randomUUID()); - cfs.getCompactionStrategyManager().mutateRepaired(repaired, 1000, null); + cfs.getCompactionStrategyManager().mutateRepaired(transientRepairs, 0, UUID.randomUUID(), true); + cfs.getCompactionStrategyManager().mutateRepaired(pendingRepair, 0, UUID.randomUUID(), false); + cfs.getCompactionStrategyManager().mutateRepaired(repaired, 1000, null, false); DiskBoundaries boundaries = new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), Lists.newArrayList(forKey(100), forKey(200), forKey(300)), @@ -358,7 +365,7 @@ public class CompactionStrategyManagerTest CompactionStrategyManager csm = new CompactionStrategyManager(cfs, () -> boundaries, true); - List<GroupedSSTableContainer> grouped = csm.groupSSTables(Iterables.concat(repaired, pendingRepair, unrepaired)); + List<GroupedSSTableContainer> grouped = csm.groupSSTables(Iterables.concat( transientRepairs, pendingRepair, repaired, unrepaired)); for (int x=0; x<grouped.size(); x++) { @@ -372,7 +379,16 @@ public class CompactionStrategyManagerTest if (sstable.isRepaired()) expected = repaired.get(y); else if (sstable.isPendingRepair()) - expected = pendingRepair.get(y); + { + if (sstable.isTransient()) + { + expected = transientRepairs.get(y); + } + else + { + expected = pendingRepair.get(y); + } + } else expected = unrepaired.get(y); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java index 599fc74..5370f33 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java @@ -96,9 +96,9 @@ public class CompactionTaskTest Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state()); } - private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException + private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException { - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair); + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient); sstable.reloadSSTableMetadata(); } @@ -127,9 +127,9 @@ public class CompactionTaskTest SSTableReader pending1 = sstables.get(2); SSTableReader pending2 = sstables.get(3); - mutateRepaired(repaired, FBUtilities.nowInSeconds(), ActiveRepairService.NO_PENDING_REPAIR); - mutateRepaired(pending1, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID()); - mutateRepaired(pending2, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID()); + mutateRepaired(repaired, FBUtilities.nowInSeconds(), ActiveRepairService.NO_PENDING_REPAIR, false); + mutateRepaired(pending1, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID(), false); + mutateRepaired(pending2, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID(), false); LifecycleTransaction txn = null; List<SSTableReader> toCompact = new ArrayList<>(sstables); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java index c91d2fe..857fa32 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java @@ -411,7 +411,7 @@ public class CompactionsCQLTest extends CQLTester cfs.forceBlockingFlush(); } assertEquals(50, cfs.getLiveSSTables().size()); - LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0); + LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first(); AbstractCompactionTask act = lcs.getNextBackgroundTask(0); // we should be compacting all 50 sstables: assertEquals(50, act.transaction.originals().size()); @@ -445,7 +445,7 @@ public class CompactionsCQLTest extends CQLTester // mark the L1 sstable as compacting to make sure we trigger STCS in L0: LifecycleTransaction txn = cfs.getTracker().tryModify(l1sstable, OperationType.COMPACTION); - LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0); + LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first(); AbstractCompactionTask act = lcs.getNextBackgroundTask(0); // note that max_threshold is 60 (more than the amount of L0 sstables), but MAX_COMPACTING_L0 is 32, which means we will trigger STCS with at most max_threshold sstables assertEquals(50, act.transaction.originals().size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 9ebe326..23e88fe 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -364,7 +364,7 @@ public class LeveledCompactionStrategyTest SSTableReader sstable1 = unrepaired.manifest.generations[2].get(0); SSTableReader sstable2 = unrepaired.manifest.generations[1].get(0); - sstable1.descriptor.getMetadataSerializer().mutateRepaired(sstable1.descriptor, System.currentTimeMillis(), null); + sstable1.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable1.descriptor, System.currentTimeMillis(), null, false); sstable1.reloadSSTableMetadata(); assertTrue(sstable1.isRepaired()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java index 2b88c9c..d83e063 100644 --- a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java @@ -45,7 +45,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); prm.addSSTable(sstable); Assert.assertNotNull(prm.get(repairID)); @@ -63,7 +63,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); prm.addSSTable(sstable); Assert.assertNotNull(prm.get(repairID)); LocalSessionAccessor.finalizeUnsafe(repairID); @@ -82,7 +82,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); prm.addSSTable(sstable); Assert.assertNotNull(prm.get(repairID)); LocalSessionAccessor.failUnsafe(repairID); @@ -94,7 +94,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest public void needsCleanupNoSession() { UUID fakeID = UUIDGen.getTimeUUID(); - PendingRepairManager prm = new PendingRepairManager(cfs, null); + PendingRepairManager prm = new PendingRepairManager(cfs, null, false); Assert.assertTrue(prm.canCleanup(fakeID)); } @@ -106,7 +106,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); prm.addSSTable(sstable); Assert.assertNotNull(prm.get(repairID)); @@ -122,7 +122,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); prm.addSSTable(sstable); Assert.assertNotNull(prm.get(repairID)); Assert.assertNotNull(prm.get(repairID)); @@ -140,13 +140,13 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); prm.addSSTable(sstable); repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); prm.addSSTable(sstable); LocalSessionAccessor.finalizeUnsafe(repairID); @@ -184,7 +184,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); prm.addSSTable(sstable); Assert.assertNotNull(prm.get(repairID)); Assert.assertNotNull(prm.get(repairID)); @@ -202,7 +202,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); prm.addSSTable(sstable); Assert.assertNotNull(prm.get(repairID)); Assert.assertNotNull(prm.get(repairID)); @@ -225,7 +225,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest PendingRepairManager prm = csm.getPendingRepairManagers().get(0); UUID repairId = registerSession(cfs, true, true); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairId); + mutateRepaired(sstable, repairId, false); prm.addSSTable(sstable); List<AbstractCompactionTask> tasks = csm.getUserDefinedTasks(Collections.singleton(sstable), 100); try @@ -247,8 +247,8 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest SSTableReader sstable = makeSSTable(true); SSTableReader sstable2 = makeSSTable(true); - mutateRepaired(sstable, repairId); - mutateRepaired(sstable2, repairId2); + mutateRepaired(sstable, repairId, false); + mutateRepaired(sstable2, repairId2, false); prm.addSSTable(sstable); prm.addSSTable(sstable2); List<AbstractCompactionTask> tasks = csm.getUserDefinedTasks(Lists.newArrayList(sstable, sstable2), 100); @@ -296,7 +296,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest Assert.assertFalse(prm.hasDataForSession(repairID)); SSTableReader sstable = makeSSTable(true); - mutateRepaired(sstable, repairID); + mutateRepaired(sstable, repairID, false); prm.addSSTable(sstable); Assert.assertTrue(prm.hasDataForSession(repairID)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java index 6428ab7..1292b7e 100644 --- a/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java @@ -56,7 +56,7 @@ public class SingleSSTableLCSTaskTest extends CQLTester assertEquals(1, cfs.getLiveSSTables().size()); cfs.getLiveSSTables().forEach(s -> assertEquals(2, s.getSSTableLevel())); // make sure compaction strategy is notified: - LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().iterator().next(); + LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first(); for (int i = 0; i < lcs.manifest.getLevelCount(); i++) { if (i == 2) @@ -98,7 +98,7 @@ public class SingleSSTableLCSTaskTest extends CQLTester cfs.forceBlockingFlush(); } // now we have a bunch of data in L0, first compaction will be a normal one, containing all sstables: - LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0); + LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first(); AbstractCompactionTask act = lcs.getNextBackgroundTask(0); act.execute(null); @@ -148,7 +148,7 @@ public class SingleSSTableLCSTaskTest extends CQLTester assertEquals(1, cfs.getLiveSSTables().size()); for (SSTableReader sst : cfs.getLiveSSTables()) assertEquals(0, sst.getSSTableMetadata().sstableLevel); - LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().iterator().next(); + LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first(); assertEquals(1, lcs.getLevelSize(0)); assertTrue(cfs.getTracker().getCompacting().isEmpty()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java index 5694e86..e5ff138 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java @@ -1183,7 +1183,7 @@ public class LogTransactionTest extends AbstractTransactionalTest SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList()); StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator) - .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, header) + .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, false, header) .get(MetadataType.STATS); SSTableReader reader = SSTableReader.internalOpen(descriptor, components, http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java index 5cda2ad..b7b7d4a 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java @@ -164,6 +164,7 @@ public class RealTransactionsTest extends SchemaLoader 0, 0, null, + false, 0, SerializationHeader.make(cfs.metadata(), txn.originals()), cfs.indexManager.listIndexes(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java index 76ebfd8..3b29cc5 100644 --- a/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java +++ b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java @@ -119,11 +119,11 @@ public class CompactionManagerGetSSTablesForValidationTest Iterator<SSTableReader> iter = cfs.getLiveSSTables().iterator(); repaired = iter.next(); - repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, System.currentTimeMillis(), null); + repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, System.currentTimeMillis(), null, false); repaired.reloadSSTableMetadata(); pendingRepair = iter.next(); - pendingRepair.descriptor.getMetadataSerializer().mutateRepaired(pendingRepair.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sessionID); + pendingRepair.descriptor.getMetadataSerializer().mutateRepairMetadata(pendingRepair.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sessionID, false); pendingRepair.reloadSSTableMetadata(); unrepaired = iter.next(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org