rpuch commented on code in PR #6237:
URL: https://github.com/apache/ignite-3/pull/6237#discussion_r2200795554


##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java:
##########
@@ -301,6 +303,14 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
      */
     private boolean isSystemGroup = false;
 
+    /**
+     * Shared pool of {@link ByteBufferCollector} for sending log entries for 
replication.
+     *
+     * <p>Used to prevent a large number of {@link ByteBufferCollector} from 
being accumulated across all threads that are involved in
+     * sending log entries, see {@link 
ByteBufferCollector#allocateByRecyclers}.</p>
+     */
+    private ByteBufferCollectorPool appendEntriesByteBufferCollectorPool;

Review Comment:
   It seems that the same implementation is assigned to this option in a few 
places in our code. Would it make sense to initialize the field with it right 
here? That way, it would be harder to end up with different pools in different 
tests, for example.



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedFifoByteBufferCollectorPool.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util.concurrent;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollectorPool;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Thread safe implementation of a pool based on a fixed-size FIFO linked list.
+ *
+ * <p>{@link ByteBufferCollector} are given out on a first-in, first-out 
basis.</p>
+ */
+public class ConcurrentLinkedFifoByteBufferCollectorPool implements 
ByteBufferCollectorPool {
+    private final int capacity;
+
+    private final AtomicReference<Node> stack = new AtomicReference<>();
+
+    /**
+     * Constructor.
+     *
+     * @param capacity Maximum capacity of the pool, expected to be greater 
than zero.
+     */
+    public ConcurrentLinkedFifoByteBufferCollectorPool(int capacity) {
+        assert capacity > 0 : capacity;
+
+        this.capacity = capacity;
+    }
+
+    /** Removes and returns the last added {@link ByteBufferCollector}, {@code 
null} if the pool is empty. */
+    @Override

Review Comment:
   Do we really need javadocs on overrides?



##########
modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedLifoByteBufferCollectorPoolTest.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util.concurrent;
+
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+/** For {@link ConcurrentLinkedLifoByteBufferCollectorPool} testing. */
+public class ConcurrentLinkedLifoByteBufferCollectorPoolTest {
+    private static final int CAPACITY = 10;
+
+    private final ConcurrentLinkedLifoByteBufferCollectorPool collectorPool = 
new ConcurrentLinkedLifoByteBufferCollectorPool(CAPACITY);
+
+    @Test
+    void borrowFromEmptyPool() {
+        assertNull(collectorPool.borrow());
+    }
+
+    @Test
+    void borrowSingleCollector() {
+        ByteBufferCollector collector = allocateCollector();
+
+        collectorPool.release(collector);
+
+        assertSame(collector, collectorPool.borrow());
+        assertNull(collectorPool.borrow());
+    }
+
+    @Test
+    void borrowSameCollector() {
+        ByteBufferCollector collector = allocateCollector();
+
+        collectorPool.release(collector);
+        collectorPool.release(collector);
+
+        assertSame(collector, collectorPool.borrow());
+        assertSame(collector, collectorPool.borrow());
+        assertNull(collectorPool.borrow());
+    }
+
+    @Test
+    void borrowSeveral() {
+        List<ByteBufferCollector> collectors = allocateListCollector(5);
+
+        collectors.forEach(collectorPool::release);
+
+        assertEquals(reverse(collectors), borrowAll());
+    }
+
+    @RepeatedTest(10)
+    void borrowConcurrent() {
+        ByteBufferCollector collector0 = allocateCollector();
+        ByteBufferCollector collector1 = allocateCollector();
+
+        collectorPool.release(collector0);
+        collectorPool.release(collector1);
+
+        Set<ByteBufferCollector> borrows = ConcurrentHashMap.newKeySet();
+
+        IgniteTestUtils.runRace(
+                () -> borrows.add(collectorPool.borrow()),
+                () -> borrows.add(collectorPool.borrow())
+        );
+
+        assertNull(collectorPool.borrow());
+
+        assertThat(borrows, hasSize(2));
+        assertThat(borrows, hasItems(collector0, collector1));
+    }
+
+    @Test
+    void releaseMoreThanCapacity() {
+        List<ByteBufferCollector> collectors = allocateListCollector(CAPACITY 
+ 5);
+
+        collectors.forEach(collectorPool::release);
+
+        assertEquals(reverse(collectors.subList(0, CAPACITY)), borrowAll());
+    }
+
+    @RepeatedTest(10)
+    void releaseConcurrent() {
+        ByteBufferCollector collector0 = allocateCollector();
+        ByteBufferCollector collector1 = allocateCollector();
+
+        IgniteTestUtils.runRace(
+                () -> collectorPool.release(collector0),
+                () -> collectorPool.release(collector1)
+        );
+
+        List<ByteBufferCollector> borrowAll = borrowAll();
+        assertThat(borrowAll, hasSize(2));
+        assertThat(borrowAll, hasItems(collector0, collector1));
+    }
+
+    @RepeatedTest(10)
+    void borrowReleaseConcurrent() {
+        ByteBufferCollector collector0 = allocateCollector();
+        ByteBufferCollector collector1 = allocateCollector();
+        ByteBufferCollector collector2 = allocateCollector();
+        ByteBufferCollector collector3 = allocateCollector();
+        
+        collectorPool.release(collector0);
+        collectorPool.release(collector1);
+
+        Set<ByteBufferCollector> borrows = ConcurrentHashMap.newKeySet();
+        
+        IgniteTestUtils.runRace(
+                () -> collectorPool.release(collector2),
+                () -> collectorPool.release(collector3),
+                () -> borrows.add(collectorPool.borrow()),
+                () -> borrows.add(collectorPool.borrow())
+        );
+
+        List<ByteBufferCollector> remainingInCollectorPool = borrowAll();
+        
+        assertThat(remainingInCollectorPool, hasSize(2));
+        assertThat(borrows, hasSize(2));
+        
+        assertThat(remainingInCollectorPool, 
not(hasItems(borrows.toArray(ByteBufferCollector[]::new))));
+    }
+
+    private List<ByteBufferCollector> borrowAll() {
+        var res = new ArrayList<ByteBufferCollector>();
+
+        while (true) {
+            ByteBufferCollector collector = collectorPool.borrow();
+
+            if (collector == null) {
+                break;
+            } else {
+                res.add(collector);
+            }
+        }
+
+        return res;
+    }
+
+    private static ByteBufferCollector allocateCollector() {
+        return ByteBufferCollector.allocate(32);
+    }
+
+    private static List<ByteBufferCollector> allocateListCollector(int size) {

Review Comment:
   ```suggestion
       private static List<ByteBufferCollector> allocateCollectors(int size) {
   ```
   Or `allocateListOfCollectors()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to