Delay hints store excise by write timeout to avoid race with decommission

patch by Jaydeepkumar Chovatia; reviewed by Aleksey Yeschenko for
CASSANDRA-13740


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b2f6ce96
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b2f6ce96
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b2f6ce96

Branch: refs/heads/trunk
Commit: b2f6ce961f38a3e4cd744e102026bf7a471056c9
Parents: bc1f841
Author: Jaydeepkumar Chovatia <chovatia.jayd...@gmail.com>
Authored: Thu Aug 3 15:34:26 2017 -0700
Committer: Aleksey Yeshchenko <alek...@apple.com>
Committed: Mon Apr 30 17:32:05 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/hints/HintsCatalog.java    |   7 ++
 .../apache/cassandra/hints/HintsService.java    |   2 +-
 .../org/apache/cassandra/hints/HintsStore.java  |   7 ++
 .../cassandra/service/StorageService.java       |   7 +-
 .../cassandra/hints/HintsCatalogTest.java       | 114 ++++++++++++++++++-
 6 files changed, 135 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f6ce96/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cf470d6..857cf96 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.17
+ * Delay hints store excise by write timeout to avoid race with decommission (CASSANDRA-13740)
  * Deprecate background repair and probablistic read_repair_chance table options
    (CASSANDRA-13910)
  * Add missed CQL keywords to documentation (CASSANDRA-14359)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f6ce96/src/java/org/apache/cassandra/hints/HintsCatalog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java b/src/java/org/apache/cassandra/hints/HintsCatalog.java
index 6d01629..d1f6fba 100644
--- a/src/java/org/apache/cassandra/hints/HintsCatalog.java
+++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java
@@ -23,6 +23,7 @@ import java.nio.file.Files;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Stream;
+import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableMap;
 
@@ -94,6 +95,12 @@ final class HintsCatalog
              : store;
     }
 
+    @Nullable
+    HintsStore getNullable(UUID hostId)
+    {
+        return stores.get(hostId);
+    }
+
     /**
      * Delete all hints for all host ids.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f6ce96/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 9cd4ed3..268ee1f 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -302,7 +302,7 @@ public final class HintsService implements HintsServiceMBean
      */
     public void excise(UUID hostId)
     {
-        HintsStore store = catalog.get(hostId);
+        HintsStore store = catalog.getNullable(hostId);
         if (store == null)
             return;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f6ce96/src/java/org/apache/cassandra/hints/HintsStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java
index c066331..bb3aa0f 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,6 +78,12 @@ final class HintsStore
         return new HintsStore(hostId, hintsDirectory, writerParams, descriptors);
     }
 
+    @VisibleForTesting
+    int getDispatchQueueSize()
+    {
+        return dispatchDequeue.size();
+    }
+
     InetAddress address()
     {
         return StorageService.instance.getEndpointForHostId(hostId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f6ce96/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 77fcb81..5f76f7d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2276,7 +2276,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         UUID hostId = tokenMetadata.getHostId(endpoint);
         if (hostId != null && tokenMetadata.isMember(endpoint))
-            HintsService.instance.excise(hostId);
+        {
+            // enough time for writes to expire and MessagingService timeout reporter callback to fire, which is where
+            // hints are mostly written from - the reporter uses getMinRpcTimeout() / 2 for its interval.
+            long delay = DatabaseDescriptor.getMinRpcTimeout() + DatabaseDescriptor.getWriteRpcTimeout();
+            ScheduledExecutors.optionalTasks.schedule(() -> HintsService.instance.excise(hostId), delay, TimeUnit.MILLISECONDS);
+        }
 
         removeEndpoint(endpoint);
         tokenMetadata.removeEndpoint(endpoint);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f6ce96/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
index 51b6aa3..928fd31 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
@@ -19,16 +19,42 @@ package org.apache.cassandra.hints;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.*;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static junit.framework.Assert.*;
+import static org.apache.cassandra.Util.dk;
 
 public class HintsCatalogTest
 {
+    private static final String KEYSPACE = "hint_test";
+    private static final String TABLE0 = "table_0";
+    private static final String TABLE1 = "table_1";
+    private static final String TABLE2 = "table_2";
+    private static final int WRITE_BUFFER_SIZE = 256 << 10;
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                KeyspaceParams.simple(1),
+                SchemaLoader.standardCFMD(KEYSPACE, TABLE0),
+                SchemaLoader.standardCFMD(KEYSPACE, TABLE1),
+                SchemaLoader.standardCFMD(KEYSPACE, TABLE2));
+    }
+
     @Test
     public void loadCompletenessAndOrderTest() throws IOException
     {
@@ -43,7 +69,21 @@ public class HintsCatalogTest
         }
     }
 
-    public static void loadCompletenessAndOrderTest(File directory) throws IOException
+    @Test
+    public void exciseHintFiles() throws IOException
+    {
+        File directory = Files.createTempDirectory(null).toFile();
+        try
+        {
+            exciseHintFiles(directory);
+        }
+        finally
+        {
+            directory.deleteOnExit();
+        }
+    }
+
+    private void loadCompletenessAndOrderTest(File directory) throws IOException
     {
         UUID hostId1 = UUID.randomUUID();
         UUID hostId2 = UUID.randomUUID();
@@ -79,6 +119,39 @@ public class HintsCatalogTest
         assertNull(store2.poll());
     }
 
+    private static void exciseHintFiles(File directory) throws IOException
+    {
+        UUID hostId = UUID.randomUUID();
+
+        HintsDescriptor descriptor1 = new HintsDescriptor(hostId, System.currentTimeMillis());
+        HintsDescriptor descriptor2 = new HintsDescriptor(hostId, System.currentTimeMillis() + 1);
+        HintsDescriptor descriptor3 = new HintsDescriptor(hostId, System.currentTimeMillis() + 2);
+        HintsDescriptor descriptor4 = new HintsDescriptor(hostId, System.currentTimeMillis() + 3);
+
+        createHintFile(directory, descriptor1);
+        createHintFile(directory, descriptor2);
+        createHintFile(directory, descriptor3);
+        createHintFile(directory, descriptor4);
+
+        HintsCatalog catalog = HintsCatalog.load(directory, ImmutableMap.of());
+        assertEquals(1, catalog.stores().count());
+
+        HintsStore store = catalog.get(hostId);
+
+        //should have 4 hint files
+        assertEquals(4, store.getDispatchQueueSize());
+
+        //excise store as a result it should remove all the hint files
+        catalog.exciseStore(hostId);
+
+        catalog = HintsCatalog.load(directory, ImmutableMap.of());
+        assertEquals(0, catalog.stores().count());
+        store = catalog.get(hostId);
+
+        //should have 0 hint files now
+        assertEquals(0, store.getDispatchQueueSize());
+    }
+
     @SuppressWarnings("EmptyTryBlock")
     private static void writeDescriptor(File directory, HintsDescriptor descriptor) throws IOException
     {
@@ -86,4 +159,43 @@ public class HintsCatalogTest
         {
         }
     }
+
+    private static Mutation createMutation(String key, long now)
+    {
+        Mutation mutation = new Mutation(KEYSPACE, dk(key));
+
+        new RowUpdateBuilder(Schema.instance.getCFMetaData(KEYSPACE, TABLE0), now, mutation)
+                .clustering("column0")
+                .add("val", "value0")
+                .build();
+
+        new RowUpdateBuilder(Schema.instance.getCFMetaData(KEYSPACE, TABLE1), now + 1, mutation)
+                .clustering("column1")
+                .add("val", "value1")
+                .build();
+
+        new RowUpdateBuilder(Schema.instance.getCFMetaData(KEYSPACE, TABLE2), now + 2, mutation)
+                .clustering("column2")
+                .add("val", "value2")
+                .build();
+
+        return mutation;
+    }
+
+    @SuppressWarnings("EmptyTryBlock")
+    private static void createHintFile(File directory, HintsDescriptor descriptor) throws IOException
+    {
+        try (HintsWriter writer = HintsWriter.create(directory, descriptor))
+        {
+            ByteBuffer writeBuffer = ByteBuffer.allocateDirect(WRITE_BUFFER_SIZE);
+            try (HintsWriter.Session session = writer.newSession(writeBuffer))
+            {
+                long now = FBUtilities.timestampMicros();
+                Mutation mutation = createMutation("testSerializer", now);
+                Hint hint = Hint.create(mutation, now / 1000);
+
+                session.append(hint);
+            }
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to