This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bd3c0d1  Rename Channel to PipelineChannel (#14629)
bd3c0d1 is described below

commit bd3c0d1758d15b0649543da6328915d783b54c3e
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Jan 9 13:34:32 2022 +0800

    Rename Channel to PipelineChannel (#14629)
---
 .../data/pipeline/core/importer/AbstractImporter.java          |  4 ++--
 .../{MemoryChannel.java => MemoryPipelineChannel.java}         | 10 +++++-----
 .../pipeline/core/ingest/dumper/AbstractInventoryDumper.java   |  4 ++--
 .../core/spi/ingest/channel/MemoryPipelineChannelFactory.java  |  8 ++++----
 .../data/pipeline/core/task/IncrementalTask.java               |  6 +++---
 .../shardingsphere/data/pipeline/core/task/InventoryTask.java  |  4 ++--
 .../data/pipeline/mysql/ingest/MySQLIncrementalDumper.java     |  4 ++--
 .../data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java |  6 +++---
 .../data/pipeline/opengauss/ingest/OpenGaussWalDumper.java     |  4 ++--
 .../data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java   |  4 ++--
 .../pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java    |  6 +++---
 .../api/ingest/channel/{Channel.java => PipelineChannel.java}  |  5 ++---
 .../shardingsphere/data/pipeline/spi/importer/Importer.java    |  4 ++--
 .../pipeline/spi/ingest/channel/PipelineChannelFactory.java    |  6 +++---
 .../shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java |  4 ++--
 .../data/pipeline/core/fixture/FixtureImporter.java            |  4 ++--
 .../data/pipeline/core/fixture/FixtureIncrementalDumper.java   |  4 ++--
 .../data/pipeline/core/importer/AbstractImporterTest.java      |  4 ++--
 ...hannelTest.java => AutoAcknowledgePipelineChannelTest.java} |  2 +-
 .../{MemoryChannelTest.java => MemoryPipelineChannelTest.java} |  6 +++---
 20 files changed, 49 insertions(+), 50 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
index f0a2d5f..e7eefb7 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
@@ -61,7 +61,7 @@ public abstract class AbstractImporter extends 
AbstractLifecycleExecutor impleme
     private final PipelineSQLBuilder pipelineSqlBuilder;
     
     @Setter
-    private Channel channel;
+    private PipelineChannel channel;
     
     @Setter
     private ImporterListener importerListener;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryPipelineChannel.java
similarity index 94%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryPipelineChannel.java
index b704955..b845ae9 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryPipelineChannel.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
@@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * Distribution channel.
  */
 @Slf4j
-public final class MemoryChannel implements Channel {
+public final class MemoryPipelineChannel implements PipelineChannel {
     
     private final int channelNumber;
     
@@ -62,15 +62,15 @@ public final class MemoryChannel implements Channel {
     
     private ScheduledExecutorService scheduleAckRecordsExecutor;
     
-    public MemoryChannel(final AckCallback ackCallback) {
+    public MemoryPipelineChannel(final AckCallback ackCallback) {
         this(10000, ackCallback);
     }
     
-    public MemoryChannel(final int blockQueueSize, final AckCallback 
ackCallback) {
+    public MemoryPipelineChannel(final int blockQueueSize, final AckCallback 
ackCallback) {
         this(1, blockQueueSize, ackCallback);
     }
     
-    public MemoryChannel(final int channelNumber, final int blockQueueSize, 
final AckCallback ackCallback) {
+    public MemoryPipelineChannel(final int channelNumber, final int 
blockQueueSize, final AckCallback ackCallback) {
         this.channelNumber = channelNumber;
         this.ackCallback = ackCallback;
         channels = new BitSetChannel[channelNumber];
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 98b39bb..7d44841 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
@@ -68,7 +68,7 @@ public abstract class AbstractInventoryDumper extends 
AbstractLifecycleExecutor
     private final TableMetaData tableMetaData;
     
     @Setter
-    private Channel channel;
+    private PipelineChannel channel;
     
     protected AbstractInventoryDumper(final InventoryDumperConfiguration 
inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager) {
         if 
(!StandardPipelineDataSourceConfiguration.class.equals(inventoryDumperConfig.getDataSourceConfig().getClass()))
 {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
index 44d00fd..2b8aabf 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
@@ -19,8 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel;
 
 import com.google.common.base.Strings;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryPipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 
 import java.util.Properties;
@@ -57,8 +57,8 @@ public final class MemoryPipelineChannelFactory implements 
PipelineChannelFactor
     }
     
     @Override
-    public Channel createPipelineChannel(final int outputConcurrency, final 
AckCallback ackCallback) {
-        return new MemoryChannel(outputConcurrency, blockQueueSize, 
ackCallback);
+    public PipelineChannel createPipelineChannel(final int outputConcurrency, 
final AckCallback ackCallback) {
+        return new MemoryPipelineChannel(outputConcurrency, blockQueueSize, 
ackCallback);
     }
     
     @Override
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index c1289c0..f90ee91 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
@@ -106,7 +106,7 @@ public final class IncrementalTask extends 
AbstractLifecycleExecutor implements
     }
     
     private void instanceChannel(final Collection<Importer> importers) {
-        Channel channel = 
pipelineChannelFactory.createPipelineChannel(importers.size(), records -> {
+        PipelineChannel channel = 
pipelineChannelFactory.createPipelineChannel(importers.size(), records -> {
             Record lastHandledRecord = records.get(records.size() - 1);
             if (!(lastHandledRecord.getPosition() instanceof 
PlaceholderPosition)) {
                 progress.setPosition(lastHandledRecord.getPosition());
@@ -114,7 +114,7 @@ public final class IncrementalTask extends 
AbstractLifecycleExecutor implements
             }
         });
         dumper.setChannel(channel);
-        // TODO merge logic into AckCallback after Channel.ack refactoring, 
and then remove ImporterListener
+        // TODO merge logic into AckCallback after PipelineChannel.ack 
refactoring, and then remove ImporterListener
         ImporterListener importerListener = records -> 
progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
         for (Importer each : importers) {
             each.setChannel(channel);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 5c4ecea..313c686 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
@@ -111,7 +111,7 @@ public final class InventoryTask extends 
AbstractLifecycleExecutor implements Pi
     }
     
     private void instanceChannel(final Importer importer) {
-        Channel channel = pipelineChannelFactory.createPipelineChannel(1, 
records -> {
+        PipelineChannel channel = 
pipelineChannelFactory.createPipelineChannel(1, records -> {
             Optional<Record> record = records.stream().filter(each -> 
!(each.getPosition() instanceof PlaceholderPosition)).reduce((a, b) -> b);
             record.ifPresent(value -> position = value.getPosition());
         });
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index da8b6a1..3a0ac48 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -24,7 +24,7 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
@@ -76,7 +76,7 @@ public final class MySQLIncrementalDumper extends 
AbstractLifecycleExecutor impl
     private final Random random = new SecureRandom();
     
     @Setter
-    private Channel channel;
+    private PipelineChannel channel;
     
     static {
         ShardingSphereServiceLoader.register(ValueHandler.class);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 24a5481..b32184f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -26,7 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderReco
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryChannel;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryPipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
@@ -58,13 +58,13 @@ public final class MySQLIncrementalDumperTest {
     
     private MySQLIncrementalDumper incrementalDumper;
     
-    private MemoryChannel channel;
+    private MemoryPipelineChannel channel;
     
     @Before
     public void setUp() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
         initTableData(dumperConfig);
-        channel = new MemoryChannel(records -> {
+        channel = new MemoryPipelineChannel(records -> {
         });
         incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new 
BinlogPosition("binlog-000001", 4L));
         incrementalDumper.setChannel(channel);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index eb1d197..0c1a12c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
@@ -63,7 +63,7 @@ public final class OpenGaussWalDumper extends 
AbstractLifecycleExecutor implemen
     private String slotName = OpenGaussLogicalReplication.SLOT_NAME_PREFIX;
     
     @Setter
-    private Channel channel;
+    private PipelineChannel channel;
     
     public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final 
IngestPosition<WalPosition> position) {
         walPosition = (WalPosition) position;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index fcd6a4c..e914db5 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
@@ -59,7 +59,7 @@ public final class PostgreSQLWalDumper extends 
AbstractLifecycleExecutor impleme
     private final WalEventConverter walEventConverter;
     
     @Setter
-    private Channel channel;
+    private PipelineChannel channel;
     
     public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final 
IngestPosition<WalPosition> position) {
         walPosition = (WalPosition) position;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index f33e5b5..ecd2ddd 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryChannel;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryPipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
@@ -64,13 +64,13 @@ public final class PostgreSQLWalDumperTest {
     
     private StandardPipelineDataSourceConfiguration pipelineDataSourceConfig;
     
-    private MemoryChannel channel;
+    private MemoryPipelineChannel channel;
     
     @Before
     public void setUp() {
         position = new WalPosition(new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
         walDumper = new PostgreSQLWalDumper(mockDumperConfiguration(), 
position);
-        channel = new MemoryChannel(records -> {
+        channel = new MemoryPipelineChannel(records -> {
         });
         walDumper.setChannel(channel);
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
similarity index 95%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
index 51cd0f0..deec0de 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
@@ -22,10 +22,9 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import java.util.List;
 
 /**
- * Channel.
+ * Pipeline channel.
  */
-// TODO rename to PipelineChannel
-public interface Channel {
+public interface PipelineChannel {
     
     /**
      * Push {@code DataRecord} into channel.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
index 493a500..de033cb 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.spi.importer;
 
 import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 
 /**
  * Importer.
@@ -30,7 +30,7 @@ public interface Importer extends LifecycleExecutor {
      *
      * @param channel channel
      */
-    void setChannel(Channel channel);
+    void setChannel(PipelineChannel channel);
     
     /**
      * Write data to channel.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
index 22277e8..fafef82 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.spi.ingest.channel;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
 import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
 
@@ -32,7 +32,7 @@ public interface PipelineChannelFactory extends 
ShardingSphereAlgorithm, Shardin
      *
      * @param outputConcurrency output concurrency
      * @param ackCallback ack callback
-     * @return {@link Channel}
+     * @return {@link PipelineChannel}
      */
-    Channel createPipelineChannel(int outputConcurrency, AckCallback 
ackCallback);
+    PipelineChannel createPipelineChannel(int outputConcurrency, AckCallback 
ackCallback);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
index 02798e2..3958931 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
 
 import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 
 /**
  * Dumper interface.
@@ -30,5 +30,5 @@ public interface Dumper extends LifecycleExecutor {
      *
      * @param channel channel
      */
-    void setChannel(Channel channel);
+    void setChannel(PipelineChannel channel);
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
index ba05ac7..387dd70 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 
@@ -28,7 +28,7 @@ public final class FixtureImporter implements Importer {
     }
     
     @Override
-    public void setChannel(final Channel channel) {
+    public void setChannel(final PipelineChannel channel) {
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index f38d8d7..adc3bab 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 
@@ -28,7 +28,7 @@ public final class FixtureIncrementalDumper implements IncrementalDumper {
     }
     
     @Override
-    public void setChannel(final Channel channel) {
+    public void setChannel(final PipelineChannel channel) {
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
index 5b087d9..5eb0810 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.importer;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -73,7 +73,7 @@ public final class AbstractImporterTest {
     private PipelineDataSourceConfiguration dataSourceConfig;
     
     @Mock
-    private Channel channel;
+    private PipelineChannel channel;
     
     @Mock
     private PipelineDataSourceWrapper dataSource;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannelTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgePipelineChannelTest.java
similarity index 97%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannelTest.java
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgePipelineChannelTest.java
index b46792f..e9de710 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannelTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgePipelineChannelTest.java
@@ -27,7 +27,7 @@ import java.util.Collections;
 
 import static org.junit.Assert.assertTrue;
 
-public final class AutoAcknowledgeChannelTest {
+public final class AutoAcknowledgePipelineChannelTest {
     
     private AutoAcknowledgeChannel channel;
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryPipelineChannelTest.java
similarity index 94%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryPipelineChannelTest.java
index 96965a9..0867ba9 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryPipelineChannelTest.java
@@ -40,7 +40,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
-public final class MemoryChannelTest {
+public final class MemoryPipelineChannelTest {
     
     private static final int CHANNEL_NUMBER = 2;
     
@@ -65,7 +65,7 @@ public final class MemoryChannelTest {
     @SneakyThrows(InterruptedException.class)
     private void execute(final AckCallback ackCallback, final int recordCount, final Record... records) {
         CountDownLatch countDownLatch = new CountDownLatch(recordCount);
-        MemoryChannel memoryChannel = new MemoryChannel(CHANNEL_NUMBER, 10000, ackCallback);
+        MemoryPipelineChannel memoryChannel = new MemoryPipelineChannel(CHANNEL_NUMBER, 10000, ackCallback);
         fetchWithMultiThreading(memoryChannel, countDownLatch);
         for (Record record : records) {
             memoryChannel.pushRecord(record);
@@ -75,7 +75,7 @@ public final class MemoryChannelTest {
         memoryChannel.close();
     }
     
-    private void fetchWithMultiThreading(final MemoryChannel memoryChannel, final CountDownLatch countDownLatch) {
+    private void fetchWithMultiThreading(final MemoryPipelineChannel memoryChannel, final CountDownLatch countDownLatch) {
         for (int i = 0; i < CHANNEL_NUMBER; i++) {
             new Thread(() -> {
                 int maxLoopCount = 10;

Reply via email to