This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new de54a6982a6 Rename PipelineLifecycleRunnable (#28954)
de54a6982a6 is described below

commit de54a6982a6f78684c8cd13c2e3b952ef58fdbdc
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 6 15:35:44 2023 +0800

    Rename PipelineLifecycleRunnable (#28954)
    
    * Rename PipelineLifecycleRunnable
    
    * Move PipelineLifecycleRunnable
---
 .../data/pipeline/api/ingest/dumper/Dumper.java            |  4 ++--
 .../PipelineLifecycleRunnable.java}                        |  7 +++----
 ...xecutor.java => AbstractPipelineLifecycleRunnable.java} |  6 +++---
 .../data/pipeline/common/execute/ExecuteEngine.java        | 14 +++++++-------
 .../data/pipeline/core/dumper/InventoryDumper.java         |  4 ++--
 .../data/pipeline/core/importer/Importer.java              |  4 ++--
 .../core/importer/SingleChannelConsumerImporter.java       |  4 ++--
 ...est.java => AbstractPipelineLifecycleRunnableTest.java} | 14 +++++++-------
 .../data/pipeline/common/execute/ExecuteEngineTest.java    | 14 +++++++-------
 .../data/pipeline/mysql/ingest/MySQLIncrementalDumper.java |  4 ++--
 .../data/pipeline/opengauss/ingest/OpenGaussWALDumper.java |  4 ++--
 .../pipeline/postgresql/ingest/PostgreSQLWALDumper.java    |  4 ++--
 .../data/pipeline/cdc/core/importer/CDCImporter.java       |  4 ++--
 .../consistencycheck/task/ConsistencyCheckTasksRunner.java | 10 +++++-----
 .../pipeline/core/fixture/FixtureIncrementalDumper.java    |  4 ++--
 15 files changed, 50 insertions(+), 51 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
index 79724699ab6..dc12667cd58 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
@@ -17,10 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.api.ingest.dumper;
 
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
 
 /**
  * Dumper interface.
  */
-public interface Dumper extends LifecycleExecutor {
+public interface Dumper extends PipelineLifecycleRunnable {
 }
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/LifecycleExecutor.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/runnable/PipelineLifecycleRunnable.java
similarity index 85%
rename from 
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/LifecycleExecutor.java
rename to 
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/runnable/PipelineLifecycleRunnable.java
index 3ae17b7159d..4b0233f2f97 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/LifecycleExecutor.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/runnable/PipelineLifecycleRunnable.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.executor;
+package org.apache.shardingsphere.data.pipeline.api.runnable;
 
 /**
- * Lifecycle executor.
+ * Pipeline lifecycle runnable.
  */
-// TODO task?
-public interface LifecycleExecutor extends Runnable {
+public interface PipelineLifecycleRunnable extends Runnable {
     
     /**
      * Start run execute.
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutor.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnable.java
similarity index 92%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutor.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnable.java
index 9d91c2d9227..51dcacb213a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutor.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnable.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.common.execute;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
 
 import java.sql.SQLException;
 import java.time.Instant;
@@ -28,10 +28,10 @@ import java.time.format.DateTimeFormatter;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Abstract lifecycle executor.
+ * Abstract pipeline lifecycle runnable.
  */
 @Slf4j
-public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
+public abstract class AbstractPipelineLifecycleRunnable implements 
PipelineLifecycleRunnable {
     
     private static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngine.java
index c1a74f7de78..eb6a1b4ea62 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngine.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.common.execute;
 import lombok.AccessLevel;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 
@@ -73,12 +73,12 @@ public final class ExecuteEngine {
     /**
      * Submit a {@code LifecycleExecutor} with callback {@code 
ExecuteCallback} to execute.
      *
-     * @param lifecycleExecutor lifecycle executor
+     * @param pipelineLifecycleRunnable lifecycle executor
      * @param executeCallback execute callback
      * @return execute future
      */
-    public CompletableFuture<?> submit(final LifecycleExecutor 
lifecycleExecutor, final ExecuteCallback executeCallback) {
-        return CompletableFuture.runAsync(lifecycleExecutor, 
executorService).whenCompleteAsync((unused, throwable) -> {
+    public CompletableFuture<?> submit(final PipelineLifecycleRunnable 
pipelineLifecycleRunnable, final ExecuteCallback executeCallback) {
+        return CompletableFuture.runAsync(pipelineLifecycleRunnable, 
executorService).whenCompleteAsync((unused, throwable) -> {
             if (null == throwable) {
                 executeCallback.onSuccess();
             } else {
@@ -91,11 +91,11 @@ public final class ExecuteEngine {
     /**
      * Submit a {@code LifecycleExecutor} to execute.
      *
-     * @param lifecycleExecutor lifecycle executor
+     * @param pipelineLifecycleRunnable lifecycle executor
      * @return execute future
      */
-    public CompletableFuture<?> submit(final LifecycleExecutor 
lifecycleExecutor) {
-        return CompletableFuture.runAsync(lifecycleExecutor, executorService);
+    public CompletableFuture<?> submit(final PipelineLifecycleRunnable 
pipelineLifecycleRunnable) {
+        return CompletableFuture.runAsync(pipelineLifecycleRunnable, 
executorService);
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index ebd0f8183ee..cb2575bc5b4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -21,7 +21,7 @@ import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext;
@@ -67,7 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * Inventory dumper.
  */
 @Slf4j
-public final class InventoryDumper extends AbstractLifecycleExecutor 
implements Dumper {
+public final class InventoryDumper extends AbstractPipelineLifecycleRunnable 
implements Dumper {
     
     @Getter(AccessLevel.PROTECTED)
     private final InventoryDumperContext dumperContext;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/Importer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/Importer.java
index a7707c7b091..2217f39a38a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/Importer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/Importer.java
@@ -17,10 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.core.importer;
 
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
 
 /**
  * Importer.
  */
-public interface Importer extends LifecycleExecutor {
+public interface Importer extends PipelineLifecycleRunnable {
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
index 41589b79391..e272ebd7b83 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.importer;
 
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
  * Single channel consumer importer.
  */
 @RequiredArgsConstructor
-public final class SingleChannelConsumerImporter extends 
AbstractLifecycleExecutor implements Importer {
+public final class SingleChannelConsumerImporter extends 
AbstractPipelineLifecycleRunnable implements Importer {
     
     private final PipelineChannel channel;
     
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutorTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnableTest.java
similarity index 80%
rename from 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutorTest.java
rename to 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnableTest.java
index b27766aa4bf..00616f8f5db 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutorTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnableTest.java
@@ -26,11 +26,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-class AbstractLifecycleExecutorTest {
+class AbstractPipelineLifecycleRunnableTest {
     
     @Test
     void assertRunning() {
-        FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+        FixturePipelineLifecycleRunnable executor = new 
FixturePipelineLifecycleRunnable();
         assertFalse(executor.isRunning());
         executor.start();
         assertTrue(executor.isRunning());
@@ -40,7 +40,7 @@ class AbstractLifecycleExecutorTest {
     
     @Test
     void assertStartRunOnce() {
-        FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+        FixturePipelineLifecycleRunnable executor = new 
FixturePipelineLifecycleRunnable();
         executor.start();
         executor.start();
         assertThat(executor.runBlockingCount.get(), is(1));
@@ -48,7 +48,7 @@ class AbstractLifecycleExecutorTest {
     
     @Test
     void assertStopRunOnce() {
-        FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+        FixturePipelineLifecycleRunnable executor = new 
FixturePipelineLifecycleRunnable();
         executor.start();
         executor.stop();
         executor.stop();
@@ -57,7 +57,7 @@ class AbstractLifecycleExecutorTest {
     
     @Test
     void assertNoStopBeforeStarting() {
-        FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+        FixturePipelineLifecycleRunnable executor = new 
FixturePipelineLifecycleRunnable();
         executor.stop();
         executor.stop();
         assertThat(executor.doStopCount.get(), is(0));
@@ -65,14 +65,14 @@ class AbstractLifecycleExecutorTest {
     
     @Test
     void assertStopStart() {
-        FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+        FixturePipelineLifecycleRunnable executor = new 
FixturePipelineLifecycleRunnable();
         executor.stop();
         executor.start();
         assertThat(executor.doStopCount.get(), is(0));
         assertThat(executor.runBlockingCount.get(), is(0));
     }
     
-    private static class FixtureLifecycleExecutor extends 
AbstractLifecycleExecutor {
+    private static class FixturePipelineLifecycleRunnable extends 
AbstractPipelineLifecycleRunnable {
         
         private final AtomicInteger runBlockingCount = new AtomicInteger();
         
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngineTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngineTest.java
index 4193427298a..99c26bf5005 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngineTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngineTest.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.common.execute;
 
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
 import org.junit.jupiter.api.Test;
 import org.mockito.internal.configuration.plugins.Plugins;
 
@@ -45,24 +45,24 @@ class ExecuteEngineTest {
     
     @Test
     void assertSubmitAndTaskSucceeded() {
-        LifecycleExecutor lifecycleExecutor = mock(LifecycleExecutor.class);
+        PipelineLifecycleRunnable pipelineLifecycleRunnable = 
mock(PipelineLifecycleRunnable.class);
         ExecuteCallback callback = mock(ExecuteCallback.class);
         ExecuteEngine executeEngine = 
ExecuteEngine.newCachedThreadInstance(ExecuteEngineTest.class.getSimpleName());
-        Future<?> future = executeEngine.submit(lifecycleExecutor, callback);
+        Future<?> future = executeEngine.submit(pipelineLifecycleRunnable, 
callback);
         assertTimeout(Duration.ofSeconds(30L), () -> future.get());
         shutdownAndAwaitTerminal(executeEngine);
-        verify(lifecycleExecutor).run();
+        verify(pipelineLifecycleRunnable).run();
         verify(callback).onSuccess();
     }
     
     @Test
     void assertSubmitAndTaskFailed() {
-        LifecycleExecutor lifecycleExecutor = mock(LifecycleExecutor.class);
+        PipelineLifecycleRunnable pipelineLifecycleRunnable = 
mock(PipelineLifecycleRunnable.class);
         RuntimeException expectedException = new RuntimeException("Expected");
-        doThrow(expectedException).when(lifecycleExecutor).run();
+        doThrow(expectedException).when(pipelineLifecycleRunnable).run();
         ExecuteCallback callback = mock(ExecuteCallback.class);
         ExecuteEngine executeEngine = 
ExecuteEngine.newCachedThreadInstance(ExecuteEngineTest.class.getSimpleName());
-        Future<?> future = executeEngine.submit(lifecycleExecutor, callback);
+        Future<?> future = executeEngine.submit(pipelineLifecycleRunnable, 
callback);
         Optional<Throwable> actualCause = 
assertTimeout(Duration.ofSeconds(30L), () -> execute(future));
         assertTrue(actualCause.isPresent());
         assertThat(actualCause.get(), is(expectedException));
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 705a02200a8..833834e2560 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
@@ -64,7 +64,7 @@ import java.util.Optional;
  * MySQL incremental dumper.
  */
 @Slf4j
-public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor 
implements IncrementalDumper {
+public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunnable implements IncrementalDumper {
     
     private final IncrementalDumperContext dumperContext;
     
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index dfa912c4a0e..74de5515898 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -21,7 +21,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -56,7 +56,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * WAL dumper of openGauss.
  */
 @Slf4j
-public final class OpenGaussWALDumper extends AbstractLifecycleExecutor 
implements IncrementalDumper {
+public final class OpenGaussWALDumper extends 
AbstractPipelineLifecycleRunnable implements IncrementalDumper {
     
     private final IncrementalDumperContext dumperContext;
     
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index b23686cbe4e..c284d8967e8 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -21,7 +21,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * PostgreSQL WAL dumper.
  */
 @Slf4j
-public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor 
implements IncrementalDumper {
+public final class PostgreSQLWALDumper extends 
AbstractPipelineLifecycleRunnable implements IncrementalDumper {
     
     private final IncrementalDumperContext dumperContext;
     
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index bef90f99488..0950df7e01f 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -25,7 +25,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
-import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
@@ -53,7 +53,7 @@ import java.util.stream.Collectors;
  */
 @RequiredArgsConstructor
 @Slf4j
-public final class CDCImporter extends AbstractLifecycleExecutor implements 
Importer {
+public final class CDCImporter extends AbstractPipelineLifecycleRunnable 
implements Importer {
     
     @Getter
     private final String importerId = RandomStringUtils.randomAlphanumeric(8);
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 62129db313b..4034b3d6056 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -19,8 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
+import 
org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
@@ -60,7 +60,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
     
     private final String parentJobId;
     
-    private final LifecycleExecutor checkExecutor;
+    private final PipelineLifecycleRunnable checkExecutor;
     
     private final AtomicReference<PipelineDataConsistencyChecker> 
consistencyChecker = new AtomicReference<>();
     
@@ -69,7 +69,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         checkJobConfig = jobItemContext.getJobConfig();
         checkJobId = checkJobConfig.getJobId();
         parentJobId = checkJobConfig.getParentJobId();
-        checkExecutor = new CheckLifecycleExecutor();
+        checkExecutor = new CheckPipelineLifecycleRunnable();
     }
     
     @Override
@@ -88,7 +88,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         checkExecutor.stop();
     }
     
-    private final class CheckLifecycleExecutor extends 
AbstractLifecycleExecutor {
+    private final class CheckPipelineLifecycleRunnable extends 
AbstractPipelineLifecycleRunnable {
         
         @Override
         protected void runBlocking() {
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index cc4f96860bb..40467c11371 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -17,10 +17,10 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 
-import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 
-public final class FixtureIncrementalDumper extends AbstractLifecycleExecutor 
implements IncrementalDumper {
+public final class FixtureIncrementalDumper extends 
AbstractPipelineLifecycleRunnable implements IncrementalDumper {
     
     @Override
     protected void runBlocking() {

Reply via email to