Repository: cassandra Updated Branches: refs/heads/trunk f9a1a80af -> 48815d4a1
fix SASI memtable switching on flush; patch by xedin; reviewed by beobal for CASSANDRA-11159 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48815d4a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48815d4a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48815d4a Branch: refs/heads/trunk Commit: 48815d4a182915e852888cb35273b8e896cea440 Parents: f9a1a80 Author: Pavel Yaskevich <xe...@apache.org> Authored: Thu Feb 11 18:54:04 2016 -0800 Committer: Pavel Yaskevich <xe...@apache.org> Committed: Tue Feb 16 13:06:39 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/lifecycle/Tracker.java | 26 ++++++- .../apache/cassandra/index/sasi/SASIIndex.java | 8 ++ .../cassandra/index/sasi/conf/ColumnIndex.java | 40 +++++++++- .../cassandra/index/sasi/conf/view/View.java | 4 +- .../index/sasi/plan/QueryController.java | 30 +++----- .../MemtableDiscardedNotification.java | 30 ++++++++ .../MemtableSwitchedNotification.java | 30 ++++++++ .../cassandra/index/sasi/SASIIndexTest.java | 77 ++++++++++++++++++++ 9 files changed, 220 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c3bfdc3..f20e983 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.4 + * fix SASI memtable switching on flush (CASSANDRA-11159) * Remove duplicate offline compaction tracking (CASSANDRA-11148) * fix EQ semantics of analyzed SASI indexes (CASSANDRA-11130) * Support long name output for nodetool commands (CASSANDRA-7950) http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index 4c73472..dd07b19 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -318,6 +318,8 @@ public class Tracker Pair<View, View> result = apply(View.switchMemtable(newMemtable)); if (truncating) notifyRenewed(newMemtable); + else + notifySwitched(result.left.getCurrentMemtable()); return result.left.getCurrentMemtable(); } @@ -349,6 +351,8 @@ public class Tracker // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both? fail = notifyAdded(sstables, fail); + notifyDiscarded(memtable); + if (!isDummy() && !cfstore.isValid()) dropSSTables(); @@ -441,16 +445,30 @@ public class Tracker subscriber.handleNotification(notification, this); } - public void notifyRenewed(Memtable renewed) + public void notifyTruncated(long truncatedAt) { - INotification notification = new MemtableRenewedNotification(renewed); + INotification notification = new TruncationNotification(truncatedAt); for (INotificationConsumer subscriber : subscribers) subscriber.handleNotification(notification, this); } - public void notifyTruncated(long truncatedAt) + public void notifyRenewed(Memtable renewed) + { + notify(new MemtableRenewedNotification(renewed)); + } + + public void notifySwitched(Memtable previous) + { + notify(new MemtableSwitchedNotification(previous)); + } + + public void notifyDiscarded(Memtable discarded) + { + notify(new MemtableDiscardedNotification(discarded)); + } + + private void notify(INotification notification) { - INotification notification = new TruncationNotification(truncatedAt); for (INotificationConsumer subscriber : subscribers) subscriber.handleNotification(notification, this); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java index d480b82..90cc72e 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java @@ -311,6 +311,14 @@ public class SASIIndex implements Index, INotificationConsumer { index.switchMemtable(); } + else if (notification instanceof MemtableSwitchedNotification) + { + index.switchMemtable(((MemtableSwitchedNotification) notification).memtable); + } + else if (notification instanceof MemtableDiscardedNotification) + { + index.discardMemtable(((MemtableDiscardedNotification) notification).memtable); + } } public ColumnIndex getIndex() http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java index 1703bd4..76ab968 100644 --- a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java @@ -22,11 +22,16 @@ import java.util.Collection; import java.util.Collections; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.marshal.AbstractType; 
import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.UTF8Type; @@ -39,6 +44,7 @@ import org.apache.cassandra.index.sasi.memory.IndexMemtable; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.plan.Expression.Op; import org.apache.cassandra.index.sasi.utils.RangeIterator; +import org.apache.cassandra.index.sasi.utils.RangeUnionIterator; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.IndexMetadata; @@ -54,6 +60,8 @@ public class ColumnIndex private final Optional<IndexMetadata> config; private final AtomicReference<IndexMemtable> memtable; + private final ConcurrentMap<Memtable, IndexMemtable> pendingFlush = new ConcurrentHashMap<>(); + private final IndexMode mode; private final Component component; @@ -92,17 +100,45 @@ public class ColumnIndex public long index(DecoratedKey key, Row row) { - return memtable.get().index(key, getValueOf(column, row, FBUtilities.nowInSeconds())); + return getCurrentMemtable().index(key, getValueOf(column, row, FBUtilities.nowInSeconds())); } public void switchMemtable() { + // discard current memtable with all of it's data, useful on truncate memtable.set(new IndexMemtable(this)); } + public void switchMemtable(Memtable parent) + { + pendingFlush.putIfAbsent(parent, memtable.getAndSet(new IndexMemtable(this))); + } + + public void discardMemtable(Memtable parent) + { + pendingFlush.remove(parent); + } + + @VisibleForTesting + public IndexMemtable getCurrentMemtable() + { + return memtable.get(); + } + + @VisibleForTesting + public Collection<IndexMemtable> getPendingMemtables() + { + return pendingFlush.values(); + } + public RangeIterator<Long, Token> searchMemtable(Expression e) { - return memtable.get().search(e); + RangeIterator.Builder<Long, Token> builder = new RangeUnionIterator.Builder<>(); + builder.add(getCurrentMemtable().search(e)); + for 
(IndexMemtable memtable : getPendingMemtables()) + builder.add(memtable.search(e)); + + return builder.build(); } public void update(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables) http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/index/sasi/conf/view/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/View.java b/src/java/org/apache/cassandra/index/sasi/conf/view/View.java index 505a4d7..1f68b0c 100644 --- a/src/java/org/apache/cassandra/index/sasi/conf/view/View.java +++ b/src/java/org/apache/cassandra/index/sasi/conf/view/View.java @@ -87,9 +87,9 @@ public class View implements Iterable<SSTableIndex> throw new IllegalStateException(String.format("mismatched sizes for intervals tree for keys vs terms: %d != %d", keyIntervalTree.intervalCount(), termTree.intervalCount())); } - public Set<SSTableIndex> match(final Set<SSTableReader> scope, Expression expression) + public Set<SSTableIndex> match(Expression expression) { - return Sets.filter(termTree.search(expression), index -> scope.contains(index.getSSTable())); + return termTree.search(expression); } public List<SSTableIndex> match(ByteBuffer minKey, ByteBuffer maxKey) http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java index 8e10fd0..70de463 100644 --- a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java +++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java @@ -20,12 +20,12 @@ package org.apache.cassandra.index.sasi.plan; import java.util.*; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Sets; + 
import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.ColumnFamilyStore.RefViewFragment; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.RowFilter; -import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.sasi.SASIIndex; @@ -51,18 +51,16 @@ public class QueryController private final ColumnFamilyStore cfs; private final PartitionRangeReadCommand command; + private final DataRange range; private final Map<Collection<Expression>, List<RangeIterator<Long, Token>>> resources = new HashMap<>(); - private final RefViewFragment scope; - private final Set<SSTableReader> sstables; public QueryController(ColumnFamilyStore cfs, PartitionRangeReadCommand command, long timeQuotaMs) { this.cfs = cfs; this.command = command; + this.range = command.dataRange(); this.executionQuota = TimeUnit.MILLISECONDS.toNanos(timeQuotaMs); this.executionStart = System.nanoTime(); - this.scope = getSSTableScope(cfs, command); - this.sstables = new HashSet<>(scope.sstables); } public boolean isForThrift() @@ -178,14 +176,7 @@ public class QueryController public void finish() { - try - { - resources.values().forEach(this::releaseIndexes); - } - finally - { - scope.release(); - } + resources.values().forEach(this::releaseIndexes); } private Map<Expression, Set<SSTableIndex>> getView(OperationType op, Collection<Expression> expressions) @@ -220,7 +211,7 @@ public class QueryController } else { - readers.addAll(view.match(sstables, e)); + readers.addAll(applyScope(view.match(e))); } indexes.put(e, readers); @@ -243,7 +234,7 @@ public class QueryController if (view == null) continue; - Set<SSTableIndex> indexes = view.match(sstables, e); + Set<SSTableIndex> indexes = applyScope(view.match(e)); if (primaryIndexes.size() > indexes.size()) { primaryIndexes = indexes; @@ -254,8 
+245,11 @@ public class QueryController return expression == null ? null : Pair.create(expression, primaryIndexes); } - private static RefViewFragment getSSTableScope(ColumnFamilyStore cfs, PartitionRangeReadCommand command) + private Set<SSTableIndex> applyScope(Set<SSTableIndex> indexes) { - return cfs.selectAndReference(org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL, command.dataRange().keyRange())); + return Sets.filter(indexes, index -> { + SSTableReader sstable = index.getSSTable(); + return range.startKey().compareTo(sstable.last) <= 0 && (range.stopKey().isMinimum() || sstable.first.compareTo(range.stopKey()) <= 0); + }); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/notifications/MemtableDiscardedNotification.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/MemtableDiscardedNotification.java b/src/java/org/apache/cassandra/notifications/MemtableDiscardedNotification.java new file mode 100644 index 0000000..778cad0 --- /dev/null +++ b/src/java/org/apache/cassandra/notifications/MemtableDiscardedNotification.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.notifications; + +import org.apache.cassandra.db.Memtable; + +public class MemtableDiscardedNotification implements INotification +{ + public final Memtable memtable; + + public MemtableDiscardedNotification(Memtable discarded) + { + this.memtable = discarded; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/notifications/MemtableSwitchedNotification.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/MemtableSwitchedNotification.java b/src/java/org/apache/cassandra/notifications/MemtableSwitchedNotification.java new file mode 100644 index 0000000..946de4e --- /dev/null +++ b/src/java/org/apache/cassandra/notifications/MemtableSwitchedNotification.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.notifications; + +import org.apache.cassandra.db.Memtable; + +public class MemtableSwitchedNotification implements INotification +{ + public final Memtable memtable; + + public MemtableSwitchedNotification(Memtable switched) + { + this.memtable = switched; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java index a88e594..c9d66f7 100644 --- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java +++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java @@ -50,6 +50,8 @@ import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.index.sasi.conf.ColumnIndex; import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder; import org.apache.cassandra.index.sasi.exceptions.TimeQuotaExceededException; +import org.apache.cassandra.index.sasi.memory.IndexMemtable; +import org.apache.cassandra.index.sasi.plan.QueryController; import org.apache.cassandra.index.sasi.plan.QueryPlan; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -1938,6 +1940,81 @@ public class SASIIndexTest QueryProcessor.executeOnceInternal(String.format("TRUNCATE TABLE %s.%s", KS_NAME, table)); } + @Test + public void testIndexMemtableSwitching() + { + // write some data but don't flush + ColumnFamilyStore store = loadData(new HashMap<String, Pair<String, Integer>>() + {{ + put("key1", Pair.create("Pavel", 14)); + }}, false); + + ColumnIndex index = ((SASIIndex) store.indexManager.getIndexByName("first_name")).getIndex(); + IndexMemtable beforeFlushMemtable = index.getCurrentMemtable(); + + PartitionRangeReadCommand command = new PartitionRangeReadCommand(store.metadata, + 
FBUtilities.nowInSeconds(), + ColumnFilter.all(store.metadata), + RowFilter.NONE, + DataLimits.NONE, + DataRange.allData(store.getPartitioner()), + Optional.empty()); + + QueryController controller = new QueryController(store, command, Integer.MAX_VALUE); + org.apache.cassandra.index.sasi.plan.Expression expression = + new org.apache.cassandra.index.sasi.plan.Expression(controller, index) + .add(Operator.LIKE_MATCHES, UTF8Type.instance.fromString("Pavel")); + + Assert.assertTrue(beforeFlushMemtable.search(expression).getCount() > 0); + + store.forceBlockingFlush(); + + IndexMemtable afterFlushMemtable = index.getCurrentMemtable(); + + Assert.assertNotSame(afterFlushMemtable, beforeFlushMemtable); + Assert.assertNull(afterFlushMemtable.search(expression)); + Assert.assertEquals(0, index.getPendingMemtables().size()); + + loadData(new HashMap<String, Pair<String, Integer>>() + {{ + put("key2", Pair.create("Sam", 15)); + }}, false); + + expression = new org.apache.cassandra.index.sasi.plan.Expression(controller, index) + .add(Operator.LIKE_MATCHES, UTF8Type.instance.fromString("Sam")); + + beforeFlushMemtable = index.getCurrentMemtable(); + Assert.assertTrue(beforeFlushMemtable.search(expression).getCount() > 0); + + // let's emulate switching memtable and see if we can still read-data in "pending" + index.switchMemtable(store.getTracker().getView().getCurrentMemtable()); + + Assert.assertNotSame(index.getCurrentMemtable(), beforeFlushMemtable); + Assert.assertEquals(1, index.getPendingMemtables().size()); + + Assert.assertTrue(index.searchMemtable(expression).getCount() > 0); + + // emulate "everything is flushed" notification + index.discardMemtable(store.getTracker().getView().getCurrentMemtable()); + + Assert.assertEquals(0, index.getPendingMemtables().size()); + Assert.assertNull(index.searchMemtable(expression)); + + // test discarding data from memtable + loadData(new HashMap<String, Pair<String, Integer>>() + {{ + put("key3", Pair.create("Jonathan", 16)); + 
}}, false); + + expression = new org.apache.cassandra.index.sasi.plan.Expression(controller, index) + .add(Operator.LIKE_MATCHES, UTF8Type.instance.fromString("Jonathan")); + + Assert.assertTrue(index.searchMemtable(expression).getCount() > 0); + + index.switchMemtable(); + Assert.assertNull(index.searchMemtable(expression)); + } + private static ColumnFamilyStore loadData(Map<String, Pair<String, Integer>> data, boolean forceFlush) { return loadData(data, System.currentTimeMillis(), forceFlush);