This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2e1ed6c Remove ScenarioParallelRunnerExecutor.ScenarioExecutorService
(#9489)
2e1ed6c is described below
commit 2e1ed6c3a1f3177c8ac82d53720e4fa5f3ff5178
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Feb 24 15:56:36 2021 +0800
Remove ScenarioParallelRunnerExecutor.ScenarioExecutorService (#9489)
* Remove ScenarioParallelRunnerExecutor.ScenarioExecutorService
* Refactor ScenarioParallelRunnerExecutor
---
.../impl/ScenarioParallelRunnerExecutor.java | 62 +++++-----------------
1 file changed, 14 insertions(+), 48 deletions(-)
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
index bdbe9fd..41e0936 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
@@ -17,59 +17,53 @@
package org.apache.shardingsphere.test.integration.engine.junit.parallel.impl;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.EqualsAndHashCode;
-import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager;
import
org.apache.shardingsphere.test.integration.engine.junit.parallel.ParallelRunnerExecutor;
import
org.apache.shardingsphere.test.integration.engine.param.model.ParameterizedArray;
-import java.io.Closeable;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
/**
* Parallel runner executor with scenario.
*/
-@Slf4j
public final class ScenarioParallelRunnerExecutor implements
ParallelRunnerExecutor {
- private final Map<ScenarioKey, ScenarioExecutorService> executorServices =
new ConcurrentHashMap<>();
+ private final Map<ScenarioKey, ExecutorServiceManager>
executorServiceManagers = new ConcurrentHashMap<>();
private final Collection<Future<?>> taskFeatures = new LinkedList<>();
@Override
public void execute(final ParameterizedArray parameterizedArray, final
Runnable childStatement) {
- taskFeatures.add(getExecutorService(new
ScenarioKey(parameterizedArray)).submit(childStatement));
+ taskFeatures.add(getExecutorService(new
ScenarioKey(parameterizedArray)).getExecutorService().submit(childStatement));
}
- private ScenarioExecutorService getExecutorService(final ScenarioKey
scenarioKey) {
- if (executorServices.containsKey(scenarioKey)) {
- return executorServices.get(scenarioKey);
+ private ExecutorServiceManager getExecutorService(final ScenarioKey
scenarioKey) {
+ if (executorServiceManagers.containsKey(scenarioKey)) {
+ return executorServiceManagers.get(scenarioKey);
}
- ScenarioExecutorService newExecutorService = new
ScenarioExecutorService(scenarioKey);
- if (null != executorServices.putIfAbsent(scenarioKey,
newExecutorService)) {
- newExecutorService.close();
+ String threadPoolNameFormat = String.join("-", "ScenarioExecutorPool",
scenarioKey.toString(), "%d");
+ ExecutorServiceManager newExecutorServiceManager = new
ExecutorServiceManager(1, threadPoolNameFormat);
+ if (null != executorServiceManagers.putIfAbsent(scenarioKey,
newExecutorServiceManager)) {
+ newExecutorServiceManager.close();
}
- return executorServices.get(scenarioKey);
+ return executorServiceManagers.get(scenarioKey);
}
@Override
public void finished() {
- taskFeatures.forEach(future -> {
+ taskFeatures.forEach(each -> {
try {
- future.get();
+ each.get();
} catch (final InterruptedException | ExecutionException ignored) {
}
});
- executorServices.values().forEach(ScenarioExecutorService::close);
+ executorServiceManagers.values().forEach(each ->
each.getExecutorService().shutdownNow());
}
/**
@@ -95,32 +89,4 @@ public final class ScenarioParallelRunnerExecutor implements
ParallelRunnerExecu
return String.join("-", adapter, scenario, databaseTypeName);
}
}
-
- /**
- * Scenario executor service.
- */
- private static final class ScenarioExecutorService implements Closeable {
-
- private final ExecutorService executorService;
-
- ScenarioExecutorService(final ScenarioKey scenarioKey) {
- String threadPoolNameFormat = String.join("-",
"ScenarioExecutorPool", scenarioKey.toString(), "%d");
- executorService = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new
ThreadFactoryBuilder().setNameFormat(threadPoolNameFormat).build());
- }
-
- /**
- * Submit task.
- *
- * @param childStatement child statement
- * @return task future
- */
- public Future<?> submit(final Runnable childStatement) {
- return executorService.submit(childStatement);
- }
-
- @Override
- public void close() {
- executorService.shutdownNow();
- }
- }
}