Repository: cassandra Updated Branches: refs/heads/trunk 293908060 -> d40ac784d
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/test/long/org/apache/cassandra/cql3/GcCompactionBench.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/cql3/GcCompactionBench.java b/test/long/org/apache/cassandra/cql3/GcCompactionBench.java new file mode 100644 index 0000000..ca39b55 --- /dev/null +++ b/test/long/org/apache/cassandra/cql3/GcCompactionBench.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.cassandra.cql3; + + import java.util.ArrayList; + import java.util.Arrays; + import java.util.List; + import java.util.Random; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import java.util.concurrent.Future; + import java.util.concurrent.atomic.AtomicLong; + import java.util.function.Predicate; + + import com.google.common.collect.Iterables; + import org.junit.Assert; + import org.junit.Before; + import org.junit.BeforeClass; + import org.junit.Test; + + import org.apache.cassandra.config.Config.CommitLogSync; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.compaction.CompactionManager; + import org.apache.cassandra.db.rows.Row; + import org.apache.cassandra.db.rows.Unfiltered; + import org.apache.cassandra.db.rows.UnfilteredRowIterator; + import org.apache.cassandra.io.sstable.ISSTableScanner; + import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.schema.CompactionParams.TombstoneOption; + import org.apache.cassandra.utils.FBUtilities; + + /** + * Long-running benchmark for garbage-collecting compactions. Each test loads the table with a mix of inserts + * and deletes (with background compactions using the given 'provide_overlapping_tombstones' option), then runs + * CompactionManager.performGarbageCollection and prints timings, sstable sizes and row/tombstone counts. + * The aggregate hashes of the table contents must be identical before and after the collection. + */ + public class GcCompactionBench extends CQLTester + { + // These strategy strings are spliced into the compaction map literal built in testGcCompaction, so they + // deliberately embed closing/opening quotes in order to add one extra strategy option. + private static final String SIZE_TIERED_STRATEGY = "SizeTieredCompactionStrategy', 'min_sstable_size' : '0"; + private static final String LEVELED_STRATEGY = "LeveledCompactionStrategy', 'sstable_size_in_mb' : '16"; + + private static final int DEL_SECTIONS = 1000; + private static final int FLUSH_FREQ = 10000; + private static final int RANGE_FREQUENCY_INV = 16; + static final int COUNT = 90000; + static final int ITERS = 9; + + static final int KEY_RANGE = 10; + static final int CLUSTERING_RANGE = 210000; + + static final int EXTRA_SIZE = 1025; + + // The name of this method is important! + // CommitLog settings must be applied before CQLTester sets up; by using the same name as its @BeforeClass method we + // are effectively overriding it.
+ @BeforeClass + public static void setUpClass() // overrides CQLTester.setUpClass() + { + DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic); + DatabaseDescriptor.setCommitLogSyncPeriod(100); + CQLTester.setUpClass(); + } + + /** Aggregate query summarising the table contents; its result is compared before and after garbage collection. */ + String hashQuery; + + @Before + public void before() throws Throwable + { + createTable("CREATE TABLE %s(" + " key int," + " column int," + " data int," + " extra text," + " PRIMARY KEY(key, column)" + ")" + ); + + String hashIFunc = parseFunctionName(createFunction(KEYSPACE, "int, int", + " CREATE FUNCTION %s (state int, val int)" + " CALLED ON NULL INPUT" + " RETURNS int" + " LANGUAGE java" + " AS 'return val != null ? state * 17 + val : state;'")).name; + String hashTFunc = parseFunctionName(createFunction(KEYSPACE, "int, text", + " CREATE FUNCTION %s (state int, val text)" + " CALLED ON NULL INPUT" + " RETURNS int" + " LANGUAGE java" + " AS 'return val != null ? state * 17 + val.hashCode() : state;'")).name; + + String hashInt = createAggregate(KEYSPACE, "int", + " CREATE AGGREGATE %s (int)" + " SFUNC " + hashIFunc + " STYPE int" + " INITCOND 1"); + String hashText = createAggregate(KEYSPACE, "text", + " CREATE AGGREGATE %s (text)" + " SFUNC " + hashTFunc + " STYPE int" + " INITCOND 1"); + + hashQuery = String.format("SELECT count(column), %s(key), %s(column), %s(data), %s(extra), avg(key), avg(column), avg(data) FROM %%s", + hashInt, hashInt, hashInt, hashText); + } + /** Operation counter shared by pushData and deleteData; drives progress output and the flush/compaction cadence. */ + AtomicLong id = new AtomicLong(); + /** Time spent in the background compactions triggered by maybeCompact. */ + long compactionTimeNanos = 0; + + /** + * Inserts {@code count} rows with random key and clustering values and a random EXTRA_SIZE-character payload, + * printing a '.' whenever the shared operation counter reaches a multiple of 1000. + */ + void pushData(Random rand, int count) throws Throwable + { + for (int i = 0; i < count; ++i) + { + long ii = id.incrementAndGet(); + if (ii % 1000 == 0) + System.out.print('.'); + int key = rand.nextInt(KEY_RANGE); + int column = rand.nextInt(CLUSTERING_RANGE); + execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", key, column, (int) ii, genExtra(rand)); + maybeCompact(ii); + } + } + + /** Returns a random lowercase string of EXTRA_SIZE characters used to bulk up each row. */ + private String genExtra(Random rand) + { +
StringBuilder builder = new StringBuilder(EXTRA_SIZE); + for (int i = 0; i < EXTRA_SIZE; ++i) + builder.append((char) ('a' + rand.nextInt('z' - 'a' + 1))); + return builder.toString(); + } + + /** + * Issues {@code count} deletions. Usually this is a single-row delete of an existing row found in a random + * clustering section (re-picking until a non-empty section is found); when rand.nextInt(RANGE_FREQUENCY_INV) == 1 + * it is instead a range delete of a whole section. Prints a '-' whenever the shared operation counter reaches a + * multiple of 1000. + */ + void deleteData(Random rand, int count) throws Throwable + { + for (int i = 0; i < count; ++i) + { + int key; + UntypedResultSet res; + long ii = id.incrementAndGet(); + if (ii % 1000 == 0) + System.out.print('-'); + if (rand.nextInt(RANGE_FREQUENCY_INV) != 1) + { + do + { + key = rand.nextInt(KEY_RANGE); + long cid = rand.nextInt(DEL_SECTIONS); + int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS); + int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS); + res = execute("SELECT column FROM %s WHERE key = ? AND column >= ? AND column < ? LIMIT 1", key, cstart, cend); + } while (res.size() == 0); + UntypedResultSet.Row r = Iterables.get(res, rand.nextInt(res.size())); + int clustering = r.getInt("column"); + execute("DELETE FROM %s WHERE key = ? AND column = ?", key, clustering); + } + else + { + key = rand.nextInt(KEY_RANGE); + long cid = rand.nextInt(DEL_SECTIONS); + int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS); + int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS); + res = execute("DELETE FROM %s WHERE key = ? AND column >= ? 
AND column < ?", key, cstart, cend); + } + maybeCompact(ii); + } + } + + /** + * Flushes every FLUSH_FREQ operations. On every tenth such flush, it enables auto compaction, adds the time spent + * in enableAutoCompaction(true) to compactionTimeNanos, and disables auto compaction again. + */ + private void maybeCompact(long ii) + { + if (ii % FLUSH_FREQ == 0) + { + System.out.print("F"); + flush(); + if (ii % (FLUSH_FREQ * 10) == 0) + { + System.out.println("C"); + long startTime = System.nanoTime(); + getCurrentColumnFamilyStore().enableAutoCompaction(true); + long endTime = System.nanoTime(); + compactionTimeNanos += endTime - startTime; + getCurrentColumnFamilyStore().disableAutoCompaction(); + } + } + } + + /** + * Runs one benchmark scenario: loads data, garbage-collects it, prints statistics and checks that the aggregate + * hashes of the table are unchanged by the collection. + * + * @param tombstoneOption option passed to CompactionManager.performGarbageCollection for the final collection + * @param backgroundTombstoneOption value of the table's 'provide_overlapping_tombstones' compaction option, used + * by the compactions triggered while loading data + * @param compactionClass compaction strategy, possibly carrying an extra option (see SIZE_TIERED_STRATEGY) + */ + public void testGcCompaction(TombstoneOption tombstoneOption, TombstoneOption backgroundTombstoneOption, String compactionClass) throws Throwable + { + id.set(0); + compactionTimeNanos = 0; + alterTable("ALTER TABLE %s WITH compaction = { 'class' : '" + compactionClass + "', 'provide_overlapping_tombstones' : '" + backgroundTombstoneOption + "' };"); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + cfs.disableAutoCompaction(); + + long onStartTime = System.currentTimeMillis(); + ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + try + { + List<Future<?>> tasks = new ArrayList<>(); + for (int ti = 0; ti < 1; ++ti) + { + Random rand = new Random(ti); + tasks.add(es.submit(() -> + { + for (int i = 0; i < ITERS; ++i) + try + { + pushData(rand, COUNT); + deleteData(rand, COUNT / 3); + } + catch (Throwable e) + { + throw new AssertionError(e); + } + })); + } + for (Future<?> task : tasks) + task.get(); + } + finally + { + // The pool's worker threads are non-daemon; shut it down (also when a task fails) so they do not outlive the test. + es.shutdown(); + } + + flush(); + long onEndTime = System.currentTimeMillis(); + int startRowCount = countRows(cfs); + int startTombCount = countTombstoneMarkers(cfs); + int startRowDeletions = countRowDeletions(cfs); + int startTableCount = cfs.getLiveSSTables().size(); + long startSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables()); + System.out.println(); + + String hashesBefore = getHashes(); + + long startTime = System.currentTimeMillis(); + CompactionManager.instance.performGarbageCollection(cfs, tombstoneOption, 0); + long endTime = 
System.currentTimeMillis(); + + int endRowCount = countRows(cfs); + int endTombCount = countTombstoneMarkers(cfs); + int endRowDeletions = countRowDeletions(cfs); + int endTableCount = cfs.getLiveSSTables().size(); + long endSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables()); + + System.out.println(cfs.getCompactionParametersJson()); + System.out.println(String.format("%s compactions completed in %.3fs", + tombstoneOption.toString(), (endTime - startTime) * 1e-3)); + System.out.println(String.format("Operations completed in %.3fs, out of which %.3f for ongoing " + backgroundTombstoneOption + " background compactions", + (onEndTime - onStartTime) * 1e-3, compactionTimeNanos * 1e-9)); + System.out.println(String.format("At start: %12d tables %12d bytes %12d rows %12d deleted rows %12d tombstone markers", + startTableCount, startSize, startRowCount, startRowDeletions, startTombCount)); + System.out.println(String.format("At end: %12d tables %12d bytes %12d rows %12d deleted rows %12d tombstone markers", + endTableCount, endSize, endRowCount, endRowDeletions, endTombCount)); + + String hashesAfter = getHashes(); + Assert.assertEquals(hashesBefore, hashesAfter); + } + + /** Runs hashQuery and returns its single result row as a string, printing how long the query took. */ + private String getHashes() throws Throwable + { + long startTime = System.currentTimeMillis(); + String hashes = Arrays.toString(getRows(execute(hashQuery))[0]); + long endTime = System.currentTimeMillis(); + System.out.println(String.format("Hashes: %s, retrieved in %.3fs", hashes, (endTime - startTime) * 1e-3)); + return hashes; + } + + @Test + public void testCellAtEnd() throws Throwable + { + testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, LEVELED_STRATEGY); + } + + @Test + public void testRowAtEnd() throws Throwable + { + testGcCompaction(TombstoneOption.ROW, TombstoneOption.NONE, LEVELED_STRATEGY); + } + + @Test + public void testCellThroughout() throws Throwable + { + testGcCompaction(TombstoneOption.CELL, TombstoneOption.CELL, LEVELED_STRATEGY); + } + + @Test + public void 
testRowThroughout() throws Throwable + { + testGcCompaction(TombstoneOption.ROW, TombstoneOption.ROW, LEVELED_STRATEGY); + } + + @Test + public void testCopyCompaction() throws Throwable + { + testGcCompaction(TombstoneOption.NONE, TombstoneOption.NONE, LEVELED_STRATEGY); + } + + @Test + public void testCellAtEndSizeTiered() throws Throwable + { + testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, SIZE_TIERED_STRATEGY); + } + + @Test + public void testRowAtEndSizeTiered() throws Throwable + { + testGcCompaction(TombstoneOption.ROW, TombstoneOption.NONE, SIZE_TIERED_STRATEGY); + } + + @Test + public void testCellThroughoutSizeTiered() throws Throwable + { + testGcCompaction(TombstoneOption.CELL, TombstoneOption.CELL, SIZE_TIERED_STRATEGY); + } + + @Test + public void testRowThroughoutSizeTiered() throws Throwable + { + testGcCompaction(TombstoneOption.ROW, TombstoneOption.ROW, SIZE_TIERED_STRATEGY); + } + + @Test + public void testCopyCompactionSizeTiered() throws Throwable + { + testGcCompaction(TombstoneOption.NONE, TombstoneOption.NONE, SIZE_TIERED_STRATEGY); + } + + /** Counts range tombstone markers across all live sstables of cfs. */ + int countTombstoneMarkers(ColumnFamilyStore cfs) + { + return count(cfs, x -> x.isRangeTombstoneMarker()); + } + + /** Counts rows carrying a non-live row deletion across all live sstables of cfs. */ + int countRowDeletions(ColumnFamilyStore cfs) + { + return count(cfs, x -> x.isRow() && !((Row) x).deletion().isLive()); + } + + /** Counts rows with live data (as of now) across all live sstables of cfs. */ + int countRows(ColumnFamilyStore cfs) + { + int nowInSec = FBUtilities.nowInSeconds(); + return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec)); + } + + private int count(ColumnFamilyStore cfs, Predicate<Unfiltered> predicate) + { + int count = 0; + for (SSTableReader reader : cfs.getLiveSSTables()) + count += count(reader, predicate); + return count; + } + + /** Counts the unfiltereds (rows and markers) in one sstable that satisfy predicate. */ + int count(SSTableReader reader, Predicate<Unfiltered> predicate) + { + int instances = 0; + try (ISSTableScanner partitions = reader.getScanner()) + { + while (partitions.hasNext()) + { + try (UnfilteredRowIterator iter = partitions.next()) + { + while (iter.hasNext()) + { + 
Unfiltered atom = iter.next(); + if (predicate.test(atom)) + ++instances; + } + } + } + } + return instances; + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java new file mode 100644 index 0000000..6fed033 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.cassandra.cql3; + + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertTrue; + + import java.util.Collections; + import java.util.HashSet; + import java.util.Set; + import java.util.function.Function; + + import com.google.common.collect.Iterables; + import org.junit.Test; + + import org.apache.cassandra.db.*; + import org.apache.cassandra.db.compaction.CompactionManager; + import org.apache.cassandra.db.rows.*; + import org.apache.cassandra.io.sstable.ISSTableScanner; + import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.utils.FBUtilities; + + /** Tests for compaction with 'provide_overlapping_tombstones': data is flushed into several sstables, the oldest one is then compacted on its own with forceUserDefinedCompaction, and the rewritten sstable is checked to contain no tombstone markers and fewer live rows/cells than before. */ public class GcCompactionTest extends CQLTester + { + static final int KEY_COUNT = 10; + static final int CLUSTERING_COUNT = 20; + + @Test + public void testGcCompactionPartitions() throws Throwable + { + runCompactionTest("CREATE TABLE %s(" + " key int," + " column int," + " data int," + " extra text," + " PRIMARY KEY((key, column), data)" + ") WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'row' };" + ); + + } + + @Test + public void testGcCompactionRows() throws Throwable + { + runCompactionTest("CREATE TABLE %s(" + " key int," + " column int," + " data int," + " extra text," + " PRIMARY KEY(key, column)" + ") WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'row' };" + ); + + } + + @Test + public void testGcCompactionRanges() throws Throwable + { + + runCompactionTest("CREATE TABLE %s(" + " key int," + " column int," + " col2 int," + " data int," + " extra text," + " PRIMARY KEY(key, column, data)" + ") WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'row' };" + ); + } + + /** Creates the table from tableDef, writes KEY_COUNT x CLUSTERING_COUNT rows into a first sstable (table0), flushes two more sstables produced by deleteWithSomeInserts (the second one deletes only), then compacts table0 alone; the rewritten sstable must have no tombstone markers and fewer live rows than table0. */ private void runCompactionTest(String tableDef) throws Throwable + { + createTable(tableDef); + + for (int i = 0; i < KEY_COUNT; ++i) + for (int j = 0; j < CLUSTERING_COUNT; ++j) + 
execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j); + + Set<SSTableReader> readers = new HashSet<>(); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + + flush(); + assertEquals(1, cfs.getLiveSSTables().size()); + SSTableReader table0 = getNewTable(readers); + assertEquals(0, countTombstoneMarkers(table0)); + int rowCount = countRows(table0); + + deleteWithSomeInserts(3, 5, 10); + flush(); + assertEquals(2, cfs.getLiveSSTables().size()); + SSTableReader table1 = getNewTable(readers); + assertTrue(countRows(table1) > 0); + assertTrue(countTombstoneMarkers(table1) > 0); + + deleteWithSomeInserts(5, 6, 0); + flush(); + assertEquals(3, cfs.getLiveSSTables().size()); + SSTableReader table2 = getNewTable(readers); + assertEquals(0, countRows(table2)); + assertTrue(countTombstoneMarkers(table2) > 0); + + CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename()); + + assertEquals(3, cfs.getLiveSSTables().size()); + SSTableReader table3 = getNewTable(readers); + assertEquals(0, countTombstoneMarkers(table3)); + assertTrue(rowCount > countRows(table3)); + } + + @Test + public void testGcCompactionCells() throws Throwable + { + createTable("CREATE TABLE %s(" + " key int," + " column int," + " data int," + " extra text," + " PRIMARY KEY(key)" + ") WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'cell' };" + ); + + for (int i = 0; i < KEY_COUNT; ++i) + for (int j = 0; j < CLUSTERING_COUNT; ++j) + execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j); + + Set<SSTableReader> readers = new HashSet<>(); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + + flush(); + assertEquals(1, cfs.getLiveSSTables().size()); + SSTableReader table0 = getNewTable(readers); + assertEquals(0, countTombstoneMarkers(table0)); + int cellCount = countCells(table0); + + deleteWithSomeInserts(3, 0, 2); + flush(); 
+ assertEquals(2, cfs.getLiveSSTables().size()); + SSTableReader table1 = getNewTable(readers); + assertTrue(countCells(table1) > 0); + assertEquals(0, countTombstoneMarkers(table0)); /* NOTE(review): re-checks table0 rather than the freshly flushed table1 -- confirm which was intended. */ + + CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename()); + + assertEquals(2, cfs.getLiveSSTables().size()); + SSTableReader table3 = getNewTable(readers); + assertEquals(0, countTombstoneMarkers(table3)); + assertTrue(cellCount > countCells(table3)); + } + + @Test + public void testGcCompactionStatic() throws Throwable + { + createTable("CREATE TABLE %s(" + " key int," + " column int," + " data int static," + " extra text," + " PRIMARY KEY(key, column)" + ") WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'cell' };" + ); + + for (int i = 0; i < KEY_COUNT; ++i) + for (int j = 0; j < CLUSTERING_COUNT; ++j) + execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j); + + Set<SSTableReader> readers = new HashSet<>(); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + + flush(); + assertEquals(1, cfs.getLiveSSTables().size()); + SSTableReader table0 = getNewTable(readers); + assertEquals(0, countTombstoneMarkers(table0)); + int cellCount = countStaticCells(table0); + assertEquals(KEY_COUNT, cellCount); + + execute("DELETE data FROM %s WHERE key = 0"); // delete static cell + execute("INSERT INTO %s (key, data) VALUES (1, 0)"); // overwrite static cell + flush(); + assertEquals(2, cfs.getLiveSSTables().size()); + SSTableReader table1 = getNewTable(readers); + assertTrue(countStaticCells(table1) > 0); + assertEquals(0, countTombstoneMarkers(table0)); /* NOTE(review): re-checks table0 rather than table1 -- confirm intent. */ + + CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename()); + + assertEquals(2, cfs.getLiveSSTables().size()); + SSTableReader table3 = getNewTable(readers); + assertEquals(0, countTombstoneMarkers(table3)); + assertEquals(cellCount - 2, countStaticCells(table3)); /* two fewer: key 0's static cell was deleted and key 1's overwritten in table1 */ + } + + @Test + public 
void testGcCompactionComplexColumn() throws Throwable + { + createTable("CREATE TABLE %s(" + " key int," + " data map<int, int>," + " extra text," + " PRIMARY KEY(key)" + ") WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'cell' };" + ); + + for (int i = 0; i < KEY_COUNT; ++i) + for (int j = 0; j < CLUSTERING_COUNT; ++j) + execute("UPDATE %s SET data[?] = ? WHERE key = ?", j, i+j, i); + + Set<SSTableReader> readers = new HashSet<>(); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + + flush(); + assertEquals(1, cfs.getLiveSSTables().size()); + SSTableReader table0 = getNewTable(readers); + assertEquals(0, countTombstoneMarkers(table0)); + int cellCount = countComplexCells(table0); + + deleteWithSomeInsertsComplexColumn(3, 5, 8); + flush(); + assertEquals(2, cfs.getLiveSSTables().size()); + SSTableReader table1 = getNewTable(readers); + assertTrue(countComplexCells(table1) > 0); + assertEquals(0, countTombstoneMarkers(table0)); /* NOTE(review): re-checks table0 rather than table1 -- confirm intent. */ + + CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename()); + + assertEquals(2, cfs.getLiveSSTables().size()); + SSTableReader table3 = getNewTable(readers); + assertEquals(0, countTombstoneMarkers(table3)); + assertEquals(cellCount - 23, countComplexCells(table3)); /* 23 = distinct (key, map entry) cells deleted or re-written by deleteWithSomeInsertsComplexColumn(3, 5, 8): 6 + 6 + 5 + 6 for keys 0, 3, 6, 9 */ + } + + /** Two deletions of partition k = 1, written about two seconds apart into separate sstables; compacting the older sstable alone should make it disappear, leaving only the newer one. */ @Test + public void testLocalDeletionTime() throws Throwable + { + createTable("create table %s (k int, c1 int, primary key (k, c1)) with compaction = {'class': 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones':'row'}"); + execute("delete from %s where k = 1"); + Set<SSTableReader> readers = new HashSet<>(getCurrentColumnFamilyStore().getLiveSSTables()); + getCurrentColumnFamilyStore().forceBlockingFlush(); + SSTableReader oldSSTable = getNewTable(readers); + Thread.sleep(2000); + execute("delete from %s where k = 1"); + getCurrentColumnFamilyStore().forceBlockingFlush(); + SSTableReader newTable = getNewTable(readers); + + 
CompactionManager.instance.forceUserDefinedCompaction(oldSSTable.getFilename()); + + // Old table now doesn't contain any data and should disappear. + assertEquals(Collections.singleton(newTable), getCurrentColumnFamilyStore().getLiveSSTables()); + } + + /** Returns the single live sstable not yet in readers and adds it to readers; fails unless exactly one new sstable appeared. */ private SSTableReader getNewTable(Set<SSTableReader> readers) + { + Set<SSTableReader> newOnes = new HashSet<>(getCurrentColumnFamilyStore().getLiveSSTables()); + newOnes.removeAll(readers); + assertEquals(1, newOnes.size()); + readers.addAll(newOnes); + return Iterables.get(newOnes, 0); + } + + /** For every key_step-th key i, deletes rows j = i % delete_step, stepping by delete_step, then re-inserts rows j = i % readd_step, stepping by readd_step; a step of 0 skips that phase. */ void deleteWithSomeInserts(int key_step, int delete_step, int readd_step) throws Throwable + { + for (int i = 0; i < KEY_COUNT; i += key_step) + { + if (delete_step > 0) + for (int j = i % delete_step; j < CLUSTERING_COUNT; j += delete_step) + { + execute("DELETE FROM %s WHERE key = ? AND column = ?", i, j); + } + if (readd_step > 0) + for (int j = i % readd_step; j < CLUSTERING_COUNT; j += readd_step) + { + execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i-j, "readded " + i + ":" + j); + } + } + } + + /** Same pattern as deleteWithSomeInserts, but deletes and re-writes entries of the map column data instead of rows. */ void deleteWithSomeInsertsComplexColumn(int key_step, int delete_step, int readd_step) throws Throwable + { + for (int i = 0; i < KEY_COUNT; i += key_step) + { + if (delete_step > 0) + for (int j = i % delete_step; j < CLUSTERING_COUNT; j += delete_step) + { + execute("DELETE data[?] FROM %s WHERE key = ?", j, i); + } + if (readd_step > 0) + for (int j = i % readd_step; j < CLUSTERING_COUNT; j += readd_step) + { + execute("UPDATE %s SET data[?] = ? WHERE key = ?", j, -(i+j), i); + } + } + } + + /** Counts range tombstone markers, rows for which Row.hasDeletion(now) is true, and partitions with a non-live partition-level deletion. */ int countTombstoneMarkers(SSTableReader reader) + { + int nowInSec = FBUtilities.nowInSeconds(); + return count(reader, x -> x.isRangeTombstoneMarker() || x.isRow() && ((Row) x).hasDeletion(nowInSec) ? 1 : 0, x -> x.partitionLevelDeletion().isLive() ? 
0 : 1); + } + + /** Counts rows with live data as of now. */ int countRows(SSTableReader reader) + { + int nowInSec = FBUtilities.nowInSeconds(); + return count(reader, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec) ? 1 : 0, x -> 0); + } + + /** Counts the column data entries of all regular rows. */ int countCells(SSTableReader reader) + { + return count(reader, x -> x.isRow() ? Iterables.size((Row) x) : 0, x -> 0); + } + + /** Counts the column data entries of each partition's static row. */ int countStaticCells(SSTableReader reader) + { + return count(reader, x -> 0, x -> Iterables.size(x.staticRow())); + } + + /** Counts the cells held in complex column data of all regular rows. */ int countComplexCells(SSTableReader reader) + { + return count(reader, x -> x.isRow() ? ((Row) x).stream().mapToInt(this::countComplex).sum() : 0, x -> 0); + } + + /** Returns the cell count of c if it is ComplexColumnData (such as the map column in testGcCompactionComplexColumn), otherwise 0. */ int countComplex(ColumnData c) + { + if (!(c instanceof ComplexColumnData)) + return 0; + ComplexColumnData ccd = (ComplexColumnData) c; + return ccd.cellsCount(); + } + + /** Scans every partition of reader, summing partitionPredicate once per partition and predicate for each unfiltered; despite their names both functions return counts, not booleans. */ int count(SSTableReader reader, Function<Unfiltered, Integer> predicate, Function<UnfilteredRowIterator, Integer> partitionPredicate) + { + int instances = 0; + try (ISSTableScanner partitions = reader.getScanner()) + { + while (partitions.hasNext()) + { + try (UnfilteredRowIterator iter = partitions.next()) + { + instances += partitionPredicate.apply(iter); + while (iter.hasNext()) + { + Unfiltered atom = iter.next(); + instances += predicate.apply(atom); + } + } + } + } + return instances; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java index c930b2a..ece2d1d 100644 --- a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java +++ b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java @@ -51,7 +51,7 @@ public class SelectionColumnMappingTest extends CQLTester String functionName; 
@BeforeClass - public static void setUpClass() + public static void setUpClass() // overrides CQLTester.setUpClass() { DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java new file mode 100644 index 0000000..2189e15 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.compaction; + +import static org.junit.Assert.*; + +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.collect.*; + +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.schema.KeyspaceParams; + +public class CompactionIteratorTest +{ + + private static final int NOW = 1000; + private static final int GC_BEFORE = 100; + private static final String KSNAME = "CompactionIteratorTest"; + private static final String CFNAME = "Integer1"; + + static final DecoratedKey kk = Util.dk("key"); + static final CFMetaData metadata; + private static final int RANGE = 1000; + private static final int COUNT = 100; + + Map<List<Unfiltered>, DeletionTime> deletionTimes = new HashMap<>(); + + static { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KSNAME, + KeyspaceParams.simple(1), + metadata = SchemaLoader.standardCFMD(KSNAME, + CFNAME, + 1, + UTF8Type.instance, + Int32Type.instance, + Int32Type.instance)); + } + + // See org.apache.cassandra.db.rows.UnfilteredRowsGenerator.parse for the syntax used in these tests. 
+
+    @Test
+    public void testGcCompactionSupersedeLeft() // Specs use UnfilteredRowsGenerator.parse notation; the int is the max allowed GC result size.
+    {
+        testCompaction(new String[] {
+            "5<=[140] 10[150] [140]<20 22<[130] [130]<25 30[150]"
+        }, new String[] {
+            "7<[160] 15[180] [160]<30 40[120]"
+        },
+        3);
+    }
+
+    @Test
+    public void testGcCompactionSupersedeMiddle()
+    {
+        testCompaction(new String[] {
+            "5<=[140] 10[150] [140]<40 60[150]"
+        }, new String[] {
+            "7<=[160] 15[180] [160]<=30 40[120]"
+        },
+        3);
+    }
+
+    @Test
+    public void testGcCompactionSupersedeRight()
+    {
+        testCompaction(new String[] {
+            "9<=[140] 10[150] [140]<40 60[150]"
+        }, new String[] {
+            "7<[160] 15[180] [160]<30 40[120]"
+        },
+        3);
+    }
+
+    @Test
+    public void testGcCompactionSwitchInSuperseded()
+    {
+        testCompaction(new String[] {
+            "5<=[140] 10[150] [140]<20 20<=[170] [170]<=50 60[150]"
+        }, new String[] {
+            "7<[160] 15[180] [160]<30 40[120]"
+        },
+        5);
+    }
+
+    @Test
+    public void testGcCompactionBoundaries()
+    {
+        testCompaction(new String[] {
+            "5<=[120] [120]<9 9<=[140] 10[150] [140]<40 40<=[120] 60[150] [120]<90"
+        }, new String[] {
+            "7<[160] 15[180] [160]<30 40[120] 45<[140] [140]<80 88<=[130] [130]<100"
+        },
+        7);
+    }
+
+    @Test
+    public void testGcCompactionMatches()
+    {
+        testCompaction(new String[] {
+            "5<=[120] [120]<=9 9<[140] 10[150] [140]<40 40<=[120] 60[150] [120]<90 120<=[100] [100]<130"
+        }, new String[] {
+            "9<[160] 15[180] [160]<40 40[120] 45<[140] [140]<90 90<=[110] [110]<100 120<=[100] [100]<130"
+        },
+        5);
+    }
+
+    @Test
+    public void testGcCompactionRowDeletion() // String variant: the GC compaction result must equal this spec exactly.
+    {
+        testCompaction(new String[] {
+            "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]"
+        }, new String[] {
+            "10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 50[150D100]"
+        },
+        "25[160] 30[170] 50[120]");
+    }
+
+    @Test
+    public void testGcCompactionPartitionDeletion()
+    {
+        testCompaction(new String[] {
+            "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]"
+        }, new String[] {
+            // Dxx| stands for partition deletion at time xx
+            "D165|10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 50[150D100]"
+        },
+        "30[170]");
+    }
+
+    void testCompaction(String[] inputs, String[] tombstones, String expected) // Runs a non-GC compaction first, then GC-compacts inputs against tombstones and requires result == expected.
+    {
+        testNonGcCompaction(inputs, tombstones);
+
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+        List<List<Unfiltered>> inputLists = parse(inputs, generator);
+        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
+        List<Unfiltered> result = compact(inputLists, tombstoneLists);
+        System.out.println("GC compaction resulted in " + size(result) + " Unfiltereds");
+        generator.verifyValid(result);
+        verifyEquivalent(inputLists, result, tombstoneLists, generator);
+        List<Unfiltered> expectedResult = generator.parse(expected, NOW - 1);
+        if (!expectedResult.equals(result))
+            fail("Expected " + expected + ", got " + generator.str(result));
+    }
+
+    void testCompaction(String[] inputs, String[] tombstones, int expectedCount) // GC-compacts inputs against tombstones; fails if the result holds more than expectedCount unfiltereds.
+    {
+        testNonGcCompaction(inputs, tombstones);
+
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+        List<List<Unfiltered>> inputLists = parse(inputs, generator);
+        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
+        List<Unfiltered> result = compact(inputLists, tombstoneLists);
+        System.out.println("GC compaction resulted in " + size(result) + " Unfiltereds");
+        generator.verifyValid(result);
+        verifyEquivalent(inputLists, result, tombstoneLists, generator);
+        if (size(result) > expectedCount)
+            fail("Expected compaction with " + expectedCount + " elements, got " + size(result) + ": " + generator.str(result));
+    }
+
+    int testNonGcCompaction(String[] inputs, String[] tombstones) // Compacts inputs with no tombstone sources, checks validity/equivalence, returns the result size.
+    {
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+        List<List<Unfiltered>> inputLists = parse(inputs, generator);
+        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
+        List<Unfiltered> result = compact(inputLists, Collections.emptyList());
+        System.out.println("Non-GC compaction resulted in " + size(result) + " Unfiltereds");
+        generator.verifyValid(result);
+        verifyEquivalent(inputLists, result, tombstoneLists, generator);
+        return size(result);
+    }
+
+    private static int size(List<Unfiltered> data) // Boundary markers count as 2, every other unfiltered as 1.
+    {
+        return data.stream().mapToInt(x -> x instanceof RangeTombstoneBoundaryMarker ? 2 : 1).sum();
+    }
+
+    private void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> result, List<List<Unfiltered>> tombstoneSources, UnfilteredRowsGenerator generator) // Dumps all inputs and fails on mismatch.
+    {
+        // sources + tombstoneSources must be the same as result + tombstoneSources
+        List<Unfiltered> expected = compact(Iterables.concat(sources, tombstoneSources), Collections.emptyList());
+        List<Unfiltered> actual = compact(Iterables.concat(ImmutableList.of(result), tombstoneSources), Collections.emptyList());
+        if (!expected.equals(actual))
+        {
+            System.out.println("Equivalence test failure between sources:");
+            for (List<Unfiltered> partition : sources)
+                generator.dumpList(partition);
+            System.out.println("and compacted " + generator.str(result));
+            System.out.println("with tombstone sources:");
+            for (List<Unfiltered> partition : tombstoneSources)
+                generator.dumpList(partition);
+            System.out.println("expected " + generator.str(expected));
+            System.out.println("got " + generator.str(actual));
+            fail("Failed equivalence test.");
+        }
+    }
+
+    private List<List<Unfiltered>> parse(String[] inputs, UnfilteredRowsGenerator generator) // One parsed list per spec string.
+    {
+        return ImmutableList.copyOf(Lists.transform(Arrays.asList(inputs), x -> parse(x, generator)));
+    }
+
+    private List<Unfiltered> parse(String input, UnfilteredRowsGenerator generator) // A leading "Dnnn|" records a partition deletion at nnn in deletionTimes for the parsed list.
+    {
+        Matcher m = Pattern.compile("D(\\d+)\\|").matcher(input);
+        if (m.lookingAt())
+        {
+            int del = Integer.parseInt(m.group(1));
+            input = input.substring(m.end());
+            List<Unfiltered> list = generator.parse(input, NOW - 1);
+            deletionTimes.put(list, new DeletionTime(del, del));
+            return list;
+        }
+        else
+            return generator.parse(input, NOW - 1);
+    }
+
+    private List<Unfiltered> compact(Iterable<List<Unfiltered>> sources, Iterable<List<Unfiltered>> tombstoneSources) // Compacts partition kk with tombstoneSources as shadow sources; asserts exactly one partition comes out.
+    {
+        List<Iterable<UnfilteredRowIterator>> content = ImmutableList.copyOf(Iterables.transform(sources, list -> ImmutableList.of(listToIterator(list, kk))));
+        Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources = new TreeMap<>();
+        transformedSources.put(kk, Iterables.transform(tombstoneSources, list -> listToIterator(list, kk)));
+        try (CompactionController controller = new Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE);
+             CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION,
+                                                              Lists.transform(content, x -> new Scanner(x)),
+                                                              controller, NOW, null))
+        {
+            List<Unfiltered> result = new ArrayList<>();
+            assertTrue(iter.hasNext());
+            try (UnfilteredRowIterator partition = iter.next())
+            {
+                Iterators.addAll(result, partition);
+            }
+            assertFalse(iter.hasNext());
+            return result;
+        }
+    }
+
+    private UnfilteredRowIterator listToIterator(List<Unfiltered> list, DecoratedKey key) // Applies any partition deletion recorded for this list, else LIVE.
+    {
+        return UnfilteredRowsGenerator.source(list, metadata, key, deletionTimes.getOrDefault(list, DeletionTime.LIVE));
+    }
+
+    NavigableMap<DecoratedKey, List<Unfiltered>> generateContent(Random rand, UnfilteredRowsGenerator generator,
+                                                                 List<DecoratedKey> keys, int pcount, int rcount) // A repeated key replaces its earlier content.
+    {
+        NavigableMap<DecoratedKey, List<Unfiltered>> map = new TreeMap<>();
+        for (int i = 0; i < pcount; ++i)
+        {
+            DecoratedKey key = keys.get(rand.nextInt(keys.size()));
+            map.put(key, generator.generateSource(rand, rcount, RANGE, NOW - 5, x -> NOW - 1));
+        }
+        return map;
+    }
+
+    @Test
+    public void testRandom() // Random data sources GC-compacted against separate random shadow sources must stay equivalent and shrink.
+    {
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+        for (int seed = 1; seed < 100; ++seed)
+        {
+            Random rand = new Random(seed);
+            List<List<Unfiltered>> sources = new ArrayList<>();
+            for (int i = 0; i < 10; ++i)
+                sources.add(generator.generateSource(rand, COUNT, RANGE, NOW - 5, x -> NOW - 15));
+            int srcSz = sources.stream().mapToInt(CompactionIteratorTest::size).sum();
+            List<List<Unfiltered>> tombSources = new ArrayList<>();
+            for (int i = 0; i < 10; ++i)
+                tombSources.add(generator.generateSource(rand, COUNT, RANGE, NOW - 5, x -> NOW - 15)); // fill the shadow sources, not the data sources
+            List<Unfiltered> result = compact(sources, tombSources);
+            verifyEquivalent(sources, result, tombSources, generator);
+            assertTrue(size(result) < srcSz);
+        }
+    }
+
+    class Controller extends CompactionController // Serves the given per-key iterators as shadow sources.
+    {
+        private final Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources;
+
+        public Controller(ColumnFamilyStore cfs, Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources, int gcBefore)
+        {
+            super(cfs, Collections.emptySet(), gcBefore);
+            this.tombstoneSources = tombstoneSources;
+        }
+
+        @Override
+        public Iterable<UnfilteredRowIterator> shadowSources(DecoratedKey key, boolean tombstoneOnly)
+        {
+            assert tombstoneOnly;
+            return tombstoneSources.get(key);
+        }
+    }
+
+    class Scanner extends AbstractUnfilteredPartitionIterator implements ISSTableScanner // In-memory scanner; length/position/file accessors are stubs.
+    {
+        Iterator<UnfilteredRowIterator> iter;
+
+        Scanner(Iterable<UnfilteredRowIterator> content)
+        {
+            iter = content.iterator();
+        }
+
+        @Override
+        public boolean isForThrift()
+        {
+            return false;
+        }
+
+        @Override
+        public CFMetaData metadata()
+        {
+            return metadata;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            return iter.hasNext();
+        }
+
+        @Override
+        public UnfilteredRowIterator next()
+        {
+            return iter.next();
+        }
+
+        @Override
+        public long getLengthInBytes()
+        {
+            return 0;
+        }
+
+        @Override
+        public long getCurrentPosition()
+        {
+            return 0;
+        }
+
+        @Override
+        public String getBackingFiles()
+        {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
index 0eeb379..3335c02 100644
--- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
@@ -96,9 +96,11 @@ public class UnfilteredRowIteratorsMergeTest
     @SuppressWarnings("unused")
     public void testTombstoneMerge(boolean reversed, boolean iterations)
     {
+        this.reversed = reversed;
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(comparator, reversed);
+
         for (int seed = 1; seed <= 100; ++seed)
         {
-            this.reversed = reversed;
             if (ITEMS <= 20)
                 System.out.println("\nSeed " + seed);
 
@@ -112,27 +114,29 @@ public class UnfilteredRowIteratorsMergeTest
             if (ITEMS <= 20)
                 System.out.println("Merging");
             for (int i=0; i<ITERATORS; ++i)
-                sources.add(generateSource(r, timeGenerators.get(r.nextInt(timeGenerators.size()))));
+                sources.add(generator.generateSource(r, ITEMS, RANGE, DEL_RANGE, timeGenerators.get(r.nextInt(timeGenerators.size()))));
             List<Unfiltered> merged = merge(sources, iterations);
 
             if (ITEMS <= 20)
                 System.out.println("results in");
             if (ITEMS <= 20)
-                dumpList(merged);
-            verifyEquivalent(sources, merged);
-            verifyValid(merged);
+                generator.dumpList(merged);
+            verifyEquivalent(sources, merged, generator);
+            generator.verifyValid(merged);
             if (reversed)
             {
                 Collections.reverse(merged);
-                this.reversed = false;
-                verifyValid(merged);
+                generator.verifyValid(merged, false);
             }
         }
     }
 
     private List<Unfiltered> merge(List<List<Unfiltered>> sources, boolean iterations)
     {
-        List<UnfilteredRowIterator> us = sources.stream().map(l -> new Source(l.iterator())).collect(Collectors.toList());
+        List<UnfilteredRowIterator> us = sources.
+                                         stream().
+                                         map(l -> new UnfilteredRowsGenerator.Source(l.iterator(), metadata, partitionKey, DeletionTime.LIVE, reversed)).
+                                         collect(Collectors.toList());
         List<Unfiltered> merged = new ArrayList<>();
         Iterators.addAll(merged, mergeIterators(us, iterations));
         return merged;
@@ -285,24 +289,24 @@ public class UnfilteredRowIteratorsMergeTest
         }
     }
 
-    void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> merged)
+    void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> merged, UnfilteredRowsGenerator generator)
     {
         try
         {
             for (int i=0; i<RANGE; ++i)
             {
-                Clusterable c = clusteringFor(i);
+                Clusterable c = UnfilteredRowsGenerator.clusteringFor(i);
                 DeletionTime dt = DeletionTime.LIVE;
                 for (List<Unfiltered> source : sources)
                 {
                     dt = deletionFor(c, source, dt);
                 }
-                Assert.assertEquals("Deletion time mismatch for position " + str(c), dt, deletionFor(c, merged));
+                Assert.assertEquals("Deletion time mismatch for position " + i, dt, deletionFor(c, merged));
                 if (dt == DeletionTime.LIVE)
                 {
                     Optional<Unfiltered> sourceOpt = sources.stream().map(source -> rowFor(c, source)).filter(x -> x != null).findAny();
                     Unfiltered mergedRow = rowFor(c, merged);
-                    Assert.assertEquals("Content mismatch for position " + str(c), str(sourceOpt.orElse(null)), str(mergedRow));
+                    Assert.assertEquals("Content mismatch for position " + i, clustering(sourceOpt.orElse(null)), clustering(mergedRow));
                 }
             }
         }
@@ -310,13 +314,20 @@ public class UnfilteredRowIteratorsMergeTest
         {
             System.out.println(e);
             for (List<Unfiltered> list : sources)
-                dumpList(list);
+                generator.dumpList(list);
             System.out.println("merged");
-            dumpList(merged);
+            generator.dumpList(merged);
             throw e;
         }
     }
 
+    String clustering(Clusterable curr) // Clustering value only (Int32), or "null".
+    {
+        if (curr == null)
+            return "null";
+        return Int32Type.instance.getString(curr.clustering().get(0));
+    }
+
     private Unfiltered rowFor(Clusterable pointer, List<Unfiltered> list)
     {
         int index = Collections.binarySearch(list, pointer, reversed ? comparator.reversed() : comparator);
@@ -424,21 +435,23 @@ public class UnfilteredRowIteratorsMergeTest
 
     public void testForInput(String... inputs)
     {
+        reversed = false;
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(comparator, false);
+
         List<List<Unfiltered>> sources = new ArrayList<>();
         for (String input : inputs)
         {
-            List<Unfiltered> source = parse(input);
-            attachBoundaries(source);
-            dumpList(source);
-            verifyValid(source);
+            List<Unfiltered> source = generator.parse(input, DEL_RANGE);
+            generator.dumpList(source);
+            generator.verifyValid(source);
             sources.add(source);
         }
 
         List<Unfiltered> merged = merge(sources, false);
         System.out.println("Merge to:");
-        dumpList(merged);
-        verifyEquivalent(sources, merged);
-        verifyValid(merged);
+        generator.dumpList(merged);
+        verifyEquivalent(sources, merged, generator);
+        generator.verifyValid(merged);
         System.out.println();
     }
 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java
new file mode 100644
index 0000000..7cdccdb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.junit.Assert;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.rows.Unfiltered.Kind;
+import org.apache.cassandra.utils.btree.BTree;
+
+public class UnfilteredRowsGenerator // Test helper: generates, parses, validates and prints Unfiltered lists with one Int32 clustering column.
+{
+    final boolean reversed;
+    final Comparator<Clusterable> comparator;
+
+    public UnfilteredRowsGenerator(Comparator<Clusterable> comparator, boolean reversed)
+    {
+        this.reversed = reversed;
+        this.comparator = comparator;
+    }
+
+    String str(Clusterable curr) // Renders curr in parse() notation; a boundary marker prints as its close and open parts fused together.
+    {
+        if (curr == null)
+            return "null";
+        String val = Int32Type.instance.getString(curr.clustering().get(0));
+        if (curr instanceof RangeTombstoneMarker)
+        {
+            RangeTombstoneMarker marker = (RangeTombstoneMarker) curr;
+            if (marker.isClose(reversed))
+                val = "[" + marker.closeDeletionTime(reversed).markedForDeleteAt() + "]" + (marker.closeIsInclusive(reversed) ? "<=" : "<") + val;
+            if (marker.isOpen(reversed))
+                val = val + (marker.openIsInclusive(reversed) ? "<=" : "<") + "[" + marker.openDeletionTime(reversed).markedForDeleteAt() + "]";
+        }
+        else if (curr instanceof Row)
+        {
+            Row row = (Row) curr;
+            String delTime = "";
+            if (!row.deletion().time().isLive())
+                delTime = "D" + row.deletion().time().markedForDeleteAt();
+            val = val + "[" + row.primaryKeyLivenessInfo().timestamp() + delTime + "]";
+        }
+        return val;
+    }
+
+    public void verifyValid(List<Unfiltered> list) // Validates list in this generator's own iteration order.
+    {
+        verifyValid(list, reversed);
+    }
+
+    void verifyValid(List<Unfiltered> list, boolean reversed) // Asserts strict ordering and matched open/close markers; dumps the list before rethrowing a failure.
+    {
+        int reversedAsMultiplier = reversed ? -1 : 1;
+        try {
+            RangeTombstoneMarker prev = null;
+            Unfiltered prevUnfiltered = null;
+            for (Unfiltered unfiltered : list)
+            {
+                Assert.assertTrue("Order violation prev " + str(prevUnfiltered) + " curr " + str(unfiltered),
+                                  prevUnfiltered == null || comparator.compare(prevUnfiltered, unfiltered) * reversedAsMultiplier < 0);
+                prevUnfiltered = unfiltered;
+
+                if (unfiltered.kind() == Kind.RANGE_TOMBSTONE_MARKER)
+                {
+                    RangeTombstoneMarker curr = (RangeTombstoneMarker) unfiltered;
+                    if (prev != null)
+                    {
+                        if (curr.isClose(reversed))
+                        {
+                            Assert.assertTrue(str(unfiltered) + " follows another close marker " + str(prev), prev.isOpen(reversed));
+                            Assert.assertEquals("Deletion time mismatch for open " + str(prev) + " and close " + str(unfiltered),
+                                                prev.openDeletionTime(reversed),
+                                                curr.closeDeletionTime(reversed));
+                        }
+                        else
+                            Assert.assertFalse(str(curr) + " follows another open marker " + str(prev), prev.isOpen(reversed));
+                    }
+
+                    prev = curr;
+                }
+            }
+            Assert.assertFalse("Cannot end in open marker " + str(prev), prev != null && prev.isOpen(reversed));
+
+        } catch (AssertionError e) {
+            System.out.println(e);
+            dumpList(list);
+            throw e;
+        }
+    }
+
+    public List<Unfiltered> generateSource(Random r, int items, int range, int del_range, Function<Integer, Integer> timeGenerator) // Random rows / marker pairs at clusterings in [0, range]; marker deletion times in [0, del_range), row timestamps from timeGenerator(pos).
+    {
+        int[] positions = new int[items + 1];
+        for (int i=0; i<items; ++i)
+            positions[i] = r.nextInt(range);
+        positions[items] = range;
+        Arrays.sort(positions);
+
+        List<Unfiltered> content = new ArrayList<>(items);
+        int prev = -1;
+        for (int i=0; i<items; ++i)
+        {
+            int pos = positions[i];
+            int sz = positions[i + 1] - pos;
+            if (sz == 0 && pos == prev)
+                // Filter out more than two of the same position.
+                continue;
+            if (r.nextBoolean() || pos == prev)
+            {
+                int span;
+                boolean includesStart;
+                boolean includesEnd;
+                if (pos > prev)
+                {
+                    span = r.nextInt(sz + 1);
+                    includesStart = span > 0 ? r.nextBoolean() : true;
+                    includesEnd = span > 0 ? r.nextBoolean() : true;
+                }
+                else
+                {
+                    span = 1 + r.nextInt(sz);
+                    includesStart = false;
+                    includesEnd = r.nextBoolean();
+                }
+                int deltime = r.nextInt(del_range);
+                DeletionTime dt = new DeletionTime(deltime, deltime);
+                content.add(new RangeTombstoneBoundMarker(boundFor(pos, true, includesStart), dt));
+                content.add(new RangeTombstoneBoundMarker(boundFor(pos + span, false, includesEnd), dt));
+                prev = pos + span - (includesEnd ? 0 : 1);
+            }
+            else
+            {
+                content.add(emptyRowAt(pos, timeGenerator));
+                prev = pos;
+            }
+        }
+
+        attachBoundaries(content);
+        if (reversed)
+        {
+            Collections.reverse(content);
+        }
+        verifyValid(content);
+        if (items <= 20)
+            dumpList(content);
+        return content;
+    }
+
+    /**
+     * Constructs a list of unfiltereds with integer clustering according to the specification string.
+     *
+     * The string is a space-delimited sorted list that can contain:
+     *  * open tombstone markers, e.g. xx<[yy] where xx is the clustering, yy is the deletion time, and "<" stands for
+     *    non-inclusive (<= for inclusive).
+     *  * close tombstone markers, e.g. [yy]<=xx. Adjacent close and open markers (e.g. [yy]<=xx xx<[zz]) are combined
+     *    into boundary markers.
+     *  * empty rows, e.g. xx or xx[yy] or xx[yyDzz] where xx is the clustering, yy is the live time and zz is deletion
+     *    time.
+     *
+     * @param input Specification; a token matching none of the forms above fails the test via Assert.fail.
+     * @param default_liveness Liveness to use for rows if not explicitly specified.
+     * @return Parsed list, with adjacent close/open markers already joined into boundaries.
+     */
+    public List<Unfiltered> parse(String input, int default_liveness)
+    {
+        String[] split = input.split(" ");
+        Pattern open = Pattern.compile("(\\d+)<(=)?\\[(\\d+)\\]");
+        Pattern close = Pattern.compile("\\[(\\d+)\\]<(=)?(\\d+)");
+        Pattern row = Pattern.compile("(\\d+)(\\[(\\d+)(?:D(\\d+))?\\])?");
+        List<Unfiltered> out = new ArrayList<>(split.length);
+        for (String s : split)
+        {
+            Matcher m = open.matcher(s);
+            if (m.matches())
+            {
+                out.add(openMarker(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(3)), m.group(2) != null));
+                continue;
+            }
+            m = close.matcher(s);
+            if (m.matches())
+            {
+                out.add(closeMarker(Integer.parseInt(m.group(3)), Integer.parseInt(m.group(1)), m.group(2) != null));
+                continue;
+            }
+            m = row.matcher(s);
+            if (m.matches())
+            {
+                int live = m.group(3) != null ? Integer.parseInt(m.group(3)) : default_liveness;
+                int delTime = m.group(4) != null ? Integer.parseInt(m.group(4)) : -1;
+                out.add(emptyRowAt(Integer.parseInt(m.group(1)), live, delTime));
+                continue;
+            }
+            Assert.fail("Can't parse " + s);
+        }
+        attachBoundaries(out);
+        return out;
+    }
+
+    static Row emptyRowAt(int pos, Function<Integer, Integer> timeGenerator) // Cell-less row; timestamp from timeGenerator(pos), nowInSec borrowed from UnfilteredRowIteratorsMergeTest.
+    {
+        final Clustering clustering = clusteringFor(pos);
+        final LivenessInfo live = LivenessInfo.create(timeGenerator.apply(pos), UnfilteredRowIteratorsMergeTest.nowInSec);
+        return BTreeRow.noCellLiveRow(clustering, live);
+    }
+
+    static Row emptyRowAt(int pos, int time, int deletionTime) // deletionTime == -1 means no row deletion.
+    {
+        final Clustering clustering = clusteringFor(pos);
+        final LivenessInfo live = LivenessInfo.create(time, UnfilteredRowIteratorsMergeTest.nowInSec);
+        final DeletionTime delTime = deletionTime == -1 ? DeletionTime.LIVE : new DeletionTime(deletionTime, deletionTime);
+        return BTreeRow.create(clustering, live, Row.Deletion.regular(delTime), BTree.empty());
+    }
+
+    static Clustering clusteringFor(int i) // Single Int32 clustering value.
+    {
+        return Clustering.make(Int32Type.instance.decompose(i));
+    }
+
+    static ClusteringBound boundFor(int pos, boolean start, boolean inclusive)
+    {
+        return ClusteringBound.create(ClusteringBound.boundKind(start, inclusive), new ByteBuffer[] {Int32Type.instance.decompose(pos)});
+    }
+
+    static void attachBoundaries(List<Unfiltered> content) // In place, non-reversed order: fuses a close marker directly followed by an open at the same position into one boundary marker.
+    {
+        int di = 0;
+        RangeTombstoneMarker prev = null;
+        for (int si = 0; si < content.size(); ++si)
+        {
+            Unfiltered currUnfiltered = content.get(si);
+            RangeTombstoneMarker curr = currUnfiltered.kind() == Kind.RANGE_TOMBSTONE_MARKER ?
+                                        (RangeTombstoneMarker) currUnfiltered :
+                                        null;
+            if (prev != null && curr != null && prev.isClose(false) && curr.isOpen(false) && prev.clustering().invert().equals(curr.clustering()))
+            {
+                // Join. Prefer not to use merger to check its correctness.
+                ClusteringBound b = (ClusteringBound) prev.clustering();
+                ClusteringBoundary boundary = ClusteringBoundary.create(
+                        b.isInclusive() ? ClusteringBound.Kind.INCL_END_EXCL_START_BOUNDARY : ClusteringBound.Kind.EXCL_END_INCL_START_BOUNDARY,
+                        b.getRawValues());
+                prev = new RangeTombstoneBoundaryMarker(boundary, prev.closeDeletionTime(false), curr.openDeletionTime(false));
+                currUnfiltered = prev;
+                --di;
+            }
+            content.set(di++, currUnfiltered);
+            prev = curr;
+        }
+        for (int pos = content.size() - 1; pos >= di; --pos)
+            content.remove(pos);
+    }
+
+    static RangeTombstoneMarker openMarker(int pos, int delTime, boolean inclusive)
+    {
+        return marker(pos, delTime, true, inclusive);
+    }
+
+    static RangeTombstoneMarker closeMarker(int pos, int delTime, boolean inclusive)
+    {
+        return marker(pos, delTime, false, inclusive);
+    }
+
+    private static RangeTombstoneMarker marker(int pos, int delTime, boolean isStart, boolean inclusive)
+    {
+        return new RangeTombstoneBoundMarker(ClusteringBound.create(ClusteringBound.boundKind(isStart, inclusive),
+                                                                    new ByteBuffer[] {clusteringFor(pos).get(0)}),
+                                             new DeletionTime(delTime, delTime));
+    }
+
+    public static UnfilteredRowIterator source(Iterable<Unfiltered> content, CFMetaData metadata, DecoratedKey partitionKey) // LIVE partition deletion.
+    {
+        return source(content, metadata, partitionKey, DeletionTime.LIVE);
+    }
+
+    public static UnfilteredRowIterator source(Iterable<Unfiltered> content, CFMetaData metadata, DecoratedKey partitionKey, DeletionTime delTime) // Always non-reversed.
+    {
+        return new Source(content.iterator(), metadata, partitionKey, delTime, false);
+    }
+
+    static class Source extends AbstractUnfilteredRowIterator implements UnfilteredRowIterator // Row iterator over an in-memory iterator; empty static row, no stats.
+    {
+        Iterator<Unfiltered> content;
+
+        protected Source(Iterator<Unfiltered> content, CFMetaData metadata, DecoratedKey partitionKey, DeletionTime partitionLevelDeletion, boolean reversed)
+        {
+            super(metadata,
+                  partitionKey,
+                  partitionLevelDeletion,
+                  metadata.partitionColumns(),
+                  Rows.EMPTY_STATIC_ROW,
+                  reversed,
+                  EncodingStats.NO_STATS);
+            this.content = content;
+        }
+
+        @Override
+        protected Unfiltered computeNext()
+        {
+            return content.hasNext() ? content.next() : endOfData();
+        }
+    }
+
+    public String str(List<Unfiltered> list) // Space-separated rendering, including a trailing space.
+    {
+        StringBuilder builder = new StringBuilder();
+        for (Unfiltered u : list)
+        {
+            builder.append(str(u));
+            builder.append(' ');
+        }
+        return builder.toString();
+    }
+
+    public void dumpList(List<Unfiltered> list) // Prints str(list) to stdout.
+    {
+        System.out.println(str(list));
+    }
+}
\ No newline at end of file