alievmirza commented on a change in pull request #426:
URL: https://github.com/apache/ignite-3/pull/426#discussion_r751056427



##########
File path: 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
##########
@@ -649,6 +669,91 @@ public void testFollowerCatchUpFromSnapshot2() throws 
Exception {
         doTestFollowerCatchUp(true, false);
     }
 
+    /** Tests if a starting a new group in shared pools mode doesn't increases 
timer threads count. */
+    @Test
+    public void testTimerThreadsCount() {
+        JraftServerImpl srv0 = startServer(0, x -> {}, opts -> 
opts.setTimerPoolSize(1));
+        JraftServerImpl srv1 = startServer(1, x -> {}, opts -> 
opts.setTimerPoolSize(1));
+        JraftServerImpl srv2 = startServer(2, x -> {}, opts -> 
opts.setTimerPoolSize(1));
+
+        waitForTopology(srv0.clusterService(), 3, 5_000);
+
+        ExecutorService svc = Executors.newFixedThreadPool(16);
+
+        final int groupsCnt = 10;
+
+        try {
+            List<Future<?>> futs = new ArrayList<>(groupsCnt);
+
+            for (int i = 0; i < groupsCnt; i++) {
+                int finalI = i;
+                futs.add(svc.submit(new Runnable() {
+                    @Override public void run() {
+                        String grp = "counter" + finalI;
+
+                        srv0.startRaftGroup(grp, listenerFactory.get(), 
INITIAL_CONF);
+                        srv1.startRaftGroup(grp, listenerFactory.get(), 
INITIAL_CONF);
+                        srv2.startRaftGroup(grp, listenerFactory.get(), 
INITIAL_CONF);
+                    }
+                }));
+            }
+
+            for (Future<?> fut : futs) {
+                try {
+                    fut.get();
+                } catch (Exception e) {
+                    fail(e.getMessage());
+                }
+            }
+        } finally {
+            ExecutorServiceHelper.shutdownAndAwaitTermination(svc);
+        }
+
+        for (int i = 0; i < groupsCnt; i++) {
+            String grp = "counter" + i;
+
+            assertTrue(waitForCondition(() -> hasLeader(grp), 30_000));
+        }
+
+        Set<Thread> threads = Thread.getAllStackTraces().keySet();
+
+        LOG.info("RAFT threads count {}", threads.stream().filter(t -> 
t.getName().contains("JRaft")).count());
+
+        List<Thread> timerThreads = threads.stream().filter(this::isTimer)
+                .sorted((o1, o2) -> 
o1.getName().compareTo(o2.getName())).collect(toList());
+
+        assertTrue(timerThreads.size() <= 15, "New timer threads: " + 
timerThreads.toString());

Review comment:
       This number (15) must be explained somehow.

##########
File path: 
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
##########
@@ -60,4 +61,11 @@
      * @return Local peer or null if the group is not started.
      */
     @Nullable Peer localPeer(String groupId);
+
+    /**
+     * Returns a set of started partition groups.

Review comment:
       This raft group isn't necessarily a partition's group; it might be the meta 
storage group. I would remove "partition".

##########
File path: 
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
##########
@@ -126,6 +128,10 @@ public JraftServerImpl(
     /** {@inheritDoc} */
     @Override
     public void start() {
+        assert opts.isSharedPools() : "RAFT server is supposed to run in 
shared pools mode";
+
+        // Pre-create all pools in shared mode.
+        // Pre-create all pools in shared mode.

Review comment:
       Redundant comment: this line duplicates the previous one.
   

##########
File path: 
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
##########
@@ -909,7 +909,7 @@ public LogId getLastLogId(final boolean isFlush) {
             this.readLock.unlock();
         }
         try {
-            c.await();
+            c.await(); // TODO FIXME asch this method blocks timer pool 
IGNITE-14832

Review comment:
       Please create a separate TODO for that task.

##########
File path: 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
##########
@@ -649,6 +669,91 @@ public void testFollowerCatchUpFromSnapshot2() throws 
Exception {
         doTestFollowerCatchUp(true, false);
     }
 
+    /** Tests if a starting a new group in shared pools mode doesn't increases 
timer threads count. */
+    @Test
+    public void testTimerThreadsCount() {
+        JraftServerImpl srv0 = startServer(0, x -> {}, opts -> 
opts.setTimerPoolSize(1));
+        JraftServerImpl srv1 = startServer(1, x -> {}, opts -> 
opts.setTimerPoolSize(1));
+        JraftServerImpl srv2 = startServer(2, x -> {}, opts -> 
opts.setTimerPoolSize(1));
+
+        waitForTopology(srv0.clusterService(), 3, 5_000);
+
+        ExecutorService svc = Executors.newFixedThreadPool(16);
+
+        final int groupsCnt = 10;
+
+        try {
+            List<Future<?>> futs = new ArrayList<>(groupsCnt);
+
+            for (int i = 0; i < groupsCnt; i++) {
+                int finalI = i;
+                futs.add(svc.submit(new Runnable() {
+                    @Override public void run() {
+                        String grp = "counter" + finalI;
+
+                        srv0.startRaftGroup(grp, listenerFactory.get(), 
INITIAL_CONF);
+                        srv1.startRaftGroup(grp, listenerFactory.get(), 
INITIAL_CONF);
+                        srv2.startRaftGroup(grp, listenerFactory.get(), 
INITIAL_CONF);
+                    }
+                }));
+            }
+
+            for (Future<?> fut : futs) {
+                try {
+                    fut.get();
+                } catch (Exception e) {
+                    fail(e.getMessage());
+                }
+            }
+        } finally {
+            ExecutorServiceHelper.shutdownAndAwaitTermination(svc);
+        }
+
+        for (int i = 0; i < groupsCnt; i++) {
+            String grp = "counter" + i;
+
+            assertTrue(waitForCondition(() -> hasLeader(grp), 30_000));
+        }
+
+        Set<Thread> threads = Thread.getAllStackTraces().keySet();
+
+        LOG.info("RAFT threads count {}", threads.stream().filter(t -> 
t.getName().contains("JRaft")).count());
+
+        List<Thread> timerThreads = threads.stream().filter(this::isTimer)
+                .sorted((o1, o2) -> 
o1.getName().compareTo(o2.getName())).collect(toList());
+
+        assertTrue(timerThreads.size() <= 15, "New timer threads: " + 
timerThreads.toString());
+    }
+
+    /**
+     * Returns {@code true} if thread is related to timers.
+     *
+     * @param name The name.

Review comment:
       Wrong `@param`: `isTimer` is applied to `Thread` objects, not to a name.

##########
File path: 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
##########
@@ -649,6 +669,91 @@ public void testFollowerCatchUpFromSnapshot2() throws 
Exception {
         doTestFollowerCatchUp(true, false);
     }
 
+    /** Tests if a starting a new group in shared pools mode doesn't increases 
timer threads count. */
+    @Test
+    public void testTimerThreadsCount() {
+        JraftServerImpl srv0 = startServer(0, x -> {}, opts -> 
opts.setTimerPoolSize(1));
+        JraftServerImpl srv1 = startServer(1, x -> {}, opts -> 
opts.setTimerPoolSize(1));
+        JraftServerImpl srv2 = startServer(2, x -> {}, opts -> 
opts.setTimerPoolSize(1));
+
+        waitForTopology(srv0.clusterService(), 3, 5_000);
+
+        ExecutorService svc = Executors.newFixedThreadPool(16);
+
+        final int groupsCnt = 10;
+
+        try {
+            List<Future<?>> futs = new ArrayList<>(groupsCnt);
+
+            for (int i = 0; i < groupsCnt; i++) {
+                int finalI = i;
+                futs.add(svc.submit(new Runnable() {
+                    @Override public void run() {
+                        String grp = "counter" + finalI;
+
+                        srv0.startRaftGroup(grp, listenerFactory.get(), 
INITIAL_CONF);
+                        srv1.startRaftGroup(grp, listenerFactory.get(), 
INITIAL_CONF);
+                        srv2.startRaftGroup(grp, listenerFactory.get(), 
INITIAL_CONF);
+                    }
+                }));
+            }
+
+            for (Future<?> fut : futs) {
+                try {
+                    fut.get();
+                } catch (Exception e) {
+                    fail(e.getMessage());
+                }
+            }
+        } finally {
+            ExecutorServiceHelper.shutdownAndAwaitTermination(svc);
+        }
+
+        for (int i = 0; i < groupsCnt; i++) {
+            String grp = "counter" + i;
+
+            assertTrue(waitForCondition(() -> hasLeader(grp), 30_000));
+        }
+
+        Set<Thread> threads = Thread.getAllStackTraces().keySet();
+
+        LOG.info("RAFT threads count {}", threads.stream().filter(t -> 
t.getName().contains("JRaft")).count());
+
+        List<Thread> timerThreads = threads.stream().filter(this::isTimer)
+                .sorted((o1, o2) -> 
o1.getName().compareTo(o2.getName())).collect(toList());

Review comment:
       This comparator could be replaced by `Comparator.comparing(Thread::getName)`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to