Reduce granularity of OpOrder.Group during index build 3.0+ version
Patch by Sam Tunnicliffe; reviewed by Milan Majercik and Jeremiah Jordan for CASSANDRA-12796 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/36ce4e02 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/36ce4e02 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/36ce4e02 Branch: refs/heads/trunk Commit: 36ce4e02b429b1297d71c5c8a963623c62d9e159 Parents: 6434e88 Author: Sam Tunnicliffe <s...@beobal.com> Authored: Tue Dec 13 10:02:25 2016 +0000 Committer: Sam Tunnicliffe <s...@beobal.com> Committed: Tue Dec 13 10:18:09 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Keyspace.java | 23 ---- .../cassandra/index/SecondaryIndexBuilder.java | 5 +- .../cassandra/index/SecondaryIndexManager.java | 108 +++++++++++---- .../apache/cassandra/index/CustomIndexTest.java | 130 ++++++++++++++++++- 5 files changed, 216 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5bc30be..a65a147 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535) * Reenable HeapPool (CASSANDRA-12900) Merged from 2.2: + * Reduce granuality of OpOrder.Group during index build (CASSANDRA-12796) * Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980) * Do not specify local address on outgoing connection when listen_on_broadcast_address is set (CASSANDRA-12673) * Use saved tokens when setting local tokens on StorageService.joinRing (CASSANDRA-12935) http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/src/java/org/apache/cassandra/db/Keyspace.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 75aab8f..ec5102b 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -35,7 +35,6 @@ import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.view.ViewManager; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.index.Index; @@ -569,28 +568,6 @@ public class Keyspace return replicationStrategy; } - /** - * @param key row to index - * @param cfs ColumnFamily to index partition in - * @param indexes the indexes to submit the row to - */ - public static void indexPartition(DecoratedKey key, ColumnFamilyStore cfs, Set<Index> indexes) - { - if (logger.isTraceEnabled()) - logger.trace("Indexing partition {} ", cfs.metadata.getKeyValidator().getString(key.getKey())); - - SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, - FBUtilities.nowInSeconds(), - key); - - try (OpOrder.Group writeGroup = cfs.keyspace.writeOrder.start(); - OpOrder.Group readGroup = cfs.readOrdering.start(); - UnfilteredRowIterator partition = cmd.queryMemtableAndDisk(cfs, readGroup)) - { - cfs.indexManager.indexPartition(partition, writeGroup, indexes, cmd.nowInSec()); - } - } - public List<Future<?>> flush() { List<Future<?>> futures = new ArrayList<>(columnFamilyStores.size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java index e66f0a3..c627b2d 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java @@ -17,13 +17,11 @@ */ package org.apache.cassandra.index; -import java.io.IOException; import java.util.Set; import java.util.UUID; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.compaction.CompactionInfo; import org.apache.cassandra.db.compaction.CompactionInterruptedException; import org.apache.cassandra.db.compaction.OperationType; @@ -61,12 +59,13 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder { try { + int pageSize = cfs.indexManager.calculateIndexingPageSize(); while (iter.hasNext()) { if (isStopRequested()) throw new CompactionInterruptedException(getCompactionInfo()); DecoratedKey key = iter.next(); - Keyspace.indexPartition(key, cfs, indexers); + cfs.indexManager.indexPartition(key, indexers, pageSize); } } finally http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 6dfdeee..003b624 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -47,6 +47,7 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.partitions.PartitionIterators; import 
org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -56,7 +57,9 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.Indexes; +import org.apache.cassandra.service.pager.SinglePartitionPager; import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.transport.Server; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.Refs; @@ -98,6 +101,9 @@ public class SecondaryIndexManager implements IndexRegistry { private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class); + // default page size (in rows) when rebuilding the index for a whole partition + public static final int DEFAULT_PAGE_SIZE = 10000; + private Map<String, Index> indexes = Maps.newConcurrentMap(); /** @@ -517,39 +523,97 @@ public class SecondaryIndexManager implements IndexRegistry /** * When building an index against existing data in sstables, add the given partition to the index */ - public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<Index> indexes, int nowInSec) + public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize) { + if (logger.isTraceEnabled()) + logger.trace("Indexing partition {}", baseCfs.metadata.getKeyValidator().getString(key.getKey())); + if (!indexes.isEmpty()) { - DecoratedKey key = partition.partitionKey(); - Set<Index.Indexer> indexers = indexes.stream() - .map(index -> index.indexerFor(key, - partition.columns(), - nowInSec, - opGroup, - IndexTransaction.Type.UPDATE)) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - - indexers.forEach(Index.Indexer::begin); - - try (RowIterator filtered = 
UnfilteredRowIterators.filter(partition, nowInSec)) + SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, + FBUtilities.nowInSeconds(), + key); + int nowInSec = cmd.nowInSec(); + boolean readStatic = false; + + SinglePartitionPager pager = new SinglePartitionPager(cmd, null, Server.CURRENT_VERSION); + while (!pager.isExhausted()) { - if (!filtered.staticRow().isEmpty()) - indexers.forEach(indexer -> indexer.insertRow(filtered.staticRow())); - - while (filtered.hasNext()) + try (ReadOrderGroup readGroup = cmd.startOrderGroup(); + OpOrder.Group writeGroup = Keyspace.writeOrder.start(); + RowIterator partition = + PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize,readGroup), + cmd)) { - Row row = filtered.next(); - indexers.forEach(indexer -> indexer.insertRow(row)); + Set<Index.Indexer> indexers = indexes.stream() + .map(index -> index.indexerFor(key, + partition.columns(), + nowInSec, + writeGroup, + IndexTransaction.Type.UPDATE)) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + indexers.forEach(Index.Indexer::begin); + + // only process the static row once per partition + if (!readStatic && !partition.staticRow().isEmpty()) + { + indexers.forEach(indexer -> indexer.insertRow(partition.staticRow())); + readStatic = true; + } + + while (partition.hasNext()) + { + Row row = partition.next(); + indexers.forEach(indexer -> indexer.insertRow(row)); + } + + indexers.forEach(Index.Indexer::finish); } } - - indexers.forEach(Index.Indexer::finish); } } /** + * Return the page size used when indexing an entire partition + */ + public int calculateIndexingPageSize() + { + if (Boolean.getBoolean("cassandra.force_default_indexing_page_size")) + return DEFAULT_PAGE_SIZE; + + double targetPageSizeInBytes = 32 * 1024 * 1024; + double meanPartitionSize = baseCfs.getMeanPartitionSize(); + if (meanPartitionSize <= 0) + return DEFAULT_PAGE_SIZE; + + int meanCellsPerPartition = baseCfs.getMeanColumns(); + if 
(meanCellsPerPartition <= 0) + return DEFAULT_PAGE_SIZE; + + int columnsPerRow = baseCfs.metadata.partitionColumns().regulars.size(); + if (meanCellsPerPartition <= 0) + return DEFAULT_PAGE_SIZE; + + int meanRowsPerPartition = meanCellsPerPartition / columnsPerRow; + double meanRowSize = meanPartitionSize / meanRowsPerPartition; + + int pageSize = (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, targetPageSizeInBytes / meanRowSize)); + + logger.trace("Calculated page size {} for indexing {}.{} ({}/{}/{}/{})", + pageSize, + baseCfs.metadata.ksName, + baseCfs.metadata.cfName, + meanPartitionSize, + meanCellsPerPartition, + meanRowsPerPartition, + meanRowSize); + + return pageSize; + } + + /** * Delete all data from all indexes for this partition. * For when cleanup rips a partition out entirely. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/test/unit/org/apache/cassandra/index/CustomIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java index 6930d13..33e7182 100644 --- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java +++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java @@ -39,22 +39,25 @@ import org.apache.cassandra.cql3.restrictions.IndexRestrictions; import org.apache.cassandra.cql3.restrictions.StatementRestrictions; import org.apache.cassandra.cql3.statements.IndexTarget; import org.apache.cassandra.cql3.statements.ModificationStatement; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.ReadCommand; -import org.apache.cassandra.db.ReadOrderGroup; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import 
org.apache.cassandra.db.rows.Row; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.transactions.IndexTransaction; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.Indexes; import org.apache.cassandra.transport.Server; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.OpOrder; import static org.apache.cassandra.Util.throwAssert; import static org.apache.cassandra.cql3.statements.IndexTarget.CUSTOM_INDEX_OPTION_NAME; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -647,6 +650,42 @@ public class CustomIndexTest extends CQLTester assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length); } + @Test + public void indexBuildingPagesLargePartitions() throws Throwable + { + createTable("CREATE TABLE %s(k int, c int, v int, PRIMARY KEY(k,c))"); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + SecondaryIndexManager indexManager = cfs.indexManager; + int totalRows = SimulateConcurrentFlushingIndex.ROWS_IN_PARTITION; + // Insert a single wide partition to be indexed + for (int i = 0; i < totalRows; i++) + execute("INSERT INTO %s (k, c, v) VALUES (0, ?, ?)", i, i); + cfs.forceBlockingFlush(); + + // Create the index, which won't automatically start building + String indexName = "build_single_partition_idx"; + createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(v) USING '%s'", + indexName, SimulateConcurrentFlushingIndex.class.getName())); + SimulateConcurrentFlushingIndex index = (SimulateConcurrentFlushingIndex) indexManager.getIndexByName(indexName); + + // Index the partition with an Indexer which artificially simulates additional concurrent + // flush activity by 
periodically issuing barriers on the read & write op groupings + DecoratedKey targetKey = getCurrentColumnFamilyStore().decorateKey(ByteBufferUtil.bytes(0)); + indexManager.indexPartition(targetKey, Collections.singleton(index), totalRows / 10); + + // When indexing is done check that: + // * The base table's read ordering at finish was > the one at the start (i.e. that + // we didn't hold a single read OpOrder.Group for the whole operation. + // * That multiple write OpOrder.Groups were used to perform the writes to the index + // * That all operations are complete, that none of the relevant OpOrder.Groups are + // marked as blocking progress and that all the barriers' ops are considered done. + assertTrue(index.readOrderingAtFinish.compareTo(index.readOrderingAtStart) > 0); + assertTrue(index.writeGroups.size() > 1); + assertFalse(index.readOrderingAtFinish.isBlocking()); + index.writeGroups.forEach(group -> assertFalse(group.isBlocking())); + index.barriers.forEach(OpOrder.Barrier::allPriorOpsAreFinished); + } + // Used for index creation above public static class BrokenCustom2I extends StubIndex { @@ -868,4 +907,89 @@ public class CustomIndexTest extends CQLTester return new HashMap<>(); } } + + public static final class SimulateConcurrentFlushingIndex extends StubIndex + { + ColumnFamilyStore baseCfs; + AtomicInteger indexedRowCount = new AtomicInteger(0); + + OpOrder.Group readOrderingAtStart = null; + OpOrder.Group readOrderingAtFinish = null; + Set<OpOrder.Group> writeGroups = new HashSet<>(); + List<OpOrder.Barrier> barriers = new ArrayList<>(); + + static final int ROWS_IN_PARTITION = 1000; + + public SimulateConcurrentFlushingIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata) + { + super(baseCfs, metadata); + this.baseCfs = baseCfs; + } + + // When indexing an entire partition 2 potential problems can be caused by + // whilst holding a single read & a single write OpOrder.Group. 
+ // * By holding a write group too long, flushes are blocked + // * Holding a read group for too long prevents the memory from flushed memtables + // from being reclaimed. + // See CASSANDRA-12796 for details. + // To test that the index builder pages through a large partition, using + // finer grained OpOrder.Groups we write a "large" partition to disk, then + // kick off an index build on it, using this indexer. + // To simulate concurrent flush activity, we periodically issue barriers on + // the current read and write groups. + // When we're done indexing the partition, the test checks the states of the + // various OpOrder.Groups, which it can obtain from this index. + + public Indexer indexerFor(final DecoratedKey key, + PartitionColumns columns, + int nowInSec, + OpOrder.Group opGroup, + IndexTransaction.Type transactionType) + { + if (readOrderingAtStart == null) + readOrderingAtStart = baseCfs.readOrdering.getCurrent(); + + writeGroups.add(opGroup); + + return new Indexer() + { + public void begin() + { + // to simulate other activity on base table during indexing, issue + // barriers on the read and write orderings. 
This is analogous to + // what happens when other flushes are being processed during the + // indexing of a partition + OpOrder.Barrier readBarrier = baseCfs.readOrdering.newBarrier(); + readBarrier.issue(); + barriers.add(readBarrier); + OpOrder.Barrier writeBarrier = Keyspace.writeOrder.newBarrier(); + writeBarrier.issue(); + barriers.add(writeBarrier); + } + + public void insertRow(Row row) + { + indexedRowCount.incrementAndGet(); + } + + public void finish() + { + // we've indexed all rows in the target partition, + // grab the read OpOrder.Group for the base CFS so + // we can compare it with the starting group + if (indexedRowCount.get() < ROWS_IN_PARTITION) + readOrderingAtFinish = baseCfs.readOrdering.getCurrent(); + } + + public void partitionDelete(DeletionTime deletionTime) { } + + public void rangeTombstone(RangeTombstone tombstone) { } + + public void updateRow(Row oldRowData, Row newRowData) { } + + public void removeRow(Row row) { } + + }; + } + } }