kaisun2000 commented on a change in pull request #1452:
URL: https://github.com/apache/helix/pull/1452#discussion_r502005209



##########
File path: helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
##########
@@ -710,7 +726,9 @@ protected Message createMessage(Message.MessageType type, String msgId, String f
   }
 
   @AfterClass
-  public void cleanupLiveInstanceOwners() {
+  public void cleanupLiveInstanceOwners() throws InterruptedException {

Review comment:
       All the tests that extend ZkTestBase.

##########
File path: helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
##########
@@ -0,0 +1,181 @@
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.helix.common.ZkTestBase;
+
+
+public class ThreadLeakageChecker {
+  private static ThreadGroup getRootThreadGroup() {
+    ThreadGroup candidate = Thread.currentThread().getThreadGroup();
+    while (candidate.getParent() != null) {
+      candidate = candidate.getParent();
+    }
+    return candidate;
+  }
+
+  private static List<Thread> getAllThreads() {
+    ThreadGroup rootThreadGroup = getRootThreadGroup();
+    Thread[] threads = new Thread[32];
+    int count = rootThreadGroup.enumerate(threads);
+    while (count == threads.length) {
+      threads = new Thread[threads.length * 2];
+      count = rootThreadGroup.enumerate(threads);
+    }
+    return Arrays.asList(Arrays.copyOf(threads, count));
+  }
+
+  private static final String[] ZkServerThrdPattern =
+      {"SessionTracker", "NIOServerCxn", "SyncThread:", "ProcessThread"};
+  private static final String[] ZkSessionThrdPattern =
+      new String[]{"ZkClient-EventThread", "ZkClient-AsyncCallback", "-EventThread", "-SendThread"};
+  private static final String[] ForkJoinThrdPattern = new String[]{"ForkJoinPool"};
+  private static final String[] TimerThrdPattern = new String[]{"time"};
+  private static final String[] TaskStateModelThrdPattern = new String[]{"TaskStateModel"};
+
+  private static enum ThreadCategory {

Review comment:
       Good question. The two thresholds are mostly empirical.
   
   ZkServer: our version has only 4 threads now. In case a later version uses
more, I set the limit to 100. The reasoning is that ZkServer threads are not
considered leaks no matter how many there are.
   
   ZkSession covers ZkClient and the native ZooKeeper client. ZkTestBase has 12
of these threads at startup, so if there are more than that, the test code is
leaking ZkClients.
   
   ForkJoin threads are created by parallel streams and other Java features.
This is out of our control, similar to ZkServer. Let me change the _limit to
100 while keeping a small _warningLimit. (Will make the change for this one.)
   
   There should be no Timer threads. The limit is 2 rather than 0 mostly
because even after you cancel a timer thread, it may take a nondeterministic
amount of time for it to go away, so this gives it some slack.
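The two-threshold policy described above can be sketched as follows. This is a minimal, self-contained illustration, assuming the warning-limit and limit values from the `ThreadCategory` enum in this PR (ForkJoin still at 2/10, before the planned raise to 100). `ThresholdPolicy`, `Level`, `classify` and `passes` are hypothetical names, not Helix APIs.

```java
import java.util.Map;

// Hypothetical sketch of the warning-limit / limit policy; not Helix code.
class ThresholdPolicy {
  enum Level { OK, WARNING, FAILURE }

  // Values copied from the ThreadCategory enum in this PR.
  enum Category {
    ZK_SERVER(4, 100),   // server threads are never treated as leaks
    ZK_SESSION(12, 12),  // ZkTestBase itself starts 12 at startup
    FORK_JOIN(2, 10),
    TIMER(0, 2);         // slack for cancelled timers that have not exited yet

    final int warningLimit;
    final int limit;

    Category(int warningLimit, int limit) {
      this.warningLimit = warningLimit;
      this.limit = limit;
    }
  }

  // Counts above `limit` fail the check; counts above `warningLimit` only warn.
  static Level classify(Category category, int count) {
    if (count > category.limit) {
      return Level.FAILURE;
    }
    if (count > category.warningLimit) {
      return Level.WARNING;
    }
    return Level.OK;
  }

  // The overall check passes only if no category reaches FAILURE.
  static boolean passes(Map<Category, Integer> counts) {
    for (Map.Entry<Category, Integer> e : counts.entrySet()) {
      if (classify(e.getKey(), e.getValue()) == Level.FAILURE) {
        return false;
      }
    }
    return true;
  }
}
```

With these numbers, 50 ZkServer threads only produce a warning, while a 13th ZkSession thread fails the check, matching the intent that extra ZkClient threads indicate a leak in the test code.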
   
   
   
   

##########
File path: helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
##########
@@ -719,6 +737,17 @@ public void cleanupLiveInstanceOwners() {
       clientMap.clear();
     }
     _liveInstanceOwners.clear();
+
+    boolean status = false;
+    try {
+      status = ThreadLeakageChecker.afterClassCheck(testClassName);

Review comment:
       Only tests derived from ZkTestBase, which are the majority for now. We
can open another ticket to track using it for all tests.

##########
File path: helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
##########
@@ -0,0 +1,181 @@
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.helix.common.ZkTestBase;
+
+
+public class ThreadLeakageChecker {
+  private static ThreadGroup getRootThreadGroup() {
+    ThreadGroup candidate = Thread.currentThread().getThreadGroup();
+    while (candidate.getParent() != null) {
+      candidate = candidate.getParent();
+    }
+    return candidate;
+  }
+
+  private static List<Thread> getAllThreads() {
+    ThreadGroup rootThreadGroup = getRootThreadGroup();
+    Thread[] threads = new Thread[32];

Review comment:
       It does not matter. As line 30
(`threads = new Thread[threads.length * 2];`) shows, the threads array is
doubled whenever it is not large enough, so it grows exponentially.
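For reference, the grow-and-retry pattern discussed here as a standalone sketch. `ThreadGroup.enumerate` silently ignores threads that do not fit, so an exactly full array is the signal to double and retry. `ThreadSnapshot` and `allThreads` are hypothetical names; the capacity is clamped to at least 1 because doubling an empty array would loop forever.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone version of the enumeration loop in the diff.
class ThreadSnapshot {
  // Walk up from the current thread's group to the root group.
  static ThreadGroup rootGroup() {
    ThreadGroup group = Thread.currentThread().getThreadGroup();
    while (group.getParent() != null) {
      group = group.getParent();
    }
    return group;
  }

  // A full array means the result may be truncated: double it and retry.
  static List<Thread> allThreads(int initialCapacity) {
    ThreadGroup root = rootGroup();
    Thread[] threads = new Thread[Math.max(1, initialCapacity)];
    int count = root.enumerate(threads); // recurses into subgroups
    while (count == threads.length) {
      threads = new Thread[threads.length * 2];
      count = root.enumerate(threads);
    }
    return Arrays.asList(Arrays.copyOf(threads, count));
  }
}
```

`Thread.getAllStackTraces().keySet()` is another standard way to snapshot live threads, at the cost of capturing a stack trace for each one.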

##########
File path: helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
##########
@@ -0,0 +1,181 @@
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.helix.common.ZkTestBase;
+
+
+public class ThreadLeakageChecker {
+  private static ThreadGroup getRootThreadGroup() {
+    ThreadGroup candidate = Thread.currentThread().getThreadGroup();
+    while (candidate.getParent() != null) {
+      candidate = candidate.getParent();
+    }
+    return candidate;
+  }
+
+  private static List<Thread> getAllThreads() {
+    ThreadGroup rootThreadGroup = getRootThreadGroup();
+    Thread[] threads = new Thread[32];
+    int count = rootThreadGroup.enumerate(threads);
+    while (count == threads.length) {
+      threads = new Thread[threads.length * 2];
+      count = rootThreadGroup.enumerate(threads);
+    }
+    return Arrays.asList(Arrays.copyOf(threads, count));
+  }
+
+  private static final String[] ZkServerThrdPattern =
+      {"SessionTracker", "NIOServerCxn", "SyncThread:", "ProcessThread"};
+  private static final String[] ZkSessionThrdPattern =
+      new String[]{"ZkClient-EventThread", "ZkClient-AsyncCallback", "-EventThread", "-SendThread"};
+  private static final String[] ForkJoinThrdPattern = new String[]{"ForkJoinPool"};
+  private static final String[] TimerThrdPattern = new String[]{"time"};
+  private static final String[] TaskStateModelThrdPattern = new String[]{"TaskStateModel"};
+
+  private static enum ThreadCategory {
+    ZkServer("zookeeper server threads", 4, 100, ZkServerThrdPattern),
+    ZkSession("zkclient/zooKeeper session threads", 12, 12, ZkSessionThrdPattern),
+    ForkJoin("fork join pool threads", 2, 10, ForkJoinThrdPattern),
+    Timer("timer threads", 0, 2, TimerThrdPattern),
+    TaskStateModel("TaskStateModel threads", 0, 0, TaskStateModelThrdPattern),
+    Other("Other threads", 0, 3, new String[]{""});
+
+    private String _description;
+    private List<String> _pattern;
+    private int _warningLimit;
+    private int _limit;
+
+    public String getDescription() {
+      return _description;
+    }
+
+    public Predicate<String> getMatchPred() {
+      if (this.name() != ThreadCategory.Other.name()) {
+        Predicate<String> pred = target -> {
+          for (String p : _pattern) {
+            if (target.toLowerCase().contains(p.toLowerCase())) {
+              return true;
+            }
+          }
+          return false;
+        };
+        return pred;
+      }
+
+      List<Predicate<String>> predicateList = new ArrayList<>();
+      for (ThreadCategory threadCategory : ThreadCategory.values()) {
+        if (threadCategory == ThreadCategory.Other) {
+          continue;
+        }
+        predicateList.add(threadCategory.getMatchPred());
+      }
+      Predicate<String> pred = target -> {
+        for (Predicate<String> p : predicateList) {
+          if (p.test(target)) {
+            return false;
+          }
+        }
+        return true;
+      };
+
+      return pred;
+    }
+
+    public int getWarningLimit() {
+      return _warningLimit;
+    }
+
+    public int getLimit() {
+      return _limit;
+    }
+
+    private ThreadCategory(String description, int warningLimit, int limit, String[] patterns) {
+      _description = description;
+      _pattern = Arrays.asList(patterns);
+      _warningLimit = warningLimit;
+      _limit = limit;
+    }
+  }
+
+  public static boolean afterClassCheck(String classname) {
+    ZkTestBase.reportPhysicalMemory();
+    // step 1: get all active threads
+    List<Thread> threads = getAllThreads();
+    System.out.println(classname + " has active threads cnt:" + threads.size());
+
+    // step 2: categorize threads
+    Map<String, List<Thread>> threadByName = null;
+    Map<ThreadCategory, Integer> threadByCnt = new HashMap<>();
+    Map<ThreadCategory, Set<Thread>> threadByCat = new HashMap<>();
+    try {
+      threadByName = threads.
+          stream().
+          filter(p -> p.getThreadGroup() != null && p.getThreadGroup().getName() != null
+              &&  ! "system".equals(p.getThreadGroup().getName())).
+          collect(Collectors.groupingBy(p -> p.getName()));
+    } catch (Exception e) {
+      System.out.println("filtering thread failure with exception:" + e.getStackTrace());
+    }
+
+    threadByName.entrySet().stream().forEach(entry -> {
+      String key = entry.getKey(); // thread name
+      Arrays.asList(ThreadCategory.values()).stream().forEach(category -> {
+        if (category.getMatchPred().test(key)) {
+          Integer count = threadByCnt.containsKey(category) ? threadByCnt.get(category) : 0;
+          threadByCnt.put(category, count + entry.getValue().size());
+          Set<Thread> thisSet = threadByCat.getOrDefault(category, new HashSet<>());
+          thisSet.addAll(entry.getValue());
+          threadByCat.put(category, thisSet);
+        }
+      });
+    });
+
+    // step 3: enforce checking policy
+    boolean checkStatus = true;
+    for (ThreadCategory threadCategory : ThreadCategory.values()) {
+      int limit = threadCategory.getLimit();
+      int warningLimit = threadCategory.getWarningLimit();
+
+      Integer catThreadCnt = threadByCnt.get(threadCategory);
+      if (catThreadCnt != null) {
+        boolean dumpThread = false;
+        if (catThreadCnt > limit) {
+          checkStatus = false;
+          System.out.println(
+              "Failure " + threadCategory.getDescription() + " has " + catThreadCnt + " thread");
+          dumpThread = true;
+        } else if (catThreadCnt > warningLimit) {
+          System.out.println(
+              "Warning " + threadCategory.getDescription() + " has " + catThreadCnt + " thread");
+          dumpThread = true;
+        } else {
+          System.out.println(threadCategory.getDescription() + " has " + catThreadCnt + " thread");
+        }
+        if (!dumpThread) {
+          continue;
+        }
+        // print first 10 thread names
+        int i = 0;
+        for (Thread t : threadByCat.get(threadCategory)) {
+          System.out.println(i + " thread:" + t.getName());
+          i++;
+          if (i == 100) {

Review comment:
       changed the comments
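One way to keep a "print first N thread names" comment from drifting away from the loop bound is to give the bound a name. A sketch, with `ThreadNameDump` and `MAX_THREADS_TO_PRINT` as hypothetical names and 100 taken from the `i == 100` check in the diff:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical helper: the bound lives in one named constant.
class ThreadNameDump {
  static final int MAX_THREADS_TO_PRINT = 100;

  // Returns at most MAX_THREADS_TO_PRINT lines of the form "<index> thread:<name>".
  static List<String> firstNames(Collection<Thread> threads) {
    List<String> lines = new ArrayList<>();
    int i = 0;
    for (Thread t : threads) {
      if (i == MAX_THREADS_TO_PRINT) {
        break;
      }
      lines.add(i + " thread:" + t.getName());
      i++;
    }
    return lines;
  }
}
```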

##########
File path: helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
##########
@@ -0,0 +1,181 @@
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.helix.common.ZkTestBase;
+
+
+public class ThreadLeakageChecker {
+  private static ThreadGroup getRootThreadGroup() {
+    ThreadGroup candidate = Thread.currentThread().getThreadGroup();
+    while (candidate.getParent() != null) {
+      candidate = candidate.getParent();
+    }
+    return candidate;
+  }
+
+  private static List<Thread> getAllThreads() {
+    ThreadGroup rootThreadGroup = getRootThreadGroup();
+    Thread[] threads = new Thread[32];
+    int count = rootThreadGroup.enumerate(threads);
+    while (count == threads.length) {
+      threads = new Thread[threads.length * 2];
+      count = rootThreadGroup.enumerate(threads);
+    }
+    return Arrays.asList(Arrays.copyOf(threads, count));
+  }
+
+  private static final String[] ZkServerThrdPattern =
+      {"SessionTracker", "NIOServerCxn", "SyncThread:", "ProcessThread"};
+  private static final String[] ZkSessionThrdPattern =
+      new String[]{"ZkClient-EventThread", "ZkClient-AsyncCallback", "-EventThread", "-SendThread"};
+  private static final String[] ForkJoinThrdPattern = new String[]{"ForkJoinPool"};
+  private static final String[] TimerThrdPattern = new String[]{"time"};
+  private static final String[] TaskStateModelThrdPattern = new String[]{"TaskStateModel"};
+
+  private static enum ThreadCategory {
+    ZkServer("zookeeper server threads", 4, 100, ZkServerThrdPattern),
+    ZkSession("zkclient/zooKeeper session threads", 12, 12, ZkSessionThrdPattern),
+    ForkJoin("fork join pool threads", 2, 10, ForkJoinThrdPattern),
+    Timer("timer threads", 0, 2, TimerThrdPattern),
+    TaskStateModel("TaskStateModel threads", 0, 0, TaskStateModelThrdPattern),
+    Other("Other threads", 0, 3, new String[]{""});
+
+    private String _description;
+    private List<String> _pattern;
+    private int _warningLimit;
+    private int _limit;
+
+    public String getDescription() {
+      return _description;
+    }
+
+    public Predicate<String> getMatchPred() {
+      if (this.name() != ThreadCategory.Other.name()) {
+        Predicate<String> pred = target -> {
+          for (String p : _pattern) {
+            if (target.toLowerCase().contains(p.toLowerCase())) {
+              return true;
+            }
+          }
+          return false;
+        };
+        return pred;
+      }
+
+      List<Predicate<String>> predicateList = new ArrayList<>();
+      for (ThreadCategory threadCategory : ThreadCategory.values()) {
+        if (threadCategory == ThreadCategory.Other) {
+          continue;
+        }
+        predicateList.add(threadCategory.getMatchPred());
+      }
+      Predicate<String> pred = target -> {
+        for (Predicate<String> p : predicateList) {
+          if (p.test(target)) {
+            return false;
+          }
+        }
+        return true;
+      };
+
+      return pred;
+    }
+
+    public int getWarningLimit() {
+      return _warningLimit;
+    }
+
+    public int getLimit() {
+      return _limit;
+    }
+
+    private ThreadCategory(String description, int warningLimit, int limit, String[] patterns) {
+      _description = description;
+      _pattern = Arrays.asList(patterns);
+      _warningLimit = warningLimit;
+      _limit = limit;
+    }
+  }
+
+  public static boolean afterClassCheck(String classname) {
+    ZkTestBase.reportPhysicalMemory();
+    // step 1: get all active threads
+    List<Thread> threads = getAllThreads();
+    System.out.println(classname + " has active threads cnt:" + threads.size());
+
+    // step 2: categorize threads
+    Map<String, List<Thread>> threadByName = null;
+    Map<ThreadCategory, Integer> threadByCnt = new HashMap<>();
+    Map<ThreadCategory, Set<Thread>> threadByCat = new HashMap<>();
+    try {
+      threadByName = threads.
+          stream().
+          filter(p -> p.getThreadGroup() != null && p.getThreadGroup().getName() != null
+              &&  ! "system".equals(p.getThreadGroup().getName())).
+          collect(Collectors.groupingBy(p -> p.getName()));
+    } catch (Exception e) {
+      System.out.println("filtering thread failure with exception:" + e.getStackTrace());
+    }
+
+    threadByName.entrySet().stream().forEach(entry -> {
+      String key = entry.getKey(); // thread name
+      Arrays.asList(ThreadCategory.values()).stream().forEach(category -> {
+        if (category.getMatchPred().test(key)) {
+          Integer count = threadByCnt.containsKey(category) ? threadByCnt.get(category) : 0;
+          threadByCnt.put(category, count + entry.getValue().size());
+          Set<Thread> thisSet = threadByCat.getOrDefault(category, new HashSet<>());
+          thisSet.addAll(entry.getValue());
+          threadByCat.put(category, thisSet);
+        }
+      });
+    });
+
+    // step 3: enforce checking policy
+    boolean checkStatus = true;
+    for (ThreadCategory threadCategory : ThreadCategory.values()) {
+      int limit = threadCategory.getLimit();
+      int warningLimit = threadCategory.getWarningLimit();
+
+      Integer catThreadCnt = threadByCnt.get(threadCategory);

Review comment:
       changed
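A sketch of per-category counting that never hands back `null`: `Map.merge` accumulates the tally, and `getOrDefault` reads an absent category as 0, so callers need no null check before comparing against a limit. `CategoryCounter` is a hypothetical helper, and the test uses `Thread.State` only as a convenient stand-in enum.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical counting helper; the classifier must never return null.
class CategoryCounter {
  // Tallies how many items the classifier assigns to each enum key.
  static <K extends Enum<K>, T> Map<K, Integer> count(
      Class<K> keyType, List<T> items, Function<T, K> classifier) {
    Map<K, Integer> counts = new EnumMap<>(keyType);
    for (T item : items) {
      // merge() stores 1 for a new key, otherwise adds 1 to the old value.
      counts.merge(classifier.apply(item), 1, Integer::sum);
    }
    return counts;
  }

  // A category with no entries reads as 0 rather than null.
  static <K extends Enum<K>> int countOf(Map<K, Integer> counts, K key) {
    return counts.getOrDefault(key, 0);
  }
}
```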

##########
File path: helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
##########
@@ -127,8 +129,22 @@
   protected Map<String, ClusterSetup> _clusterSetupMap = new HashMap<>();
   protected Map<String, BaseDataAccessor> _baseDataAccessorMap = new HashMap<>();
 
+  static public void reportPhysicalMemory() {
+    com.sun.management.OperatingSystemMXBean os = (com.sun.management.OperatingSystemMXBean)
+        java.lang.management.ManagementFactory.getOperatingSystemMXBean();
+    long physicalMemorySize = os.getTotalPhysicalMemorySize();
+    System.out.println("************ SYSTEM Physical Memory:"  + physicalMemorySize);
+
+    int MB = 1024 * 1024;

Review comment:
       changed.
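A portable sketch of reporting heap usage in MB with a named `long` constant. This is a substitute, not the PR's method: the diff reads total physical memory through `com.sun.management.OperatingSystemMXBean`, which is JDK-specific, whereas this uses only `java.lang.Runtime`. `MemoryReport` and `heapSummary` are hypothetical names.

```java
// Hypothetical heap report using only java.lang.Runtime.
class MemoryReport {
  // long, so later multiplications by MB cannot overflow int.
  static final long MB = 1024L * 1024L;

  static String heapSummary() {
    Runtime rt = Runtime.getRuntime();
    long total = rt.totalMemory();
    long used = total - rt.freeMemory();
    return "used=" + used / MB + "MB total=" + total / MB
        + "MB max=" + rt.maxMemory() / MB + "MB";
  }
}
```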

##########
File path: helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
##########
@@ -0,0 +1,181 @@
+package org.apache.helix;
+

Review comment:
       added.

##########
File path: helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
##########
@@ -33,6 +33,7 @@
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
 
+import bsh.This;

Review comment:
       removed.

##########
File path: helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
##########
@@ -0,0 +1,181 @@
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.helix.common.ZkTestBase;
+
+
+public class ThreadLeakageChecker {
+  private static ThreadGroup getRootThreadGroup() {
+    ThreadGroup candidate = Thread.currentThread().getThreadGroup();
+    while (candidate.getParent() != null) {
+      candidate = candidate.getParent();
+    }
+    return candidate;
+  }
+
+  private static List<Thread> getAllThreads() {
+    ThreadGroup rootThreadGroup = getRootThreadGroup();
+    Thread[] threads = new Thread[32];
+    int count = rootThreadGroup.enumerate(threads);
+    while (count == threads.length) {
+      threads = new Thread[threads.length * 2];
+      count = rootThreadGroup.enumerate(threads);
+    }
+    return Arrays.asList(Arrays.copyOf(threads, count));
+  }
+
+  private static final String[] ZkServerThrdPattern =
+      {"SessionTracker", "NIOServerCxn", "SyncThread:", "ProcessThread"};
+  private static final String[] ZkSessionThrdPattern =
+      new String[]{"ZkClient-EventThread", "ZkClient-AsyncCallback", "-EventThread", "-SendThread"};
+  private static final String[] ForkJoinThrdPattern = new String[]{"ForkJoinPool"};
+  private static final String[] TimerThrdPattern = new String[]{"time"};
+  private static final String[] TaskStateModelThrdPattern = new String[]{"TaskStateModel"};
+
+  private static enum ThreadCategory {

Review comment:
       changed.

##########
File path: helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.helix.common.ZkTestBase;
+
+
+public class ThreadLeakageChecker {
+  private static ThreadGroup getRootThreadGroup() {
+    ThreadGroup candidate = Thread.currentThread().getThreadGroup();
+    while (candidate.getParent() != null) {
+      candidate = candidate.getParent();
+    }
+    return candidate;
+  }
+
+  private static List<Thread> getAllThreads() {
+    ThreadGroup rootThreadGroup = getRootThreadGroup();
+    Thread[] threads = new Thread[32];
+    int count = rootThreadGroup.enumerate(threads);
+    while (count == threads.length) {
+      threads = new Thread[threads.length * 2];
+      count = rootThreadGroup.enumerate(threads);
+    }
+    return Arrays.asList(Arrays.copyOf(threads, count));
+  }
+
+  private static final String[] ZKSERVER_THRD_PATTERN =
+      {"SessionTracker", "NIOServerCxn", "SyncThread:", "ProcessThread"};
+  private static final String[] ZKSESSION_THRD_PATTERN =
+      new String[]{"ZkClient-EventThread", "ZkClient-AsyncCallback", "-EventThread", "-SendThread"};
+  private static final String[] FORKJOIN_THRD_PATTERN = new String[]{"ForkJoinPool"};
+  private static final String[] TIMER_THRD_PATTERN = new String[]{"time"};
+  private static final String[] TASKSTATEMODEL_THRD_PATTERN = new String[]{"TaskStateModel"};

Review comment:
       Once developers start using them, they can more easily break out more
categories and make the changes accordingly. Eventually, the baton passes to
code contributors.
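A sketch of how the name-pattern categories could be structured so that adding a category is a one-line change. Patterns are copied from the diff; `ThreadCategorizer` is hypothetical. Two deliberate differences from the PR code: enum constants are compared directly (`c != Category.OTHER`) instead of comparing `name()` strings with `!=`, and each thread name lands in exactly one category (the first match), whereas the diff adds a thread to every category whose pattern it matches.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical categorizer: case-insensitive substring patterns per category;
// OTHER is whatever no other category claims.
class ThreadCategorizer {
  enum Category {
    ZK_SERVER("SessionTracker", "NIOServerCxn", "SyncThread:", "ProcessThread"),
    ZK_SESSION("ZkClient-EventThread", "ZkClient-AsyncCallback", "-EventThread", "-SendThread"),
    FORK_JOIN("ForkJoinPool"),
    TIMER("time"),
    TASK_STATE_MODEL("TaskStateModel"),
    OTHER; // no patterns of its own

    private final List<String> patterns;

    Category(String... patterns) {
      this.patterns = Arrays.asList(patterns);
    }

    boolean matchesOwnPatterns(String threadName) {
      String name = threadName.toLowerCase(Locale.ROOT);
      for (String p : patterns) {
        if (name.contains(p.toLowerCase(Locale.ROOT))) {
          return true;
        }
      }
      return false;
    }
  }

  // First matching category wins; enum constants are compared by identity.
  static Category categorize(String threadName) {
    for (Category c : Category.values()) {
      if (c != Category.OTHER && c.matchesOwnPatterns(threadName)) {
        return c;
      }
    }
    return Category.OTHER;
  }
}
```

Note that the `time` pattern matches any name containing that substring case-insensitively, so a thread whose name contains, for example, `Runtime` would also be counted as a timer thread; a more specific pattern may be worth considering as categories are broken out.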

##########
File path: helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.helix.common.ZkTestBase;
+
+
+public class ThreadLeakageChecker {

Review comment:
       I think the checker by itself is thread safe. But if multiple tests are
running concurrently, the checker won't be of any use, because it counts every
thread in the JVM regardless of which test started it. Let me add a comment
that its usage depends on tests running sequentially.

##########
File path: helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
##########
@@ -0,0 +1,181 @@
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.helix.common.ZkTestBase;
+
+
+public class ThreadLeakageChecker {
+  private static ThreadGroup getRootThreadGroup() {
+    ThreadGroup candidate = Thread.currentThread().getThreadGroup();
+    while (candidate.getParent() != null) {
+      candidate = candidate.getParent();
+    }
+    return candidate;
+  }
+
+  private static List<Thread> getAllThreads() {
+    ThreadGroup rootThreadGroup = getRootThreadGroup();
+    Thread[] threads = new Thread[32];
+    int count = rootThreadGroup.enumerate(threads);
+    while (count == threads.length) {
+      threads = new Thread[threads.length * 2];
+      count = rootThreadGroup.enumerate(threads);
+    }
+    return Arrays.asList(Arrays.copyOf(threads, count));
+  }
+
+  private static final String[] ZkServerThrdPattern =
+      {"SessionTracker", "NIOServerCxn", "SyncThread:", "ProcessThread"};
+  private static final String[] ZkSessionThrdPattern =
+      new String[]{"ZkClient-EventThread", "ZkClient-AsyncCallback", "-EventThread", "-SendThread"};
+  private static final String[] ForkJoinThrdPattern = new String[]{"ForkJoinPool"};
+  private static final String[] TimerThrdPattern = new String[]{"time"};
+  private static final String[] TaskStateModelThrdPattern = new String[]{"TaskStateModel"};
+
+  private static enum ThreadCategory {

Review comment:
       added the comment

##########
File path: helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.helix.common.ZkTestBase;
+
+
+public class ThreadLeakageChecker {

Review comment:
       Added to the comment that tests need to run sequentially for this
leakage check to be meaningful.

##########
File path: helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
##########
@@ -710,7 +725,9 @@ protected Message createMessage(Message.MessageType type, String msgId, String f
   }
 
   @AfterClass
-  public void cleanupLiveInstanceOwners() {
+  public void cleanupLiveInstanceOwners() throws InterruptedException {
+    String testClassName = this.getShortClassName();
+    System.out.println("AfterClass:" + testClassName + " afterclass of ZkTestBase called!");

Review comment:
       removed

##########
File path: helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
##########
@@ -719,6 +736,17 @@ public void cleanupLiveInstanceOwners() {
       clientMap.clear();
     }
     _liveInstanceOwners.clear();
+
+    boolean status = false;
+    try {
+      status = ThreadLeakageChecker.afterClassCheck(testClassName);
+    } catch (Exception e) {
+      System.out.println("ThreadLeakageChecker exception:" + e.getStackTrace());

Review comment:
       Good point. Changed to log.error.
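For context on this change: concatenating `e.getStackTrace()` into a string prints only the array's identity (something like `[Ljava.lang.StackTraceElement;@...`), not the frames. A sketch that renders the full trace with the standard library; `StackTraces.render` is a hypothetical helper.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper: turn a Throwable's full stack trace into a String.
class StackTraces {
  static String render(Throwable t) {
    StringWriter out = new StringWriter();
    // printStackTrace writes the exception line plus one "\tat ..." line per frame.
    t.printStackTrace(new PrintWriter(out, true));
    return out.toString();
  }
}
```

If the logger is SLF4J, passing the exception as the last argument, e.g. `LOG.error("ThreadLeakageChecker exception", e)`, logs the full trace without a helper (`LOG` is a placeholder name).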




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
