This is an automated email from the ASF dual-hosted git repository. blambov pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push: new 60b54425ed Fix commitLogUpperBound initialization in AbstractMemtableWithCommitlog so that it is always available when writeBarrier gets initialized. 60b54425ed is described below commit 60b54425edc0a328bc1baf00c2e5bf111d4b9da8 Author: Jakub Żytka <jakub.zy...@datastax.com> AuthorDate: Tue May 11 10:01:12 2021 +0200 Fix commitLogUpperBound initialization in AbstractMemtableWithCommitlog so that it is always available when writeBarrier gets initialized. Harden Memtable API so that it is apparent that getting commitLogUpperBound is valid only after it is fully established. patch by Jakub Żytka and Dan Jatnieks; reviewed by Dan Jatnieks, Jeremiah D Jordan and Caleb Rackliffe for CASSANDRA-17587 --- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 2 +- .../cassandra/db/memtable/AbstractMemtable.java | 5 +- .../db/memtable/AbstractMemtableWithCommitlog.java | 8 ++- .../org/apache/cassandra/db/memtable/Flushing.java | 2 +- .../org/apache/cassandra/db/memtable/Memtable.java | 2 +- .../cassandra/db/commitlog/CommitLogCQLTest.java | 69 ++++++++++++++++++++++ 7 files changed, 80 insertions(+), 9 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 96bebe774d..a8def999c5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1-alpha2 + * Fix commitLogUpperBound initialization in AbstractMemtableWithCommitlog (CASSANDRA-17587) * Fix widening to long in getBatchSizeFailThreshold (CASSANDRA-17650) * Fix widening from mebibytes to bytes in IntMebibytesBound (CASSANDRA-17716) * Revert breaking change in nodetool clientstats and expose cient options through nodetool clientstats --client-options. 
(CASSANDRA-17715) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index bf43f327e3..a40e5c7ad1 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1103,7 +1103,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner // If a flush errored out but the error was ignored, make sure we don't discard the commit log. if (flushFailure == null && mainMemtable != null) { - commitLogUpperBound = mainMemtable.getCommitLogUpperBound(); + commitLogUpperBound = mainMemtable.getFinalCommitLogUpperBound(); CommitLog.instance.discardCompletedSegments(metadata.id, mainMemtable.getCommitLogLowerBound(), commitLogUpperBound); } diff --git a/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java b/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java index 1d683db910..0ac7482e4a 100644 --- a/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.commitlog.CommitLogPosition; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.schema.ColumnMetadata; @@ -204,9 +203,9 @@ public abstract class AbstractMemtable implements Memtable return AbstractMemtable.this.getCommitLogLowerBound(); } - public CommitLogPosition commitLogUpperBound() + public LastCommitLogPosition commitLogUpperBound() { - return AbstractMemtable.this.getCommitLogUpperBound(); + return AbstractMemtable.this.getFinalCommitLogUpperBound(); } public EncodingStats encodingStats() diff --git 
a/src/java/org/apache/cassandra/db/memtable/AbstractMemtableWithCommitlog.java b/src/java/org/apache/cassandra/db/memtable/AbstractMemtableWithCommitlog.java index d60fe866ab..4fe39a10ca 100644 --- a/src/java/org/apache/cassandra/db/memtable/AbstractMemtableWithCommitlog.java +++ b/src/java/org/apache/cassandra/db/memtable/AbstractMemtableWithCommitlog.java @@ -57,8 +57,8 @@ public abstract class AbstractMemtableWithCommitlog extends AbstractMemtable // This can prepare the memtable data for deletion; it will still be used while the flush is proceeding. // A setDiscarded call will follow. assert this.writeBarrier == null; - this.writeBarrier = writeBarrier; this.commitLogUpperBound = commitLogUpperBound; + this.writeBarrier = writeBarrier; } public void discard() @@ -113,9 +113,11 @@ public abstract class AbstractMemtableWithCommitlog extends AbstractMemtable return commitLogLowerBound.get(); } - public CommitLogPosition getCommitLogUpperBound() + public LastCommitLogPosition getFinalCommitLogUpperBound() { - return commitLogUpperBound.get(); + assert commitLogUpperBound != null : "Commit log upper bound should be set before flushing"; + assert commitLogUpperBound.get() instanceof LastCommitLogPosition : "Commit log upper bound has not been sealed yet? 
" + commitLogUpperBound.get(); + return (LastCommitLogPosition) commitLogUpperBound.get(); } public boolean mayContainDataBefore(CommitLogPosition position) diff --git a/src/java/org/apache/cassandra/db/memtable/Flushing.java b/src/java/org/apache/cassandra/db/memtable/Flushing.java index 6717e64bed..1a31374652 100644 --- a/src/java/org/apache/cassandra/db/memtable/Flushing.java +++ b/src/java/org/apache/cassandra/db/memtable/Flushing.java @@ -171,7 +171,7 @@ public class Flushing logger.info("Completed flushing {} ({}) for commitlog position {}", writer.getFilename(), FBUtilities.prettyPrintMemory(bytesFlushed), - toFlush.memtable().getCommitLogUpperBound()); + toFlush.memtable().getFinalCommitLogUpperBound()); // Update the metrics metrics.bytesFlushed.inc(bytesFlushed); } diff --git a/src/java/org/apache/cassandra/db/memtable/Memtable.java b/src/java/org/apache/cassandra/db/memtable/Memtable.java index a4fffafb32..8db28533f8 100644 --- a/src/java/org/apache/cassandra/db/memtable/Memtable.java +++ b/src/java/org/apache/cassandra/db/memtable/Memtable.java @@ -369,7 +369,7 @@ public interface Memtable extends Comparable<Memtable>, UnfilteredSource CommitLogPosition getCommitLogLowerBound(); /** The commit log position at the time that this memtable was switched out */ - CommitLogPosition getCommitLogUpperBound(); + LastCommitLogPosition getFinalCommitLogUpperBound(); /** True if the memtable can contain any data that was written before the given commit log position */ boolean mayContainDataBefore(CommitLogPosition position); diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogCQLTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogCQLTest.java index 531ca87bee..2fd38d8e8c 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogCQLTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogCQLTest.java @@ -18,12 +18,19 @@ package org.apache.cassandra.db.commitlog; +import java.io.ByteArrayOutputStream; +import 
java.io.PrintStream; import java.util.ArrayList; import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Assert; import org.junit.Test; import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.ColumnFamilyStore; public class CommitLogCQLTest extends CQLTester @@ -56,4 +63,66 @@ public class CommitLogCQLTest extends CQLTester active.retainAll(CommitLog.instance.segmentManager.getActiveSegments()); assert active.isEmpty(); } + + @Test + public void testSwitchMemtable() throws Throwable + { + createTable("CREATE TABLE %s (idx INT, data TEXT, PRIMARY KEY(idx));"); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + + AtomicBoolean shouldStop = new AtomicBoolean(false); + ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>(); + List<Thread> threads = new ArrayList<>(); + + final String stmt = String.format("INSERT INTO %s.%s (idx, data) VALUES(?, ?)", KEYSPACE, currentTable()); + for (int i = 0; i < 10; ++i) + { + threads.add(new Thread("" + i) + { + public void run() + { + try + { + while (!shouldStop.get()) + { + for (int i = 0; i < 50; i++) + { + QueryProcessor.executeInternal(stmt, i, Integer.toString(i)); + } + cfs.dumpMemtable(); + } + } + catch (Throwable t) + { + errors.add(t); + shouldStop.set(true); + } + } + }); + } + + for (Thread t : threads) + t.start(); + + Thread.sleep(15_000); + shouldStop.set(true); + + for (Thread t : threads) + t.join(); + + if (!errors.isEmpty()) + { + StringBuilder sb = new StringBuilder(); + for(Throwable error: errors) + { + sb.append("Got error during memtable switching:\n"); + sb.append(error.getMessage() + "\n"); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(os); + error.printStackTrace(ps); + sb.append(os.toString("UTF-8")); + } + Assert.fail(sb.toString()); + } + } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org