This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6ca1d039ac3 Refactor AbstractSimplePipelineJob.execute to blocking
(#23510)
6ca1d039ac3 is described below
commit 6ca1d039ac39d39e353c1d4e5ca3f9bfc25b0b18
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Jan 11 20:12:34 2023 +0800
Refactor AbstractSimplePipelineJob.execute to blocking (#23510)
* Handle null jobId on stop
* Refactor AbstractSimplePipelineJob.execute to blocking
* Remove throwable output from inventory and incremental task onFailure to
clean up the unit test error log
---
.../data/pipeline/core/execute/ExecuteEngine.java | 34 +++++++++++++++++-----
.../pipeline/core/job/AbstractPipelineJob.java | 6 ++--
.../data/pipeline/core/task/IncrementalTask.java | 4 +--
.../data/pipeline/core/task/InventoryTask.java | 4 +--
.../pipeline/core/execute/ExecuteEngineTest.java | 33 +++++----------------
5 files changed, 42 insertions(+), 39 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
index b92deee1e48..d17e9bf7d70 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
@@ -23,10 +23,12 @@ import
org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* Executor engine.
@@ -88,17 +90,33 @@ public final class ExecuteEngine {
* @param executeCallback execute callback on all the futures
*/
public static void trigger(final Collection<CompletableFuture<?>> futures,
final ExecuteCallback executeCallback) {
- int futureCount = futures.size();
- AtomicInteger successCount = new AtomicInteger(0);
- // TODO call onFailure once
+ BlockingQueue<CompletableFuture<?>> futureQueue = new
LinkedBlockingQueue<>();
for (CompletableFuture<?> each : futures) {
each.whenComplete((unused, throwable) -> {
- if (null != throwable) {
- executeCallback.onFailure(throwable);
- } else if (successCount.addAndGet(1) == futureCount) {
- executeCallback.onSuccess();
+ try {
+ futureQueue.put(each);
+ } catch (final InterruptedException ex) {
+ throw new RuntimeException(ex);
}
});
}
+ for (int i = 1, count = futures.size(); i <= count; i++) {
+ CompletableFuture<?> future;
+ try {
+ future = futureQueue.take();
+ } catch (final InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ try {
+ future.get();
+ } catch (final InterruptedException ex) {
+ throw new RuntimeException(ex);
+ } catch (final ExecutionException ex) {
+ Throwable cause = ex.getCause();
+ executeCallback.onFailure(null != cause ? cause : ex);
+ throw new RuntimeException(ex);
+ }
+ }
+ executeCallback.onSuccess();
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index bd211b83cf0..987baa54997 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -120,7 +120,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
if (null != jobBootstrap) {
jobBootstrap.shutdown();
}
- log.info("stop tasks runner, jobId={}", getJobId());
+ log.info("stop tasks runner, jobId={}", jobId);
for (PipelineTasksRunner each : tasksRunnerMap.values()) {
each.stop();
}
@@ -128,7 +128,9 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
private void innerClean() {
tasksRunnerMap.clear();
-
PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
+ if (null != jobId) {
+
PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+ }
}
protected abstract void doClean();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 9774c0369e9..58fdfac5106 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -126,7 +126,7 @@ public final class IncrementalTask implements PipelineTask,
AutoCloseable {
@Override
public void onFailure(final Throwable throwable) {
- log.error("incremental dumper onFailure, taskId={}", taskId,
throwable);
+ log.error("incremental dumper onFailure, taskId={}", taskId);
stop();
close();
}
@@ -139,7 +139,7 @@ public final class IncrementalTask implements PipelineTask,
AutoCloseable {
@Override
public void onFailure(final Throwable throwable) {
- log.error("importer onFailure, taskId={}", taskId, throwable);
+ log.error("importer onFailure, taskId={}", taskId);
stop();
close();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index feaf1510444..458ca917e7f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -98,7 +98,7 @@ public final class InventoryTask implements PipelineTask,
AutoCloseable {
@Override
public void onFailure(final Throwable throwable) {
- log.error("dumper onFailure, taskId={}", taskId, throwable);
+ log.error("dumper onFailure, taskId={}", taskId);
stop();
close();
}
@@ -111,7 +111,7 @@ public final class InventoryTask implements PipelineTask,
AutoCloseable {
@Override
public void onFailure(final Throwable throwable) {
- log.error("importer onFailure, taskId={}", taskId, throwable);
+ log.error("importer onFailure, taskId={}", taskId);
stop();
close();
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/execute/ExecuteEngineTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/execute/ExecuteEngineTest.java
index 976b5da7da0..0ca9f7a8f3b 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/execute/ExecuteEngineTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/execute/ExecuteEngineTest.java
@@ -27,7 +27,6 @@ import org.mockito.internal.configuration.plugins.Plugins;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -83,24 +82,12 @@ public final class ExecuteEngineTest {
executorService.awaitTermination(30L, TimeUnit.SECONDS);
}
- @Test
- public void assertTriggerAllFailure() throws InterruptedException {
- CompletableFuture<?> future1 = CompletableFuture.runAsync(new
FixtureRunnable(false));
- CompletableFuture<?> future2 = CompletableFuture.runAsync(new
FixtureRunnable(false));
- FixtureExecuteCallback executeCallback = new FixtureExecuteCallback(2);
- ExecuteEngine.trigger(Arrays.asList(future1, future2),
executeCallback);
- executeCallback.latch.await();
- assertThat(executeCallback.successCount.get(), is(0));
- assertThat(executeCallback.failureCount.get(), is(2));
- }
-
@Test
public void assertTriggerAllSuccess() throws InterruptedException {
CompletableFuture<?> future1 = CompletableFuture.runAsync(new
FixtureRunnable(true));
CompletableFuture<?> future2 = CompletableFuture.runAsync(new
FixtureRunnable(true));
- FixtureExecuteCallback executeCallback = new FixtureExecuteCallback(1);
+ FixtureExecuteCallback executeCallback = new FixtureExecuteCallback();
ExecuteEngine.trigger(Arrays.asList(future1, future2),
executeCallback);
- executeCallback.latch.await();
assertThat(executeCallback.successCount.get(), is(1));
assertThat(executeCallback.failureCount.get(), is(0));
}
@@ -109,9 +96,13 @@ public final class ExecuteEngineTest {
public void assertTriggerPartSuccessFailure() throws InterruptedException {
CompletableFuture<?> future1 = CompletableFuture.runAsync(new
FixtureRunnable(true));
CompletableFuture<?> future2 = CompletableFuture.runAsync(new
FixtureRunnable(false));
- FixtureExecuteCallback executeCallback = new FixtureExecuteCallback(1);
- ExecuteEngine.trigger(Arrays.asList(future1, future2),
executeCallback);
- executeCallback.latch.await();
+ FixtureExecuteCallback executeCallback = new FixtureExecuteCallback();
+ try {
+ ExecuteEngine.trigger(Arrays.asList(future1, future2),
executeCallback);
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ignored) {
+ // CHECKSTYLE:ON
+ }
assertThat(executeCallback.successCount.get(), is(0));
assertThat(executeCallback.failureCount.get(), is(1));
}
@@ -131,26 +122,18 @@ public final class ExecuteEngineTest {
private static final class FixtureExecuteCallback implements
ExecuteCallback {
- private final CountDownLatch latch;
-
private final AtomicInteger successCount = new AtomicInteger(0);
private final AtomicInteger failureCount = new AtomicInteger(0);
- FixtureExecuteCallback(final int latchCount) {
- this.latch = new CountDownLatch(latchCount);
- }
-
@Override
public void onSuccess() {
successCount.addAndGet(1);
- latch.countDown();
}
@Override
public void onFailure(final Throwable throwable) {
failureCount.addAndGet(1);
- latch.countDown();
}
}
}