This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bd3c0d1 Rename Channel to PipelineChannel (#14629)
bd3c0d1 is described below
commit bd3c0d1758d15b0649543da6328915d783b54c3e
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Jan 9 13:34:32 2022 +0800
Rename Channel to PipelineChannel (#14629)
---
.../data/pipeline/core/importer/AbstractImporter.java | 4 ++--
.../{MemoryChannel.java => MemoryPipelineChannel.java} | 10 +++++-----
.../pipeline/core/ingest/dumper/AbstractInventoryDumper.java | 4 ++--
.../core/spi/ingest/channel/MemoryPipelineChannelFactory.java | 8 ++++----
.../data/pipeline/core/task/IncrementalTask.java | 6 +++---
.../shardingsphere/data/pipeline/core/task/InventoryTask.java | 4 ++--
.../data/pipeline/mysql/ingest/MySQLIncrementalDumper.java | 4 ++--
.../data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java | 6 +++---
.../data/pipeline/opengauss/ingest/OpenGaussWalDumper.java | 4 ++--
.../data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java | 4 ++--
.../pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java | 6 +++---
.../api/ingest/channel/{Channel.java => PipelineChannel.java} | 5 ++---
.../shardingsphere/data/pipeline/spi/importer/Importer.java | 4 ++--
.../pipeline/spi/ingest/channel/PipelineChannelFactory.java | 6 +++---
.../shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java | 4 ++--
.../data/pipeline/core/fixture/FixtureImporter.java | 4 ++--
.../data/pipeline/core/fixture/FixtureIncrementalDumper.java | 4 ++--
.../data/pipeline/core/importer/AbstractImporterTest.java | 4 ++--
...hannelTest.java => AutoAcknowledgePipelineChannelTest.java} | 2 +-
.../{MemoryChannelTest.java => MemoryPipelineChannelTest.java} | 6 +++---
20 files changed, 49 insertions(+), 50 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
index f0a2d5f..e7eefb7 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
@@ -61,7 +61,7 @@ public abstract class AbstractImporter extends
AbstractLifecycleExecutor impleme
private final PipelineSQLBuilder pipelineSqlBuilder;
@Setter
- private Channel channel;
+ private PipelineChannel channel;
@Setter
private ImporterListener importerListener;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryPipelineChannel.java
similarity index 94%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryPipelineChannel.java
index b704955..b845ae9 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryPipelineChannel.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
@@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
* Distribution channel.
*/
@Slf4j
-public final class MemoryChannel implements Channel {
+public final class MemoryPipelineChannel implements PipelineChannel {
private final int channelNumber;
@@ -62,15 +62,15 @@ public final class MemoryChannel implements Channel {
private ScheduledExecutorService scheduleAckRecordsExecutor;
- public MemoryChannel(final AckCallback ackCallback) {
+ public MemoryPipelineChannel(final AckCallback ackCallback) {
this(10000, ackCallback);
}
- public MemoryChannel(final int blockQueueSize, final AckCallback
ackCallback) {
+ public MemoryPipelineChannel(final int blockQueueSize, final AckCallback
ackCallback) {
this(1, blockQueueSize, ackCallback);
}
- public MemoryChannel(final int channelNumber, final int blockQueueSize,
final AckCallback ackCallback) {
+ public MemoryPipelineChannel(final int channelNumber, final int
blockQueueSize, final AckCallback ackCallback) {
this.channelNumber = channelNumber;
this.ackCallback = ackCallback;
channels = new BitSetChannel[channelNumber];
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 98b39bb..7d44841 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -25,7 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
@@ -68,7 +68,7 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
private final TableMetaData tableMetaData;
@Setter
- private Channel channel;
+ private PipelineChannel channel;
protected AbstractInventoryDumper(final InventoryDumperConfiguration
inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager) {
if
(!StandardPipelineDataSourceConfiguration.class.equals(inventoryDumperConfig.getDataSourceConfig().getClass()))
{
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
index 44d00fd..2b8aabf 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
@@ -19,8 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel;
import com.google.common.base.Strings;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryChannel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryPipelineChannel;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import java.util.Properties;
@@ -57,8 +57,8 @@ public final class MemoryPipelineChannelFactory implements
PipelineChannelFactor
}
@Override
- public Channel createPipelineChannel(final int outputConcurrency, final
AckCallback ackCallback) {
- return new MemoryChannel(outputConcurrency, blockQueueSize,
ackCallback);
+ public PipelineChannel createPipelineChannel(final int outputConcurrency,
final AckCallback ackCallback) {
+ return new MemoryPipelineChannel(outputConcurrency, blockQueueSize,
ackCallback);
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index c1289c0..f90ee91 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
@@ -106,7 +106,7 @@ public final class IncrementalTask extends
AbstractLifecycleExecutor implements
}
private void instanceChannel(final Collection<Importer> importers) {
- Channel channel =
pipelineChannelFactory.createPipelineChannel(importers.size(), records -> {
+ PipelineChannel channel =
pipelineChannelFactory.createPipelineChannel(importers.size(), records -> {
Record lastHandledRecord = records.get(records.size() - 1);
if (!(lastHandledRecord.getPosition() instanceof
PlaceholderPosition)) {
progress.setPosition(lastHandledRecord.getPosition());
@@ -114,7 +114,7 @@ public final class IncrementalTask extends
AbstractLifecycleExecutor implements
}
});
dumper.setChannel(channel);
- // TODO merge logic into AckCallback after Channel.ack refactoring,
and then remove ImporterListener
+ // TODO merge logic into AckCallback after PipelineChannel.ack
refactoring, and then remove ImporterListener
ImporterListener importerListener = records ->
progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
for (Importer each : importers) {
each.setChannel(channel);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 5c4ecea..313c686 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
@@ -111,7 +111,7 @@ public final class InventoryTask extends
AbstractLifecycleExecutor implements Pi
}
private void instanceChannel(final Importer importer) {
- Channel channel = pipelineChannelFactory.createPipelineChannel(1,
records -> {
+ PipelineChannel channel =
pipelineChannelFactory.createPipelineChannel(1, records -> {
Optional<Record> record = records.stream().filter(each ->
!(each.getPosition() instanceof PlaceholderPosition)).reduce((a, b) -> b);
record.ifPresent(value -> position = value.getPosition());
});
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index da8b6a1..3a0ac48 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -24,7 +24,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
@@ -76,7 +76,7 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
private final Random random = new SecureRandom();
@Setter
- private Channel channel;
+ private PipelineChannel channel;
static {
ShardingSphereServiceLoader.register(ValueHandler.class);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 24a5481..b32184f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -26,7 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderReco
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryChannel;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
@@ -58,13 +58,13 @@ public final class MySQLIncrementalDumperTest {
private MySQLIncrementalDumper incrementalDumper;
- private MemoryChannel channel;
+ private MemoryPipelineChannel channel;
@Before
public void setUp() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
initTableData(dumperConfig);
- channel = new MemoryChannel(records -> {
+ channel = new MemoryPipelineChannel(records -> {
});
incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new
BinlogPosition("binlog-000001", 4L));
incrementalDumper.setChannel(channel);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index eb1d197..0c1a12c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
@@ -63,7 +63,7 @@ public final class OpenGaussWalDumper extends
AbstractLifecycleExecutor implemen
private String slotName = OpenGaussLogicalReplication.SLOT_NAME_PREFIX;
@Setter
- private Channel channel;
+ private PipelineChannel channel;
public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final
IngestPosition<WalPosition> position) {
walPosition = (WalPosition) position;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index fcd6a4c..e914db5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
@@ -59,7 +59,7 @@ public final class PostgreSQLWalDumper extends
AbstractLifecycleExecutor impleme
private final WalEventConverter walEventConverter;
@Setter
- private Channel channel;
+ private PipelineChannel channel;
public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final
IngestPosition<WalPosition> position) {
walPosition = (WalPosition) position;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index f33e5b5..ecd2ddd 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryChannel;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryPipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
@@ -64,13 +64,13 @@ public final class PostgreSQLWalDumperTest {
private StandardPipelineDataSourceConfiguration pipelineDataSourceConfig;
- private MemoryChannel channel;
+ private MemoryPipelineChannel channel;
@Before
public void setUp() {
position = new WalPosition(new
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
walDumper = new PostgreSQLWalDumper(mockDumperConfiguration(),
position);
- channel = new MemoryChannel(records -> {
+ channel = new MemoryPipelineChannel(records -> {
});
walDumper.setChannel(channel);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
similarity index 95%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
index 51cd0f0..deec0de 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
@@ -22,10 +22,9 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import java.util.List;
/**
- * Channel.
+ * Pipeline channel.
*/
-// TODO rename to PipelineChannel
-public interface Channel {
+public interface PipelineChannel {
/**
* Push {@code DataRecord} into channel.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
index 493a500..de033cb 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.spi.importer;
import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
/**
* Importer.
@@ -30,7 +30,7 @@ public interface Importer extends LifecycleExecutor {
*
* @param channel channel
*/
- void setChannel(Channel channel);
+ void setChannel(PipelineChannel channel);
/**
* Write data to channel.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
index 22277e8..fafef82 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.spi.ingest.channel;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
@@ -32,7 +32,7 @@ public interface PipelineChannelFactory extends
ShardingSphereAlgorithm, Shardin
*
* @param outputConcurrency output concurrency
* @param ackCallback ack callback
- * @return {@link Channel}
+ * @return {@link PipelineChannel}
*/
- Channel createPipelineChannel(int outputConcurrency, AckCallback
ackCallback);
+ PipelineChannel createPipelineChannel(int outputConcurrency, AckCallback
ackCallback);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
index 02798e2..3958931 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
/**
* Dumper interface.
@@ -30,5 +30,5 @@ public interface Dumper extends LifecycleExecutor {
*
* @param channel channel
*/
- void setChannel(Channel channel);
+ void setChannel(PipelineChannel channel);
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
index ba05ac7..387dd70 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
@@ -28,7 +28,7 @@ public final class FixtureImporter implements Importer {
}
@Override
- public void setChannel(final Channel channel) {
+ public void setChannel(final PipelineChannel channel) {
}
@Override
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index f38d8d7..adc3bab 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
@@ -28,7 +28,7 @@ public final class FixtureIncrementalDumper implements
IncrementalDumper {
}
@Override
- public void setChannel(final Channel channel) {
+ public void setChannel(final PipelineChannel channel) {
}
@Override
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
index 5b087d9..5eb0810 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.importer;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -73,7 +73,7 @@ public final class AbstractImporterTest {
private PipelineDataSourceConfiguration dataSourceConfig;
@Mock
- private Channel channel;
+ private PipelineChannel channel;
@Mock
private PipelineDataSourceWrapper dataSource;
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannelTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgePipelineChannelTest.java
similarity index 97%
rename from
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannelTest.java
rename to
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgePipelineChannelTest.java
index b46792f..e9de710 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannelTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgePipelineChannelTest.java
@@ -27,7 +27,7 @@ import java.util.Collections;
import static org.junit.Assert.assertTrue;
-public final class AutoAcknowledgeChannelTest {
+public final class AutoAcknowledgePipelineChannelTest {
private AutoAcknowledgeChannel channel;
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryPipelineChannelTest.java
similarity index 94%
rename from
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java
rename to
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryPipelineChannelTest.java
index 96965a9..0867ba9 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryPipelineChannelTest.java
@@ -40,7 +40,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-public final class MemoryChannelTest {
+public final class MemoryPipelineChannelTest {
private static final int CHANNEL_NUMBER = 2;
@@ -65,7 +65,7 @@ public final class MemoryChannelTest {
@SneakyThrows(InterruptedException.class)
private void execute(final AckCallback ackCallback, final int recordCount,
final Record... records) {
CountDownLatch countDownLatch = new CountDownLatch(recordCount);
- MemoryChannel memoryChannel = new MemoryChannel(CHANNEL_NUMBER, 10000,
ackCallback);
+ MemoryPipelineChannel memoryChannel = new
MemoryPipelineChannel(CHANNEL_NUMBER, 10000, ackCallback);
fetchWithMultiThreading(memoryChannel, countDownLatch);
for (Record record : records) {
memoryChannel.pushRecord(record);
@@ -75,7 +75,7 @@ public final class MemoryChannelTest {
memoryChannel.close();
}
- private void fetchWithMultiThreading(final MemoryChannel memoryChannel,
final CountDownLatch countDownLatch) {
+ private void fetchWithMultiThreading(final MemoryPipelineChannel
memoryChannel, final CountDownLatch countDownLatch) {
for (int i = 0; i < CHANNEL_NUMBER; i++) {
new Thread(() -> {
int maxLoopCount = 10;