adelapena commented on code in PR #1723:
URL: https://github.com/apache/cassandra/pull/1723#discussion_r969584431


##########
test/unit/org/apache/cassandra/db/ClusteringPrefixTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+import org.apache.cassandra.utils.memory.MemtablePool;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+import org.apache.cassandra.utils.memory.NativePool;
+import org.apache.cassandra.utils.memory.SlabAllocator;
+import org.apache.cassandra.utils.memory.SlabPool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ClusteringPrefixTest
+{
+    @Test
+    public void arrayTopAndBottom()
+    {
+        Assert.assertTrue(ArrayClusteringBound.BOTTOM.isBottom());
+        Assert.assertFalse(ArrayClusteringBound.BOTTOM.isTop());
+        Assert.assertTrue(ArrayClusteringBound.TOP.isTop());
+        Assert.assertFalse(ArrayClusteringBound.TOP.isBottom());
+    }
+
+    @Test
+    public void bufferTopAndBottom()
+    {
+        Assert.assertTrue(BufferClusteringBound.BOTTOM.isBottom());
+        Assert.assertFalse(BufferClusteringBound.BOTTOM.isTop());
+        Assert.assertTrue(BufferClusteringBound.TOP.isTop());
+        Assert.assertFalse(BufferClusteringBound.TOP.isBottom());
+    }
+
+    @Test
+    public void testRetainableArray()
+    {
+        testRetainable(ByteArrayAccessor.instance.factory(), x -> new byte[][] 
{x.getBytes(StandardCharsets.UTF_8)});
+    }
+
+    @Test
+    public void testRetainableOnHeap()
+    {
+        testRetainable(ByteBufferAccessor.instance.factory(), x -> new 
ByteBuffer[] {ByteBufferUtil.bytes(x)});
+    }
+
+    @Test
+    public void testRetainableOnHeapSliced()
+    {
+        for (int prepend = 0; prepend < 3; ++prepend)

Review Comment:
   I'd use braces for the outer loop as well, since its body (the nested loop) spans multiple lines.



##########
test/unit/org/apache/cassandra/db/tries/MemtableTrieTestBase.java:
##########
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.tries;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multiset;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.ObjectSizes;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;

Review Comment:
   Unused import: the static import of `org.junit.Assert.fail` can be removed.



##########
test/unit/org/apache/cassandra/db/ClusteringPrefixTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+import org.apache.cassandra.utils.memory.MemtablePool;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+import org.apache.cassandra.utils.memory.NativePool;
+import org.apache.cassandra.utils.memory.SlabAllocator;
+import org.apache.cassandra.utils.memory.SlabPool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ClusteringPrefixTest
+{
+    @Test
+    public void arrayTopAndBottom()
+    {
+        Assert.assertTrue(ArrayClusteringBound.BOTTOM.isBottom());
+        Assert.assertFalse(ArrayClusteringBound.BOTTOM.isTop());
+        Assert.assertTrue(ArrayClusteringBound.TOP.isTop());
+        Assert.assertFalse(ArrayClusteringBound.TOP.isBottom());
+    }
+
+    @Test
+    public void bufferTopAndBottom()
+    {
+        Assert.assertTrue(BufferClusteringBound.BOTTOM.isBottom());
+        Assert.assertFalse(BufferClusteringBound.BOTTOM.isTop());
+        Assert.assertTrue(BufferClusteringBound.TOP.isTop());
+        Assert.assertFalse(BufferClusteringBound.TOP.isBottom());
+    }
+
+    @Test
+    public void testRetainableArray()
+    {
+        testRetainable(ByteArrayAccessor.instance.factory(), x -> new byte[][] 
{x.getBytes(StandardCharsets.UTF_8)});
+    }
+
+    @Test
+    public void testRetainableOnHeap()
+    {
+        testRetainable(ByteBufferAccessor.instance.factory(), x -> new 
ByteBuffer[] {ByteBufferUtil.bytes(x)});
+    }
+
+    @Test
+    public void testRetainableOnHeapSliced()
+    {
+        for (int prepend = 0; prepend < 3; ++prepend)
+            for (int append = 0; append < 3; ++append)
+            {
+                testRetainable(ByteBufferAccessor.instance.factory(),
+                               slicingAllocator(prepend, append));
+            }
+    }
+
+    private Function<String, ByteBuffer[]> slicingAllocator(int prepend, int 
append)
+    {
+        return x ->
+        {
+            ByteBuffer bytes = ByteBufferUtil.bytes(x);
+            ByteBuffer sliced = ByteBuffer.allocate(bytes.remaining() + 
prepend + append);
+            for (int i = 0; i < prepend; ++i)
+                sliced.put((byte) ThreadLocalRandom.current().nextInt());
+            sliced.put(bytes);
+            bytes.flip();
+            for (int i = 0; i < append; ++i)
+                sliced.put((byte) ThreadLocalRandom.current().nextInt());
+            sliced.position(prepend).limit(prepend + bytes.remaining());
+            return new ByteBuffer[]{ sliced.slice() };
+        };
+    }
+
+    @Test
+    public void testRetainableOffHeap()
+    {
+        testRetainable(ByteBufferAccessor.instance.factory(), x ->
+        {
+            ByteBuffer h = ByteBufferUtil.bytes(x);
+            ByteBuffer v = ByteBuffer.allocateDirect(h.remaining());
+            v.put(h);
+            v.flip();
+            return new ByteBuffer[] {v};
+        });
+    }
+
+    @Test
+    public void testRetainableOnHeapSlab() throws InterruptedException, 
TimeoutException
+    {
+        testRetainableSlab(true);
+    }
+
+    @Test
+    public void testRetainableOffHeapSlab() throws InterruptedException, 
TimeoutException
+    {
+        testRetainableSlab(false);
+    }
+
+    public void testRetainableSlab(boolean onHeap) throws 
InterruptedException, TimeoutException
+    {
+        MemtablePool pool = new SlabPool(1L << 24, onHeap ? 0 : 1L << 24, 
1.0f, () -> ImmediateFuture.success(false));
+        SlabAllocator allocator = ((SlabAllocator) pool.newAllocator("test"));
+        assert !allocator.allocate(1).isDirect() == onHeap;
+        try
+        {
+            testRetainable(ByteBufferAccessor.instance.factory(), x ->
+            {
+                ByteBuffer h = ByteBufferUtil.bytes(x);
+                ByteBuffer v = allocator.allocate(h.remaining());
+                v.put(h);
+                v.flip();
+                return new ByteBuffer[] {v};
+            });
+        }
+        finally
+        {
+            pool.shutdownAndWait(10, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void testRetainableNative() throws InterruptedException, 
TimeoutException
+    {
+        MemtablePool pool = new NativePool(1L << 24,1L << 24, 1.0f, () -> 
ImmediateFuture.success(false));
+        NativeAllocator allocator = (NativeAllocator) 
pool.newAllocator("test");
+        try
+        {
+            testRetainable(ByteBufferAccessor.instance.factory(),
+                           x -> new ByteBuffer[] {ByteBufferUtil.bytes(x)},
+                           x -> x.kind() == ClusteringPrefix.Kind.CLUSTERING
+                                ? new NativeClustering(allocator, null, 
(Clustering) x)
+                                : x);
+        }
+        finally
+        {
+            pool.shutdownAndWait(10, TimeUnit.SECONDS);
+        }
+    }
+
+    public <V> void testRetainable(ValueAccessor.ObjectFactory<V> factory,
+                                   Function<String, V[]> allocator)
+    {
+        testRetainable(factory, allocator, null);
+    }
+
+    public <V> void testRetainable(ValueAccessor.ObjectFactory<V> factory,
+                                   Function<String, V[]> allocator,
+                                   Function<ClusteringPrefix<V>, 
ClusteringPrefix<V>> mapper)
+    {
+        ClusteringPrefix<V>[] clusterings = new ClusteringPrefix[]
+        {
+            factory.clustering(),
+            factory.staticClustering(),
+            factory.clustering(allocator.apply("test")),
+            factory.bound(ClusteringPrefix.Kind.INCL_START_BOUND, 
allocator.apply("testA")),
+            factory.bound(ClusteringPrefix.Kind.INCL_END_BOUND, 
allocator.apply("testB")),
+            factory.bound(ClusteringPrefix.Kind.EXCL_START_BOUND, 
allocator.apply("testC")),
+            factory.bound(ClusteringPrefix.Kind.EXCL_END_BOUND, 
allocator.apply("testD")),
+            
factory.boundary(ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY, 
allocator.apply("testE")),
+            
factory.boundary(ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY, 
allocator.apply("testF")),
+        };
+
+        if (mapper != null)
+            clusterings = Arrays.stream(clusterings)
+                                .map(mapper)
+                                .toArray(ClusteringPrefix[]::new);
+
+        testRetainable(clusterings);
+    }
+
+    public void testRetainable(ClusteringPrefix[] clusterings)
+    {
+        for (ClusteringPrefix clustering : clusterings)
+        {
+            ClusteringPrefix retainable = clustering.retainable();
+            assertEquals(clustering, retainable);
+            assertClusteringIsRetainable(retainable);
+        }
+    }
+
+
+    public static void assertClusteringIsRetainable(ClusteringPrefix 
clustering)
+    {
+        if (clustering instanceof AbstractArrayClusteringPrefix)
+            return; // has to be on-heap and minimized
+
+        assertTrue(clustering instanceof AbstractBufferClusteringPrefix);
+        AbstractBufferClusteringPrefix abcf = (AbstractBufferClusteringPrefix) 
clustering;
+        ByteBuffer[] buffers = abcf.getBufferArray();
+        for (ByteBuffer b : buffers)
+        {
+            assertFalse(b.isDirect());
+            assertTrue(b.hasArray());
+            assertTrue(b.capacity() == b.remaining());

Review Comment:
   ```suggestion
               assertEquals(b.capacity(), b.remaining());
   ```



##########
src/java/org/apache/cassandra/utils/memory/HeapPool.java:
##########
@@ -59,7 +59,7 @@ public ByteBuffer allocate(int size, OpOrder.Group opGroup)
 
         public EnsureOnHeap ensureOnHeap()
         {
-            return ENSURE_NOOP;
+            return EnsureOnHeap.NOOP;

Review Comment:
   Nice simplification. We could also use it on 
`Logged.Allocator#ensureOnHeap`, and get rid of `HeapPool.ENSURE_NOOP`.



##########
test/unit/org/apache/cassandra/db/memtable/MemtableSizeHeapBuffersTest.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.memtable;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.memory.MemtablePool;
+import org.apache.cassandra.utils.memory.SlabPool;
+
+@RunWith(Parameterized.class)
+public class MemtableSizeHeapBuffersTest extends MemtableSizeTestBase
+{
+    // Overrides CQLTester.setUpClass to run before it
+    @BeforeClass
+    public static void setUpClass()
+    {
+        setup(Config.MemtableAllocationType.heap_buffers);
+    }
+
+    @Override
+    void checkMemtablePool()
+    {
+        MemtablePool memoryPool = AbstractAllocatorMemtable.MEMORY_POOL;
+        logger.info("Memtable pool {} off-heap limit {}", memoryPool, 
memoryPool.offHeap.limit);
+        Assert.assertTrue(memoryPool instanceof SlabPool);
+        Assert.assertTrue(memoryPool.offHeap.limit == 0);

Review Comment:
   ```suggestion
           Assert.assertEquals(0, memoryPool.offHeap.limit);
   ```



##########
test/unit/org/apache/cassandra/db/memtable/TrieMemtableConfigTest.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.memtable;
+
+import java.io.IOException;
+import javax.management.Attribute;
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.InvalidAttributeValueException;
+import javax.management.MBeanException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static 
org.apache.cassandra.db.memtable.TrieMemtable.TRIE_MEMTABLE_CONFIG_OBJECT_NAME;
+import static org.junit.Assert.assertEquals;
+
+public class TrieMemtableConfigTest extends CQLTester
+{
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        startJMXServer();
+        createMBeanServerConnection();
+    }
+
+    @Test
+    public void testShardCountSetByJMX() throws MalformedObjectNameException, 
ReflectionException, AttributeNotFoundException, InstanceNotFoundException, 
MBeanException, IOException, InvalidAttributeValueException

Review Comment:
   I think we can just use `throws Exception` here, as `setup()` already does, instead of listing every checked exception.



##########
test/unit/org/apache/cassandra/db/tries/CollectionMergeTrieTest.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.tries;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+import static org.apache.cassandra.db.tries.MemtableTrieTestBase.*;
+import static org.apache.cassandra.db.tries.MergeTrieTest.removeDuplicates;
+
+public class CollectionMergeTrieTest
+{
+    private static final int COUNT = 15000;
+    Random rand = new Random();

Review Comment:
   `rand` can be `private final`, or `private static final`.



##########
test/unit/org/apache/cassandra/db/ClusteringPrefixTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+import org.apache.cassandra.utils.memory.MemtablePool;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+import org.apache.cassandra.utils.memory.NativePool;
+import org.apache.cassandra.utils.memory.SlabAllocator;
+import org.apache.cassandra.utils.memory.SlabPool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ClusteringPrefixTest
+{
+    @Test
+    public void arrayTopAndBottom()
+    {
+        Assert.assertTrue(ArrayClusteringBound.BOTTOM.isBottom());
+        Assert.assertFalse(ArrayClusteringBound.BOTTOM.isTop());
+        Assert.assertTrue(ArrayClusteringBound.TOP.isTop());
+        Assert.assertFalse(ArrayClusteringBound.TOP.isBottom());
+    }
+
+    @Test
+    public void bufferTopAndBottom()
+    {
+        Assert.assertTrue(BufferClusteringBound.BOTTOM.isBottom());
+        Assert.assertFalse(BufferClusteringBound.BOTTOM.isTop());
+        Assert.assertTrue(BufferClusteringBound.TOP.isTop());
+        Assert.assertFalse(BufferClusteringBound.TOP.isBottom());
+    }
+
+    @Test
+    public void testRetainableArray()
+    {
+        testRetainable(ByteArrayAccessor.instance.factory(), x -> new byte[][] 
{x.getBytes(StandardCharsets.UTF_8)});
+    }
+
+    @Test
+    public void testRetainableOnHeap()
+    {
+        testRetainable(ByteBufferAccessor.instance.factory(), x -> new 
ByteBuffer[] {ByteBufferUtil.bytes(x)});
+    }
+
+    @Test
+    public void testRetainableOnHeapSliced()
+    {
+        for (int prepend = 0; prepend < 3; ++prepend)
+            for (int append = 0; append < 3; ++append)
+            {
+                testRetainable(ByteBufferAccessor.instance.factory(),
+                               slicingAllocator(prepend, append));
+            }
+    }
+
+    private Function<String, ByteBuffer[]> slicingAllocator(int prepend, int 
append)
+    {
+        return x ->
+        {
+            ByteBuffer bytes = ByteBufferUtil.bytes(x);
+            ByteBuffer sliced = ByteBuffer.allocate(bytes.remaining() + 
prepend + append);
+            for (int i = 0; i < prepend; ++i)
+                sliced.put((byte) ThreadLocalRandom.current().nextInt());
+            sliced.put(bytes);
+            bytes.flip();
+            for (int i = 0; i < append; ++i)
+                sliced.put((byte) ThreadLocalRandom.current().nextInt());
+            sliced.position(prepend).limit(prepend + bytes.remaining());
+            return new ByteBuffer[]{ sliced.slice() };
+        };
+    }
+
+    @Test
+    public void testRetainableOffHeap()
+    {
+        testRetainable(ByteBufferAccessor.instance.factory(), x ->
+        {
+            ByteBuffer h = ByteBufferUtil.bytes(x);
+            ByteBuffer v = ByteBuffer.allocateDirect(h.remaining());
+            v.put(h);
+            v.flip();
+            return new ByteBuffer[] {v};
+        });
+    }
+
+    @Test
+    public void testRetainableOnHeapSlab() throws InterruptedException, 
TimeoutException
+    {
+        testRetainableSlab(true);
+    }
+
+    @Test
+    public void testRetainableOffHeapSlab() throws InterruptedException, 
TimeoutException
+    {
+        testRetainableSlab(false);
+    }
+
+    public void testRetainableSlab(boolean onHeap) throws 
InterruptedException, TimeoutException
+    {
+        MemtablePool pool = new SlabPool(1L << 24, onHeap ? 0 : 1L << 24, 
1.0f, () -> ImmediateFuture.success(false));
+        SlabAllocator allocator = ((SlabAllocator) pool.newAllocator("test"));
+        assert !allocator.allocate(1).isDirect() == onHeap;
+        try
+        {
+            testRetainable(ByteBufferAccessor.instance.factory(), x ->
+            {
+                ByteBuffer h = ByteBufferUtil.bytes(x);
+                ByteBuffer v = allocator.allocate(h.remaining());
+                v.put(h);
+                v.flip();
+                return new ByteBuffer[] {v};
+            });
+        }
+        finally
+        {
+            pool.shutdownAndWait(10, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void testRetainableNative() throws InterruptedException, 
TimeoutException
+    {
+        MemtablePool pool = new NativePool(1L << 24,1L << 24, 1.0f, () -> 
ImmediateFuture.success(false));
+        NativeAllocator allocator = (NativeAllocator) 
pool.newAllocator("test");
+        try
+        {
+            testRetainable(ByteBufferAccessor.instance.factory(),
+                           x -> new ByteBuffer[] {ByteBufferUtil.bytes(x)},
+                           x -> x.kind() == ClusteringPrefix.Kind.CLUSTERING
+                                ? new NativeClustering(allocator, null, 
(Clustering) x)

Review Comment:
   ```suggestion
                                   ? new NativeClustering(allocator, null, 
(Clustering<?>) x)
   ```



##########
test/unit/org/apache/cassandra/db/ClusteringPrefixTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+import org.apache.cassandra.utils.memory.MemtablePool;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+import org.apache.cassandra.utils.memory.NativePool;
+import org.apache.cassandra.utils.memory.SlabAllocator;
+import org.apache.cassandra.utils.memory.SlabPool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ClusteringPrefixTest
+{
+    @Test
+    public void arrayTopAndBottom()
+    {
+        Assert.assertTrue(ArrayClusteringBound.BOTTOM.isBottom());
+        Assert.assertFalse(ArrayClusteringBound.BOTTOM.isTop());
+        Assert.assertTrue(ArrayClusteringBound.TOP.isTop());
+        Assert.assertFalse(ArrayClusteringBound.TOP.isBottom());
+    }
+
+    @Test
+    public void bufferTopAndBottom()
+    {
+        Assert.assertTrue(BufferClusteringBound.BOTTOM.isBottom());
+        Assert.assertFalse(BufferClusteringBound.BOTTOM.isTop());
+        Assert.assertTrue(BufferClusteringBound.TOP.isTop());
+        Assert.assertFalse(BufferClusteringBound.TOP.isBottom());
+    }
+
+    @Test
+    public void testRetainableArray()
+    {
+        testRetainable(ByteArrayAccessor.instance.factory(), x -> new byte[][] 
{x.getBytes(StandardCharsets.UTF_8)});
+    }
+
+    @Test
+    public void testRetainableOnHeap()
+    {
+        testRetainable(ByteBufferAccessor.instance.factory(), x -> new 
ByteBuffer[] {ByteBufferUtil.bytes(x)});
+    }
+
+    @Test
+    public void testRetainableOnHeapSliced()
+    {
+        for (int prepend = 0; prepend < 3; ++prepend)
+            for (int append = 0; append < 3; ++append)
+            {
+                testRetainable(ByteBufferAccessor.instance.factory(),
+                               slicingAllocator(prepend, append));
+            }
+    }
+
+    private Function<String, ByteBuffer[]> slicingAllocator(int prepend, int 
append)
+    {
+        return x ->
+        {
+            ByteBuffer bytes = ByteBufferUtil.bytes(x);
+            ByteBuffer sliced = ByteBuffer.allocate(bytes.remaining() + 
prepend + append);
+            for (int i = 0; i < prepend; ++i)
+                sliced.put((byte) ThreadLocalRandom.current().nextInt());
+            sliced.put(bytes);
+            bytes.flip();
+            for (int i = 0; i < append; ++i)
+                sliced.put((byte) ThreadLocalRandom.current().nextInt());
+            sliced.position(prepend).limit(prepend + bytes.remaining());
+            return new ByteBuffer[]{ sliced.slice() };
+        };
+    }
+
+    @Test
+    public void testRetainableOffHeap()
+    {
+        testRetainable(ByteBufferAccessor.instance.factory(), x ->
+        {
+            ByteBuffer h = ByteBufferUtil.bytes(x);
+            ByteBuffer v = ByteBuffer.allocateDirect(h.remaining());
+            v.put(h);
+            v.flip();
+            return new ByteBuffer[] {v};
+        });
+    }
+
+    @Test
+    public void testRetainableOnHeapSlab() throws InterruptedException, 
TimeoutException
+    {
+        testRetainableSlab(true);
+    }
+
+    @Test
+    public void testRetainableOffHeapSlab() throws InterruptedException, 
TimeoutException
+    {
+        testRetainableSlab(false);
+    }
+
+    public void testRetainableSlab(boolean onHeap) throws 
InterruptedException, TimeoutException
+    {
+        MemtablePool pool = new SlabPool(1L << 24, onHeap ? 0 : 1L << 24, 
1.0f, () -> ImmediateFuture.success(false));
+        SlabAllocator allocator = ((SlabAllocator) pool.newAllocator("test"));
+        assert !allocator.allocate(1).isDirect() == onHeap;
+        try
+        {
+            testRetainable(ByteBufferAccessor.instance.factory(), x ->
+            {
+                ByteBuffer h = ByteBufferUtil.bytes(x);
+                ByteBuffer v = allocator.allocate(h.remaining());
+                v.put(h);
+                v.flip();
+                return new ByteBuffer[] {v};
+            });
+        }
+        finally
+        {
+            pool.shutdownAndWait(10, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void testRetainableNative() throws InterruptedException, 
TimeoutException
+    {
+        MemtablePool pool = new NativePool(1L << 24,1L << 24, 1.0f, () -> 
ImmediateFuture.success(false));
+        NativeAllocator allocator = (NativeAllocator) 
pool.newAllocator("test");
+        try
+        {
+            testRetainable(ByteBufferAccessor.instance.factory(),
+                           x -> new ByteBuffer[] {ByteBufferUtil.bytes(x)},
+                           x -> x.kind() == ClusteringPrefix.Kind.CLUSTERING
+                                ? new NativeClustering(allocator, null, 
(Clustering) x)
+                                : x);
+        }
+        finally
+        {
+            pool.shutdownAndWait(10, TimeUnit.SECONDS);
+        }
+    }
+
+    public <V> void testRetainable(ValueAccessor.ObjectFactory<V> factory,
+                                   Function<String, V[]> allocator)
+    {
+        testRetainable(factory, allocator, null);
+    }
+
+    public <V> void testRetainable(ValueAccessor.ObjectFactory<V> factory,
+                                   Function<String, V[]> allocator,
+                                   Function<ClusteringPrefix<V>, 
ClusteringPrefix<V>> mapper)
+    {
+        ClusteringPrefix<V>[] clusterings = new ClusteringPrefix[]
+        {
+            factory.clustering(),
+            factory.staticClustering(),
+            factory.clustering(allocator.apply("test")),
+            factory.bound(ClusteringPrefix.Kind.INCL_START_BOUND, 
allocator.apply("testA")),
+            factory.bound(ClusteringPrefix.Kind.INCL_END_BOUND, 
allocator.apply("testB")),
+            factory.bound(ClusteringPrefix.Kind.EXCL_START_BOUND, 
allocator.apply("testC")),
+            factory.bound(ClusteringPrefix.Kind.EXCL_END_BOUND, 
allocator.apply("testD")),
+            
factory.boundary(ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY, 
allocator.apply("testE")),
+            
factory.boundary(ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY, 
allocator.apply("testF")),
+        };
+
+        if (mapper != null)
+            clusterings = Arrays.stream(clusterings)
+                                .map(mapper)
+                                .toArray(ClusteringPrefix[]::new);
+
+        testRetainable(clusterings);
+    }
+
+    public void testRetainable(ClusteringPrefix[] clusterings)
+    {
+        for (ClusteringPrefix clustering : clusterings)
+        {
+            ClusteringPrefix retainable = clustering.retainable();

Review Comment:
   ```suggestion
       public void testRetainable(ClusteringPrefix<?>[] clusterings)
       {
           for (ClusteringPrefix<?> clustering : clusterings)
           {
               ClusteringPrefix<?> retainable = clustering.retainable();
   ```



##########
test/unit/org/apache/cassandra/db/tries/MergeTrieTest.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.tries;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.junit.Test;
+
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+import static org.apache.cassandra.db.tries.MemtableTrieTestBase.*;
+
+public class MergeTrieTest
+{
+    private static final int COUNT = 15000;
+    Random rand = new Random();
+
+    @Test
+    public void testDirect()
+    {
+        ByteComparable[] src1 = generateKeys(rand, COUNT);
+        ByteComparable[] src2 = generateKeys(rand, COUNT);
+        SortedMap<ByteComparable, ByteBuffer> content1 = new 
TreeMap<>((bytes1, bytes2) -> ByteComparable.compare(bytes1, bytes2, VERSION));
+        SortedMap<ByteComparable, ByteBuffer> content2 = new 
TreeMap<>((bytes1, bytes2) -> ByteComparable.compare(bytes1, bytes2, VERSION));
+
+        MemtableTrie<ByteBuffer> trie1 = makeMemtableTrie(src1, content1, 
true);
+        MemtableTrie<ByteBuffer> trie2 = makeMemtableTrie(src2, content2, 
true);
+
+        content1.putAll(content2);
+        Trie<ByteBuffer> union = trie1.mergeWith(trie2, (x, y) -> x);
+
+        assertSameContent(union, content1);
+    }
+
+    @Test
+    public void testWithDuplicates()
+    {
+        ByteComparable[] src1 = generateKeys(rand, COUNT);
+        ByteComparable[] src2 = generateKeys(rand, COUNT);
+        SortedMap<ByteComparable, ByteBuffer> content1 = new 
TreeMap<>((bytes1, bytes2) -> ByteComparable.compare(bytes1, bytes2, VERSION));
+        SortedMap<ByteComparable, ByteBuffer> content2 = new 
TreeMap<>((bytes1, bytes2) -> ByteComparable.compare(bytes1, bytes2, VERSION));
+
+        MemtableTrie trie1 = makeMemtableTrie(src1, content1, true);
+        MemtableTrie trie2 = makeMemtableTrie(src2, content2, true);

Review Comment:
   ```suggestion
           MemtableTrie<ByteBuffer> trie1 = makeMemtableTrie(src1, content1, true);
           MemtableTrie<ByteBuffer> trie2 = makeMemtableTrie(src2, content2, true);
   ```



##########
test/unit/org/apache/cassandra/db/memtable/MemtableSizeOffheapObjectsTest.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.memtable;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.memory.MemtablePool;
+import org.apache.cassandra.utils.memory.NativePool;
+
+@RunWith(Parameterized.class)
+public class MemtableSizeOffheapObjectsTest extends MemtableSizeTestBase
+{
+    // Overrides CQLTester.setUpClass to run before it
+    @BeforeClass
+    public static void setUpClass()
+    {
+        setup(Config.MemtableAllocationType.offheap_objects);
+    }
+
+    @Override
+    void checkMemtablePool()
+    {
+        MemtablePool memoryPool = AbstractAllocatorMemtable.MEMORY_POOL;
+        System.out.println("Memtable pool " + memoryPool + " off-heap limit " 
+ memoryPool.offHeap.limit);

Review Comment:
   This should probably use a logger instead of `System.out.println`, as is 
done in `MemtableSizeOffheapBuffersTest`.



##########
test/unit/org/apache/cassandra/db/ClusteringPrefixTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+import org.apache.cassandra.utils.memory.MemtablePool;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+import org.apache.cassandra.utils.memory.NativePool;
+import org.apache.cassandra.utils.memory.SlabAllocator;
+import org.apache.cassandra.utils.memory.SlabPool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ClusteringPrefixTest
+{
+    @Test
+    public void arrayTopAndBottom()
+    {
+        Assert.assertTrue(ArrayClusteringBound.BOTTOM.isBottom());
+        Assert.assertFalse(ArrayClusteringBound.BOTTOM.isTop());
+        Assert.assertTrue(ArrayClusteringBound.TOP.isTop());
+        Assert.assertFalse(ArrayClusteringBound.TOP.isBottom());
+    }
+
+    @Test
+    public void bufferTopAndBottom()
+    {
+        Assert.assertTrue(BufferClusteringBound.BOTTOM.isBottom());
+        Assert.assertFalse(BufferClusteringBound.BOTTOM.isTop());
+        Assert.assertTrue(BufferClusteringBound.TOP.isTop());
+        Assert.assertFalse(BufferClusteringBound.TOP.isBottom());
+    }
+
+    @Test
+    public void testRetainableArray()
+    {
+        testRetainable(ByteArrayAccessor.instance.factory(), x -> new byte[][] 
{x.getBytes(StandardCharsets.UTF_8)});
+    }
+
+    @Test
+    public void testRetainableOnHeap()
+    {
+        testRetainable(ByteBufferAccessor.instance.factory(), x -> new 
ByteBuffer[] {ByteBufferUtil.bytes(x)});
+    }
+
+    @Test
+    public void testRetainableOnHeapSliced()
+    {
+        for (int prepend = 0; prepend < 3; ++prepend)
+            for (int append = 0; append < 3; ++append)
+            {
+                testRetainable(ByteBufferAccessor.instance.factory(),
+                               slicingAllocator(prepend, append));
+            }
+    }
+
+    private Function<String, ByteBuffer[]> slicingAllocator(int prepend, int 
append)
+    {
+        return x ->
+        {
+            ByteBuffer bytes = ByteBufferUtil.bytes(x);
+            ByteBuffer sliced = ByteBuffer.allocate(bytes.remaining() + 
prepend + append);
+            for (int i = 0; i < prepend; ++i)
+                sliced.put((byte) ThreadLocalRandom.current().nextInt());
+            sliced.put(bytes);
+            bytes.flip();
+            for (int i = 0; i < append; ++i)
+                sliced.put((byte) ThreadLocalRandom.current().nextInt());
+            sliced.position(prepend).limit(prepend + bytes.remaining());
+            return new ByteBuffer[]{ sliced.slice() };
+        };
+    }
+
+    @Test
+    public void testRetainableOffHeap()
+    {
+        testRetainable(ByteBufferAccessor.instance.factory(), x ->
+        {
+            ByteBuffer h = ByteBufferUtil.bytes(x);
+            ByteBuffer v = ByteBuffer.allocateDirect(h.remaining());
+            v.put(h);
+            v.flip();
+            return new ByteBuffer[] {v};
+        });
+    }
+
+    @Test
+    public void testRetainableOnHeapSlab() throws InterruptedException, 
TimeoutException
+    {
+        testRetainableSlab(true);
+    }
+
+    @Test
+    public void testRetainableOffHeapSlab() throws InterruptedException, 
TimeoutException
+    {
+        testRetainableSlab(false);
+    }
+
+    public void testRetainableSlab(boolean onHeap) throws 
InterruptedException, TimeoutException
+    {
+        MemtablePool pool = new SlabPool(1L << 24, onHeap ? 0 : 1L << 24, 
1.0f, () -> ImmediateFuture.success(false));
+        SlabAllocator allocator = ((SlabAllocator) pool.newAllocator("test"));
+        assert !allocator.allocate(1).isDirect() == onHeap;
+        try
+        {
+            testRetainable(ByteBufferAccessor.instance.factory(), x ->
+            {
+                ByteBuffer h = ByteBufferUtil.bytes(x);
+                ByteBuffer v = allocator.allocate(h.remaining());
+                v.put(h);
+                v.flip();
+                return new ByteBuffer[] {v};
+            });
+        }
+        finally
+        {
+            pool.shutdownAndWait(10, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void testRetainableNative() throws InterruptedException, 
TimeoutException
+    {
+        MemtablePool pool = new NativePool(1L << 24,1L << 24, 1.0f, () -> 
ImmediateFuture.success(false));
+        NativeAllocator allocator = (NativeAllocator) 
pool.newAllocator("test");
+        try
+        {
+            testRetainable(ByteBufferAccessor.instance.factory(),
+                           x -> new ByteBuffer[] {ByteBufferUtil.bytes(x)},
+                           x -> x.kind() == ClusteringPrefix.Kind.CLUSTERING
+                                ? new NativeClustering(allocator, null, 
(Clustering) x)
+                                : x);
+        }
+        finally
+        {
+            pool.shutdownAndWait(10, TimeUnit.SECONDS);
+        }
+    }
+
+    public <V> void testRetainable(ValueAccessor.ObjectFactory<V> factory,
+                                   Function<String, V[]> allocator)
+    {
+        testRetainable(factory, allocator, null);
+    }
+
+    public <V> void testRetainable(ValueAccessor.ObjectFactory<V> factory,
+                                   Function<String, V[]> allocator,
+                                   Function<ClusteringPrefix<V>, 
ClusteringPrefix<V>> mapper)
+    {
+        ClusteringPrefix<V>[] clusterings = new ClusteringPrefix[]
+        {
+            factory.clustering(),
+            factory.staticClustering(),
+            factory.clustering(allocator.apply("test")),
+            factory.bound(ClusteringPrefix.Kind.INCL_START_BOUND, 
allocator.apply("testA")),
+            factory.bound(ClusteringPrefix.Kind.INCL_END_BOUND, 
allocator.apply("testB")),
+            factory.bound(ClusteringPrefix.Kind.EXCL_START_BOUND, 
allocator.apply("testC")),
+            factory.bound(ClusteringPrefix.Kind.EXCL_END_BOUND, 
allocator.apply("testD")),
+            
factory.boundary(ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY, 
allocator.apply("testE")),
+            
factory.boundary(ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY, 
allocator.apply("testF")),
+        };
+
+        if (mapper != null)
+            clusterings = Arrays.stream(clusterings)
+                                .map(mapper)
+                                .toArray(ClusteringPrefix[]::new);
+
+        testRetainable(clusterings);
+    }
+
+    public void testRetainable(ClusteringPrefix[] clusterings)
+    {
+        for (ClusteringPrefix clustering : clusterings)
+        {
+            ClusteringPrefix retainable = clustering.retainable();
+            assertEquals(clustering, retainable);
+            assertClusteringIsRetainable(retainable);
+        }
+    }
+
+
+    public static void assertClusteringIsRetainable(ClusteringPrefix 
clustering)

Review Comment:
   ```suggestion
       public static void assertClusteringIsRetainable(ClusteringPrefix<?> clustering)
   ```



##########
test/unit/org/apache/cassandra/db/tries/MergeTrieTest.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.tries;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.junit.Test;
+
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+import static org.apache.cassandra.db.tries.MemtableTrieTestBase.*;
+
+public class MergeTrieTest
+{
+    private static final int COUNT = 15000;
+    Random rand = new Random();
+
+    @Test
+    public void testDirect()
+    {
+        ByteComparable[] src1 = generateKeys(rand, COUNT);
+        ByteComparable[] src2 = generateKeys(rand, COUNT);
+        SortedMap<ByteComparable, ByteBuffer> content1 = new 
TreeMap<>((bytes1, bytes2) -> ByteComparable.compare(bytes1, bytes2, VERSION));
+        SortedMap<ByteComparable, ByteBuffer> content2 = new 
TreeMap<>((bytes1, bytes2) -> ByteComparable.compare(bytes1, bytes2, VERSION));
+
+        MemtableTrie<ByteBuffer> trie1 = makeMemtableTrie(src1, content1, 
true);
+        MemtableTrie<ByteBuffer> trie2 = makeMemtableTrie(src2, content2, 
true);
+
+        content1.putAll(content2);
+        Trie<ByteBuffer> union = trie1.mergeWith(trie2, (x, y) -> x);
+
+        assertSameContent(union, content1);
+    }
+
+    @Test
+    public void testWithDuplicates()
+    {
+        ByteComparable[] src1 = generateKeys(rand, COUNT);
+        ByteComparable[] src2 = generateKeys(rand, COUNT);
+        SortedMap<ByteComparable, ByteBuffer> content1 = new 
TreeMap<>((bytes1, bytes2) -> ByteComparable.compare(bytes1, bytes2, VERSION));
+        SortedMap<ByteComparable, ByteBuffer> content2 = new 
TreeMap<>((bytes1, bytes2) -> ByteComparable.compare(bytes1, bytes2, VERSION));
+
+        MemtableTrie trie1 = makeMemtableTrie(src1, content1, true);
+        MemtableTrie trie2 = makeMemtableTrie(src2, content2, true);
+
+        addToMemtableTrie(generateKeys(new Random(5), COUNT), content1, trie1, 
true);
+        addToMemtableTrie(generateKeys(new Random(5), COUNT), content2, trie2, 
true);
+
+        content1.putAll(content2);
+        Trie union = trie1.mergeWith(trie2, (x, y) -> y);

Review Comment:
   ```suggestion
           Trie<ByteBuffer> union = trie1.mergeWith(trie2, (x, y) -> y);
   ```



##########
test/unit/org/apache/cassandra/db/memtable/MemtableSizeHeapBuffersTest.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.memtable;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.memory.MemtablePool;
+import org.apache.cassandra.utils.memory.SlabPool;
+
+@RunWith(Parameterized.class)

Review Comment:
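   I think that the `@RunWith(Parameterized.class)` annotation can be put on 
`MemtableSizeTestBase`, so the four implementors don't need to include it 
(`@RunWith` is an `@Inherited` annotation, so subclasses pick it up from the 
base class).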



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to