This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e1ed6c  Remove ScenarioParallelRunnerExecutor.ScenarioExecutorService 
(#9489)
2e1ed6c is described below

commit 2e1ed6c3a1f3177c8ac82d53720e4fa5f3ff5178
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Feb 24 15:56:36 2021 +0800

    Remove ScenarioParallelRunnerExecutor.ScenarioExecutorService (#9489)
    
    * Remove ScenarioParallelRunnerExecutor.ScenarioExecutorService
    
    * Refactor ScenarioParallelRunnerExecutor
---
 .../impl/ScenarioParallelRunnerExecutor.java       | 62 +++++-----------------
 1 file changed, 14 insertions(+), 48 deletions(-)

diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
index bdbe9fd..41e0936 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
@@ -17,59 +17,53 @@
 
 package org.apache.shardingsphere.test.integration.engine.junit.parallel.impl;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.EqualsAndHashCode;
-import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager;
 import 
org.apache.shardingsphere.test.integration.engine.junit.parallel.ParallelRunnerExecutor;
 import 
org.apache.shardingsphere.test.integration.engine.param.model.ParameterizedArray;
 
-import java.io.Closeable;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Parallel runner executor with scenario.
  */
-@Slf4j
 public final class ScenarioParallelRunnerExecutor implements 
ParallelRunnerExecutor {
     
-    private final Map<ScenarioKey, ScenarioExecutorService> executorServices = 
new ConcurrentHashMap<>();
+    private final Map<ScenarioKey, ExecutorServiceManager> 
executorServiceManagers = new ConcurrentHashMap<>();
     
     private final Collection<Future<?>> taskFeatures = new LinkedList<>();
     
     @Override
     public void execute(final ParameterizedArray parameterizedArray, final 
Runnable childStatement) {
-        taskFeatures.add(getExecutorService(new 
ScenarioKey(parameterizedArray)).submit(childStatement));
+        taskFeatures.add(getExecutorService(new 
ScenarioKey(parameterizedArray)).getExecutorService().submit(childStatement));
     }
     
-    private ScenarioExecutorService getExecutorService(final ScenarioKey 
scenarioKey) {
-        if (executorServices.containsKey(scenarioKey)) {
-            return executorServices.get(scenarioKey);
+    private ExecutorServiceManager getExecutorService(final ScenarioKey 
scenarioKey) {
+        if (executorServiceManagers.containsKey(scenarioKey)) {
+            return executorServiceManagers.get(scenarioKey);
         }
-        ScenarioExecutorService newExecutorService = new 
ScenarioExecutorService(scenarioKey);
-        if (null != executorServices.putIfAbsent(scenarioKey, 
newExecutorService)) {
-            newExecutorService.close();
+        String threadPoolNameFormat = String.join("-", "ScenarioExecutorPool", 
scenarioKey.toString(), "%d");
+        ExecutorServiceManager newExecutorServiceManager = new 
ExecutorServiceManager(1, threadPoolNameFormat);
+        if (null != executorServiceManagers.putIfAbsent(scenarioKey, 
newExecutorServiceManager)) {
+            newExecutorServiceManager.close();
         }
-        return executorServices.get(scenarioKey);
+        return executorServiceManagers.get(scenarioKey);
     }
     
     @Override
     public void finished() {
-        taskFeatures.forEach(future -> {
+        taskFeatures.forEach(each -> {
             try {
-                future.get();
+                each.get();
             } catch (final InterruptedException | ExecutionException ignored) {
             }
         });
-        executorServices.values().forEach(ScenarioExecutorService::close);
+        executorServiceManagers.values().forEach(each -> 
each.getExecutorService().shutdownNow());
     }
     
     /**
@@ -95,32 +89,4 @@ public final class ScenarioParallelRunnerExecutor implements 
ParallelRunnerExecu
             return String.join("-", adapter, scenario, databaseTypeName);
         }
     }
-    
-    /**
-     * Scenario executor service.
-     */
-    private static final class ScenarioExecutorService implements Closeable {
-        
-        private final ExecutorService executorService;
-        
-        ScenarioExecutorService(final ScenarioKey scenarioKey) {
-            String threadPoolNameFormat = String.join("-", 
"ScenarioExecutorPool", scenarioKey.toString(), "%d");
-            executorService = new ThreadPoolExecutor(1, 1, 0L, 
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new 
ThreadFactoryBuilder().setNameFormat(threadPoolNameFormat).build());
-        }
-        
-        /**
-         * Submit task.
-         * 
-         * @param childStatement child statement
-         * @return task future
-         */
-        public Future<?> submit(final Runnable childStatement) {
-            return executorService.submit(childStatement);
-        }
-        
-        @Override
-        public void close() {
-            executorService.shutdownNow();
-        }
-    }
 }

Reply via email to