This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new de54a6982a6 Rename PipelineLifecycleRunnable (#28954)
de54a6982a6 is described below
commit de54a6982a6f78684c8cd13c2e3b952ef58fdbdc
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 6 15:35:44 2023 +0800
Rename PipelineLifecycleRunnable (#28954)
* Rename PipelineLifecycleRunnable
* Move PipelineLifecycleRunnable
---
.../data/pipeline/api/ingest/dumper/Dumper.java | 4 ++--
.../PipelineLifecycleRunnable.java} | 7 +++----
...xecutor.java => AbstractPipelineLifecycleRunnable.java} | 6 +++---
.../data/pipeline/common/execute/ExecuteEngine.java | 14 +++++++-------
.../data/pipeline/core/dumper/InventoryDumper.java | 4 ++--
.../data/pipeline/core/importer/Importer.java | 4 ++--
.../core/importer/SingleChannelConsumerImporter.java | 4 ++--
...est.java => AbstractPipelineLifecycleRunnableTest.java} | 14 +++++++-------
.../data/pipeline/common/execute/ExecuteEngineTest.java | 14 +++++++-------
.../data/pipeline/mysql/ingest/MySQLIncrementalDumper.java | 4 ++--
.../data/pipeline/opengauss/ingest/OpenGaussWALDumper.java | 4 ++--
.../pipeline/postgresql/ingest/PostgreSQLWALDumper.java | 4 ++--
.../data/pipeline/cdc/core/importer/CDCImporter.java | 4 ++--
.../consistencycheck/task/ConsistencyCheckTasksRunner.java | 10 +++++-----
.../pipeline/core/fixture/FixtureIncrementalDumper.java | 4 ++--
15 files changed, 50 insertions(+), 51 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
index 79724699ab6..dc12667cd58 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
@@ -17,10 +17,10 @@
package org.apache.shardingsphere.data.pipeline.api.ingest.dumper;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
/**
* Dumper interface.
*/
-public interface Dumper extends LifecycleExecutor {
+public interface Dumper extends PipelineLifecycleRunnable {
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/LifecycleExecutor.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/runnable/PipelineLifecycleRunnable.java
similarity index 85%
rename from
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/LifecycleExecutor.java
rename to
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/runnable/PipelineLifecycleRunnable.java
index 3ae17b7159d..4b0233f2f97 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/LifecycleExecutor.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/runnable/PipelineLifecycleRunnable.java
@@ -15,13 +15,12 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.executor;
+package org.apache.shardingsphere.data.pipeline.api.runnable;
/**
- * Lifecycle executor.
+ * Pipeline lifecycle runnable.
*/
-// TODO task?
-public interface LifecycleExecutor extends Runnable {
+public interface PipelineLifecycleRunnable extends Runnable {
/**
* Start run execute.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutor.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnable.java
similarity index 92%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutor.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnable.java
index 9d91c2d9227..51dcacb213a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutor.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnable.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.common.execute;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
import java.sql.SQLException;
import java.time.Instant;
@@ -28,10 +28,10 @@ import java.time.format.DateTimeFormatter;
import java.util.concurrent.atomic.AtomicReference;
/**
- * Abstract lifecycle executor.
+ * Abstract pipeline lifecycle runnable.
*/
@Slf4j
-public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
+public abstract class AbstractPipelineLifecycleRunnable implements
PipelineLifecycleRunnable {
private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngine.java
index c1a74f7de78..eb6a1b4ea62 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngine.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.common.execute;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -73,12 +73,12 @@ public final class ExecuteEngine {
/**
* Submit a {@code LifecycleExecutor} with callback {@code
ExecuteCallback} to execute.
*
- * @param lifecycleExecutor lifecycle executor
+ * @param pipelineLifecycleRunnable lifecycle executor
* @param executeCallback execute callback
* @return execute future
*/
- public CompletableFuture<?> submit(final LifecycleExecutor
lifecycleExecutor, final ExecuteCallback executeCallback) {
- return CompletableFuture.runAsync(lifecycleExecutor,
executorService).whenCompleteAsync((unused, throwable) -> {
+ public CompletableFuture<?> submit(final PipelineLifecycleRunnable
pipelineLifecycleRunnable, final ExecuteCallback executeCallback) {
+ return CompletableFuture.runAsync(pipelineLifecycleRunnable,
executorService).whenCompleteAsync((unused, throwable) -> {
if (null == throwable) {
executeCallback.onSuccess();
} else {
@@ -91,11 +91,11 @@ public final class ExecuteEngine {
/**
* Submit a {@code LifecycleExecutor} to execute.
*
- * @param lifecycleExecutor lifecycle executor
+ * @param pipelineLifecycleRunnable lifecycle executor
* @return execute future
*/
- public CompletableFuture<?> submit(final LifecycleExecutor
lifecycleExecutor) {
- return CompletableFuture.runAsync(lifecycleExecutor, executorService);
+ public CompletableFuture<?> submit(final PipelineLifecycleRunnable
pipelineLifecycleRunnable) {
+ return CompletableFuture.runAsync(pipelineLifecycleRunnable,
executorService);
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index ebd0f8183ee..cb2575bc5b4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -21,7 +21,7 @@ import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext;
@@ -67,7 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
* Inventory dumper.
*/
@Slf4j
-public final class InventoryDumper extends AbstractLifecycleExecutor
implements Dumper {
+public final class InventoryDumper extends AbstractPipelineLifecycleRunnable
implements Dumper {
@Getter(AccessLevel.PROTECTED)
private final InventoryDumperContext dumperContext;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/Importer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/Importer.java
index a7707c7b091..2217f39a38a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/Importer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/Importer.java
@@ -17,10 +17,10 @@
package org.apache.shardingsphere.data.pipeline.core.importer;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
/**
* Importer.
*/
-public interface Importer extends LifecycleExecutor {
+public interface Importer extends PipelineLifecycleRunnable {
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
index 41589b79391..e272ebd7b83 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.importer;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
* Single channel consumer importer.
*/
@RequiredArgsConstructor
-public final class SingleChannelConsumerImporter extends
AbstractLifecycleExecutor implements Importer {
+public final class SingleChannelConsumerImporter extends
AbstractPipelineLifecycleRunnable implements Importer {
private final PipelineChannel channel;
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnableTest.java
similarity index 80%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutorTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnableTest.java
index b27766aa4bf..00616f8f5db 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnableTest.java
@@ -26,11 +26,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-class AbstractLifecycleExecutorTest {
+class AbstractPipelineLifecycleRunnableTest {
@Test
void assertRunning() {
- FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+ FixturePipelineLifecycleRunnable executor = new
FixturePipelineLifecycleRunnable();
assertFalse(executor.isRunning());
executor.start();
assertTrue(executor.isRunning());
@@ -40,7 +40,7 @@ class AbstractLifecycleExecutorTest {
@Test
void assertStartRunOnce() {
- FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+ FixturePipelineLifecycleRunnable executor = new
FixturePipelineLifecycleRunnable();
executor.start();
executor.start();
assertThat(executor.runBlockingCount.get(), is(1));
@@ -48,7 +48,7 @@ class AbstractLifecycleExecutorTest {
@Test
void assertStopRunOnce() {
- FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+ FixturePipelineLifecycleRunnable executor = new
FixturePipelineLifecycleRunnable();
executor.start();
executor.stop();
executor.stop();
@@ -57,7 +57,7 @@ class AbstractLifecycleExecutorTest {
@Test
void assertNoStopBeforeStarting() {
- FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+ FixturePipelineLifecycleRunnable executor = new
FixturePipelineLifecycleRunnable();
executor.stop();
executor.stop();
assertThat(executor.doStopCount.get(), is(0));
@@ -65,14 +65,14 @@ class AbstractLifecycleExecutorTest {
@Test
void assertStopStart() {
- FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+ FixturePipelineLifecycleRunnable executor = new
FixturePipelineLifecycleRunnable();
executor.stop();
executor.start();
assertThat(executor.doStopCount.get(), is(0));
assertThat(executor.runBlockingCount.get(), is(0));
}
- private static class FixtureLifecycleExecutor extends
AbstractLifecycleExecutor {
+ private static class FixturePipelineLifecycleRunnable extends
AbstractPipelineLifecycleRunnable {
private final AtomicInteger runBlockingCount = new AtomicInteger();
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngineTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngineTest.java
index 4193427298a..99c26bf5005 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngineTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngineTest.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.common.execute;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
import org.junit.jupiter.api.Test;
import org.mockito.internal.configuration.plugins.Plugins;
@@ -45,24 +45,24 @@ class ExecuteEngineTest {
@Test
void assertSubmitAndTaskSucceeded() {
- LifecycleExecutor lifecycleExecutor = mock(LifecycleExecutor.class);
+ PipelineLifecycleRunnable pipelineLifecycleRunnable =
mock(PipelineLifecycleRunnable.class);
ExecuteCallback callback = mock(ExecuteCallback.class);
ExecuteEngine executeEngine =
ExecuteEngine.newCachedThreadInstance(ExecuteEngineTest.class.getSimpleName());
- Future<?> future = executeEngine.submit(lifecycleExecutor, callback);
+ Future<?> future = executeEngine.submit(pipelineLifecycleRunnable,
callback);
assertTimeout(Duration.ofSeconds(30L), () -> future.get());
shutdownAndAwaitTerminal(executeEngine);
- verify(lifecycleExecutor).run();
+ verify(pipelineLifecycleRunnable).run();
verify(callback).onSuccess();
}
@Test
void assertSubmitAndTaskFailed() {
- LifecycleExecutor lifecycleExecutor = mock(LifecycleExecutor.class);
+ PipelineLifecycleRunnable pipelineLifecycleRunnable =
mock(PipelineLifecycleRunnable.class);
RuntimeException expectedException = new RuntimeException("Expected");
- doThrow(expectedException).when(lifecycleExecutor).run();
+ doThrow(expectedException).when(pipelineLifecycleRunnable).run();
ExecuteCallback callback = mock(ExecuteCallback.class);
ExecuteEngine executeEngine =
ExecuteEngine.newCachedThreadInstance(ExecuteEngineTest.class.getSimpleName());
- Future<?> future = executeEngine.submit(lifecycleExecutor, callback);
+ Future<?> future = executeEngine.submit(pipelineLifecycleRunnable,
callback);
Optional<Throwable> actualCause =
assertTimeout(Duration.ofSeconds(30L), () -> execute(future));
assertTrue(actualCause.isPresent());
assertThat(actualCause.get(), is(expectedException));
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 705a02200a8..833834e2560 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
@@ -64,7 +64,7 @@ import java.util.Optional;
* MySQL incremental dumper.
*/
@Slf4j
-public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor
implements IncrementalDumper {
+public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunnable implements IncrementalDumper {
private final IncrementalDumperContext dumperContext;
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index dfa912c4a0e..74de5515898 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -21,7 +21,7 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -56,7 +56,7 @@ import java.util.concurrent.atomic.AtomicReference;
* WAL dumper of openGauss.
*/
@Slf4j
-public final class OpenGaussWALDumper extends AbstractLifecycleExecutor
implements IncrementalDumper {
+public final class OpenGaussWALDumper extends
AbstractPipelineLifecycleRunnable implements IncrementalDumper {
private final IncrementalDumperContext dumperContext;
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index b23686cbe4e..c284d8967e8 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -21,7 +21,7 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicReference;
* PostgreSQL WAL dumper.
*/
@Slf4j
-public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor
implements IncrementalDumper {
+public final class PostgreSQLWALDumper extends
AbstractPipelineLifecycleRunnable implements IncrementalDumper {
private final IncrementalDumperContext dumperContext;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index bef90f99488..0950df7e01f 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -25,7 +25,7 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
-import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
@@ -53,7 +53,7 @@ import java.util.stream.Collectors;
*/
@RequiredArgsConstructor
@Slf4j
-public final class CDCImporter extends AbstractLifecycleExecutor implements
Importer {
+public final class CDCImporter extends AbstractPipelineLifecycleRunnable
implements Importer {
@Getter
private final String importerId = RandomStringUtils.randomAlphanumeric(8);
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 62129db313b..4034b3d6056 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -19,8 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
+import
org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
@@ -60,7 +60,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
private final String parentJobId;
- private final LifecycleExecutor checkExecutor;
+ private final PipelineLifecycleRunnable checkExecutor;
private final AtomicReference<PipelineDataConsistencyChecker>
consistencyChecker = new AtomicReference<>();
@@ -69,7 +69,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
checkJobConfig = jobItemContext.getJobConfig();
checkJobId = checkJobConfig.getJobId();
parentJobId = checkJobConfig.getParentJobId();
- checkExecutor = new CheckLifecycleExecutor();
+ checkExecutor = new CheckPipelineLifecycleRunnable();
}
@Override
@@ -88,7 +88,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
checkExecutor.stop();
}
- private final class CheckLifecycleExecutor extends
AbstractLifecycleExecutor {
+ private final class CheckPipelineLifecycleRunnable extends
AbstractPipelineLifecycleRunnable {
@Override
protected void runBlocking() {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index cc4f96860bb..40467c11371 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -17,10 +17,10 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
-public final class FixtureIncrementalDumper extends AbstractLifecycleExecutor
implements IncrementalDumper {
+public final class FixtureIncrementalDumper extends
AbstractPipelineLifecycleRunnable implements IncrementalDumper {
@Override
protected void runBlocking() {