This is an automated email from the ASF dual-hosted git repository.

blambov pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
     new 60b54425ed Fix commitLogUpperBound initialization in AbstractMemtableWithCommitlog so that it is always available when writeBarrier gets initialized.
60b54425ed is described below

commit 60b54425edc0a328bc1baf00c2e5bf111d4b9da8
Author: Jakub Żytka <jakub.zy...@datastax.com>
AuthorDate: Tue May 11 10:01:12 2021 +0200

    Fix commitLogUpperBound initialization in AbstractMemtableWithCommitlog
    so that it is always available when writeBarrier gets initialized.
    
    Harden Memtable API so that it is apparent that getting commitLogUpperBound
    is valid only after it is fully established.
    
    patch by Jakub Żytka and Dan Jatnieks;
    reviewed by Dan Jatnieks, Jeremiah D Jordan and Caleb Rackliffe for CASSANDRA-17587
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  2 +-
 .../cassandra/db/memtable/AbstractMemtable.java    |  5 +-
 .../db/memtable/AbstractMemtableWithCommitlog.java |  8 ++-
 .../org/apache/cassandra/db/memtable/Flushing.java |  2 +-
 .../org/apache/cassandra/db/memtable/Memtable.java |  2 +-
 .../cassandra/db/commitlog/CommitLogCQLTest.java   | 69 ++++++++++++++++++++++
 7 files changed, 80 insertions(+), 9 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 96bebe774d..a8def999c5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1-alpha2
+ * Fix commitLogUpperBound initialization in AbstractMemtableWithCommitlog 
(CASSANDRA-17587)
  * Fix widening to long in getBatchSizeFailThreshold (CASSANDRA-17650)
  * Fix widening from mebibytes to bytes in IntMebibytesBound (CASSANDRA-17716)
  * Revert breaking change in nodetool clientstats and expose cient options 
through nodetool clientstats --client-options. (CASSANDRA-17715)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index bf43f327e3..a40e5c7ad1 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1103,7 +1103,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean, Memtable.Owner
             // If a flush errored out but the error was ignored, make sure we 
don't discard the commit log.
             if (flushFailure == null && mainMemtable != null)
             {
-                commitLogUpperBound = mainMemtable.getCommitLogUpperBound();
+                commitLogUpperBound = 
mainMemtable.getFinalCommitLogUpperBound();
                 CommitLog.instance.discardCompletedSegments(metadata.id, 
mainMemtable.getCommitLogLowerBound(), commitLogUpperBound);
             }
 
diff --git a/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java 
b/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java
index 1d683db910..0ac7482e4a 100644
--- a/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java
+++ b/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.schema.ColumnMetadata;
@@ -204,9 +203,9 @@ public abstract class AbstractMemtable implements Memtable
             return AbstractMemtable.this.getCommitLogLowerBound();
         }
 
-        public CommitLogPosition commitLogUpperBound()
+        public LastCommitLogPosition commitLogUpperBound()
         {
-            return AbstractMemtable.this.getCommitLogUpperBound();
+            return AbstractMemtable.this.getFinalCommitLogUpperBound();
         }
 
         public EncodingStats encodingStats()
diff --git 
a/src/java/org/apache/cassandra/db/memtable/AbstractMemtableWithCommitlog.java 
b/src/java/org/apache/cassandra/db/memtable/AbstractMemtableWithCommitlog.java
index d60fe866ab..4fe39a10ca 100644
--- 
a/src/java/org/apache/cassandra/db/memtable/AbstractMemtableWithCommitlog.java
+++ 
b/src/java/org/apache/cassandra/db/memtable/AbstractMemtableWithCommitlog.java
@@ -57,8 +57,8 @@ public abstract class AbstractMemtableWithCommitlog extends 
AbstractMemtable
         // This can prepare the memtable data for deletion; it will still be 
used while the flush is proceeding.
         // A setDiscarded call will follow.
         assert this.writeBarrier == null;
-        this.writeBarrier = writeBarrier;
         this.commitLogUpperBound = commitLogUpperBound;
+        this.writeBarrier = writeBarrier;
     }
 
     public void discard()
@@ -113,9 +113,11 @@ public abstract class AbstractMemtableWithCommitlog 
extends AbstractMemtable
         return commitLogLowerBound.get();
     }
 
-    public CommitLogPosition getCommitLogUpperBound()
+    public LastCommitLogPosition getFinalCommitLogUpperBound()
     {
-        return commitLogUpperBound.get();
+        assert commitLogUpperBound != null : "Commit log upper bound should be 
set before flushing";
+        assert commitLogUpperBound.get() instanceof LastCommitLogPosition : 
"Commit log upper bound has not been sealed yet? " + commitLogUpperBound.get();
+        return (LastCommitLogPosition) commitLogUpperBound.get();
     }
 
     public boolean mayContainDataBefore(CommitLogPosition position)
diff --git a/src/java/org/apache/cassandra/db/memtable/Flushing.java 
b/src/java/org/apache/cassandra/db/memtable/Flushing.java
index 6717e64bed..1a31374652 100644
--- a/src/java/org/apache/cassandra/db/memtable/Flushing.java
+++ b/src/java/org/apache/cassandra/db/memtable/Flushing.java
@@ -171,7 +171,7 @@ public class Flushing
                 logger.info("Completed flushing {} ({}) for commitlog position 
{}",
                             writer.getFilename(),
                             FBUtilities.prettyPrintMemory(bytesFlushed),
-                            toFlush.memtable().getCommitLogUpperBound());
+                            toFlush.memtable().getFinalCommitLogUpperBound());
                 // Update the metrics
                 metrics.bytesFlushed.inc(bytesFlushed);
             }
diff --git a/src/java/org/apache/cassandra/db/memtable/Memtable.java 
b/src/java/org/apache/cassandra/db/memtable/Memtable.java
index a4fffafb32..8db28533f8 100644
--- a/src/java/org/apache/cassandra/db/memtable/Memtable.java
+++ b/src/java/org/apache/cassandra/db/memtable/Memtable.java
@@ -369,7 +369,7 @@ public interface Memtable extends Comparable<Memtable>, 
UnfilteredSource
     CommitLogPosition getCommitLogLowerBound();
 
     /** The commit log position at the time that this memtable was switched 
out */
-    CommitLogPosition getCommitLogUpperBound();
+    LastCommitLogPosition getFinalCommitLogUpperBound();
 
     /** True if the memtable can contain any data that was written before the 
given commit log position */
     boolean mayContainDataBefore(CommitLogPosition position);
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogCQLTest.java 
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogCQLTest.java
index 531ca87bee..2fd38d8e8c 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogCQLTest.java
@@ -18,12 +18,19 @@
 
 package org.apache.cassandra.db.commitlog;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 
 public class CommitLogCQLTest extends CQLTester
@@ -56,4 +63,66 @@ public class CommitLogCQLTest extends CQLTester
         
active.retainAll(CommitLog.instance.segmentManager.getActiveSegments());
         assert active.isEmpty();
     }
+    
+    @Test
+    public void testSwitchMemtable() throws Throwable
+    {
+        createTable("CREATE TABLE %s (idx INT, data TEXT, PRIMARY KEY(idx));");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        
+        AtomicBoolean shouldStop = new AtomicBoolean(false);
+        ConcurrentLinkedQueue<Throwable> errors = new 
ConcurrentLinkedQueue<>();
+        List<Thread> threads = new ArrayList<>();
+        
+        final String stmt = String.format("INSERT INTO %s.%s (idx, data) 
VALUES(?, ?)", KEYSPACE, currentTable());
+        for (int i = 0; i < 10; ++i)
+        {
+            threads.add(new Thread("" + i)
+            {
+                public void run()
+                {
+                    try
+                    {
+                        while (!shouldStop.get())
+                        {
+                            for (int i = 0; i < 50; i++)
+                            {
+                                QueryProcessor.executeInternal(stmt, i, 
Integer.toString(i));
+                            }
+                            cfs.dumpMemtable();
+                        }
+                    }
+                    catch (Throwable t)
+                    {
+                        errors.add(t);
+                        shouldStop.set(true);
+                    }
+                }
+            });
+        }
+
+        for (Thread t : threads)
+            t.start();
+
+        Thread.sleep(15_000);
+        shouldStop.set(true);
+        
+        for (Thread t : threads)
+            t.join();
+
+        if (!errors.isEmpty())
+        {
+            StringBuilder sb = new StringBuilder();
+            for(Throwable error: errors)
+            {
+                sb.append("Got error during memtable switching:\n");
+                sb.append(error.getMessage() + "\n");
+                ByteArrayOutputStream os = new ByteArrayOutputStream();
+                PrintStream ps = new PrintStream(os);
+                error.printStackTrace(ps);
+                sb.append(os.toString("UTF-8"));
+            }
+            Assert.fail(sb.toString());
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to