This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ca1d039ac3 Refactor AbstractSimplePipelineJob.execute to blocking (#23510)
6ca1d039ac3 is described below

commit 6ca1d039ac39d39e353c1d4e5ca3f9bfc25b0b18
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Jan 11 20:12:34 2023 +0800

    Refactor AbstractSimplePipelineJob.execute to blocking (#23510)
    
    * Handle null jobId on stop
    
    * Refactor AbstractSimplePipelineJob.execute to blocking
    
    * Remove throwable output in inventory and incremental task onFailure to clean unit test error log
---
 .../data/pipeline/core/execute/ExecuteEngine.java  | 34 +++++++++++++++++-----
 .../pipeline/core/job/AbstractPipelineJob.java     |  6 ++--
 .../data/pipeline/core/task/IncrementalTask.java   |  4 +--
 .../data/pipeline/core/task/InventoryTask.java     |  4 +--
 .../pipeline/core/execute/ExecuteEngineTest.java   | 33 +++++----------------
 5 files changed, 42 insertions(+), 39 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
index b92deee1e48..d17e9bf7d70 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
@@ -23,10 +23,12 @@ import 
org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 
 import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Executor engine.
@@ -88,17 +90,33 @@ public final class ExecuteEngine {
      * @param executeCallback execute callback on all the futures
      */
     public static void trigger(final Collection<CompletableFuture<?>> futures, 
final ExecuteCallback executeCallback) {
-        int futureCount = futures.size();
-        AtomicInteger successCount = new AtomicInteger(0);
-        // TODO call onFailure once
+        BlockingQueue<CompletableFuture<?>> futureQueue = new 
LinkedBlockingQueue<>();
         for (CompletableFuture<?> each : futures) {
             each.whenComplete((unused, throwable) -> {
-                if (null != throwable) {
-                    executeCallback.onFailure(throwable);
-                } else if (successCount.addAndGet(1) == futureCount) {
-                    executeCallback.onSuccess();
+                try {
+                    futureQueue.put(each);
+                } catch (final InterruptedException ex) {
+                    throw new RuntimeException(ex);
                 }
             });
         }
+        for (int i = 1, count = futures.size(); i <= count; i++) {
+            CompletableFuture<?> future;
+            try {
+                future = futureQueue.take();
+            } catch (final InterruptedException ex) {
+                throw new RuntimeException(ex);
+            }
+            try {
+                future.get();
+            } catch (final InterruptedException ex) {
+                throw new RuntimeException(ex);
+            } catch (final ExecutionException ex) {
+                Throwable cause = ex.getCause();
+                executeCallback.onFailure(null != cause ? cause : ex);
+                throw new RuntimeException(ex);
+            }
+        }
+        executeCallback.onSuccess();
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index bd211b83cf0..987baa54997 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -120,7 +120,7 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
         if (null != jobBootstrap) {
             jobBootstrap.shutdown();
         }
-        log.info("stop tasks runner, jobId={}", getJobId());
+        log.info("stop tasks runner, jobId={}", jobId);
         for (PipelineTasksRunner each : tasksRunnerMap.values()) {
             each.stop();
         }
@@ -128,7 +128,9 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     
     private void innerClean() {
         tasksRunnerMap.clear();
-        
PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
+        if (null != jobId) {
+            
PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+        }
     }
     
     protected abstract void doClean();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 9774c0369e9..58fdfac5106 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -126,7 +126,7 @@ public final class IncrementalTask implements PipelineTask, 
AutoCloseable {
             
             @Override
             public void onFailure(final Throwable throwable) {
-                log.error("incremental dumper onFailure, taskId={}", taskId, 
throwable);
+                log.error("incremental dumper onFailure, taskId={}", taskId);
                 stop();
                 close();
             }
@@ -139,7 +139,7 @@ public final class IncrementalTask implements PipelineTask, 
AutoCloseable {
             
             @Override
             public void onFailure(final Throwable throwable) {
-                log.error("importer onFailure, taskId={}", taskId, throwable);
+                log.error("importer onFailure, taskId={}", taskId);
                 stop();
                 close();
             }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index feaf1510444..458ca917e7f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -98,7 +98,7 @@ public final class InventoryTask implements PipelineTask, 
AutoCloseable {
             
             @Override
             public void onFailure(final Throwable throwable) {
-                log.error("dumper onFailure, taskId={}", taskId, throwable);
+                log.error("dumper onFailure, taskId={}", taskId);
                 stop();
                 close();
             }
@@ -111,7 +111,7 @@ public final class InventoryTask implements PipelineTask, 
AutoCloseable {
             
             @Override
             public void onFailure(final Throwable throwable) {
-                log.error("importer onFailure, taskId={}", taskId, throwable);
+                log.error("importer onFailure, taskId={}", taskId);
                 stop();
                 close();
             }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/execute/ExecuteEngineTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/execute/ExecuteEngineTest.java
index 976b5da7da0..0ca9f7a8f3b 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/execute/ExecuteEngineTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/execute/ExecuteEngineTest.java
@@ -27,7 +27,6 @@ import org.mockito.internal.configuration.plugins.Plugins;
 
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -83,24 +82,12 @@ public final class ExecuteEngineTest {
         executorService.awaitTermination(30L, TimeUnit.SECONDS);
     }
     
-    @Test
-    public void assertTriggerAllFailure() throws InterruptedException {
-        CompletableFuture<?> future1 = CompletableFuture.runAsync(new 
FixtureRunnable(false));
-        CompletableFuture<?> future2 = CompletableFuture.runAsync(new 
FixtureRunnable(false));
-        FixtureExecuteCallback executeCallback = new FixtureExecuteCallback(2);
-        ExecuteEngine.trigger(Arrays.asList(future1, future2), 
executeCallback);
-        executeCallback.latch.await();
-        assertThat(executeCallback.successCount.get(), is(0));
-        assertThat(executeCallback.failureCount.get(), is(2));
-    }
-    
     @Test
     public void assertTriggerAllSuccess() throws InterruptedException {
         CompletableFuture<?> future1 = CompletableFuture.runAsync(new 
FixtureRunnable(true));
         CompletableFuture<?> future2 = CompletableFuture.runAsync(new 
FixtureRunnable(true));
-        FixtureExecuteCallback executeCallback = new FixtureExecuteCallback(1);
+        FixtureExecuteCallback executeCallback = new FixtureExecuteCallback();
         ExecuteEngine.trigger(Arrays.asList(future1, future2), 
executeCallback);
-        executeCallback.latch.await();
         assertThat(executeCallback.successCount.get(), is(1));
         assertThat(executeCallback.failureCount.get(), is(0));
     }
@@ -109,9 +96,13 @@ public final class ExecuteEngineTest {
     public void assertTriggerPartSuccessFailure() throws InterruptedException {
         CompletableFuture<?> future1 = CompletableFuture.runAsync(new 
FixtureRunnable(true));
         CompletableFuture<?> future2 = CompletableFuture.runAsync(new 
FixtureRunnable(false));
-        FixtureExecuteCallback executeCallback = new FixtureExecuteCallback(1);
-        ExecuteEngine.trigger(Arrays.asList(future1, future2), 
executeCallback);
-        executeCallback.latch.await();
+        FixtureExecuteCallback executeCallback = new FixtureExecuteCallback();
+        try {
+            ExecuteEngine.trigger(Arrays.asList(future1, future2), 
executeCallback);
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ignored) {
+            // CHECKSTYLE:ON
+        }
         assertThat(executeCallback.successCount.get(), is(0));
         assertThat(executeCallback.failureCount.get(), is(1));
     }
@@ -131,26 +122,18 @@ public final class ExecuteEngineTest {
     
     private static final class FixtureExecuteCallback implements 
ExecuteCallback {
         
-        private final CountDownLatch latch;
-        
         private final AtomicInteger successCount = new AtomicInteger(0);
         
         private final AtomicInteger failureCount = new AtomicInteger(0);
         
-        FixtureExecuteCallback(final int latchCount) {
-            this.latch = new CountDownLatch(latchCount);
-        }
-        
         @Override
         public void onSuccess() {
             successCount.addAndGet(1);
-            latch.countDown();
         }
         
         @Override
         public void onFailure(final Throwable throwable) {
             failureCount.addAndGet(1);
-            latch.countDown();
         }
     }
 }

Reply via email to