This is an automated email from the ASF dual-hosted git repository.

chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b81b7c20f0 Rename PipelineChannel (#29527)
5b81b7c20f0 is described below

commit 5b81b7c20f0975e1ce6cc94e07e2b8aa45cfda98
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 24 16:32:53 2023 +0800

    Rename PipelineChannel (#29527)
    
    * Rename DataRecordGroupEngine
    
    * Rename PipelineChannel
---
 .../importer/SingleChannelConsumerImporter.java    |  2 +-
 .../core/ingest/channel/PipelineChannel.java       | 14 ++++++--------
 .../memory/MultiplexMemoryPipelineChannel.java     | 22 +++++++++++-----------
 .../memory/SimpleMemoryPipelineChannel.java        |  8 ++++----
 .../core/ingest/dumper/InventoryDumper.java        |  4 ++--
 .../ingest/record/group/DataRecordGroupEngine.java |  4 ++--
 .../memory/MultiplexMemoryPipelineChannelTest.java |  4 ++--
 .../memory/SimpleMemoryPipelineChannelTest.java    |  8 ++++----
 .../mysql/ingest/MySQLIncrementalDumper.java       |  2 +-
 .../opengauss/ingest/OpenGaussWALDumper.java       |  4 ++--
 .../postgresql/ingest/PostgreSQLWALDumper.java     |  4 ++--
 .../postgresql/ingest/PostgreSQLWALDumperTest.java |  2 +-
 .../pipeline/cdc/core/importer/CDCImporter.java    | 10 +++++-----
 .../core/importer/PipelineDataSourceSinkTest.java  |  8 ++++----
 14 files changed, 47 insertions(+), 49 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
index a2b80318596..c093c289f16 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
@@ -53,7 +53,7 @@ public final class SingleChannelConsumerImporter extends AbstractPipelineLifecyc
     @Override
     protected void runBlocking() {
         while (isRunning()) {
-            List<Record> records = channel.fetchRecords(batchSize, timeout, timeUnit).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+            List<Record> records = channel.fetch(batchSize, timeout, timeUnit).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
             if (records.isEmpty()) {
                 continue;
             }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java
index 0f5548ac15c..79756c976db 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java
@@ -32,9 +32,9 @@ public interface PipelineChannel extends Closeable {
     /**
      * Push {@code DataRecord} into channel.
      *
-     * @param dataRecords data records
+     * @param records data records
      */
-    void pushRecords(List<Record> dataRecords);
+    void push(List<Record> records);
     
     /**
      * Fetch {@code Record} list from channel.
@@ -45,21 +45,21 @@ public interface PipelineChannel extends Closeable {
      * @param timeUnit time unit
      * @return records of transactions
      */
-    List<Record> fetchRecords(int batchSize, long timeout, TimeUnit timeUnit);
+    List<Record> fetch(int batchSize, long timeout, TimeUnit timeUnit);
     
     /**
      * Peek {@code Record} list from channel.
      *
      * @return records of a transaction
      */
-    List<Record> peekRecords();
+    List<Record> peek();
     
     /**
      * Poll {@code Record} list from channel.
      *
      * @return records of a transaction
      */
-    List<Record> pollRecords();
+    List<Record> poll();
     
     /**
      * Ack the last batch.
@@ -69,8 +69,6 @@ public interface PipelineChannel extends Closeable {
     // TODO Refactor ack param
     void ack(List<Record> records);
     
-    /**
-     * Close channel.
-     */
+    @Override
     void close();
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
index 3b25d52b9f1..723f380896b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
@@ -51,7 +51,7 @@ public final class MultiplexMemoryPipelineChannel implements PipelineChannel {
     }
     
     @Override
-    public void pushRecords(final List<Record> records) {
+    public void push(final List<Record> records) {
         Record firstRecord = records.get(0);
         if (1 == records.size()) {
             pushRecord(firstRecord);
@@ -59,7 +59,7 @@ public final class MultiplexMemoryPipelineChannel implements 
PipelineChannel {
         }
         long insertDataRecordsCount = 
records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).filter(each
 -> PipelineSQLOperationType.INSERT == each.getType()).count();
         if (insertDataRecordsCount == records.size()) {
-            channels.get(Math.abs(firstRecord.hashCode() % 
channelNumber)).pushRecords(records);
+            channels.get(Math.abs(firstRecord.hashCode() % 
channelNumber)).push(records);
             return;
         }
         for (Record each : records) {
@@ -71,30 +71,30 @@ public final class MultiplexMemoryPipelineChannel 
implements PipelineChannel {
         List<Record> records = Collections.singletonList(ingestedRecord);
         if (ingestedRecord instanceof FinishedRecord) {
             for (int i = 0; i < channelNumber; i++) {
-                channels.get(i).pushRecords(records);
+                channels.get(i).push(records);
             }
         } else if (DataRecord.class.equals(ingestedRecord.getClass())) {
-            channels.get(Math.abs(ingestedRecord.hashCode() % 
channelNumber)).pushRecords(records);
+            channels.get(Math.abs(ingestedRecord.hashCode() % 
channelNumber)).push(records);
         } else if (PlaceholderRecord.class.equals(ingestedRecord.getClass())) {
-            channels.get(0).pushRecords(records);
+            channels.get(0).push(records);
         } else {
             throw new UnsupportedOperationException("Unsupported record type: 
" + ingestedRecord.getClass().getName());
         }
     }
     
     @Override
-    public List<Record> fetchRecords(final int batchSize, final long timeout, 
final TimeUnit timeUnit) {
-        return findChannel().fetchRecords(batchSize, timeout, timeUnit);
+    public List<Record> fetch(final int batchSize, final long timeout, final 
TimeUnit timeUnit) {
+        return findChannel().fetch(batchSize, timeout, timeUnit);
     }
     
     @Override
-    public List<Record> peekRecords() {
-        return findChannel().peekRecords();
+    public List<Record> peek() {
+        return findChannel().peek();
     }
     
     @Override
-    public List<Record> pollRecords() {
-        return findChannel().pollRecords();
+    public List<Record> poll() {
+        return findChannel().poll();
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
index 4c56ef2b68c..7f8187cb5ff 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
@@ -46,13 +46,13 @@ public final class SimpleMemoryPipelineChannel implements 
PipelineChannel {
     
     @SneakyThrows(InterruptedException.class)
     @Override
-    public void pushRecords(final List<Record> records) {
+    public void push(final List<Record> records) {
         queue.put(records);
     }
     
     @SneakyThrows(InterruptedException.class)
     @Override
-    public List<Record> fetchRecords(final int batchSize, final long timeout, 
final TimeUnit timeUnit) {
+    public List<Record> fetch(final int batchSize, final long timeout, final 
TimeUnit timeUnit) {
         List<Record> result = new LinkedList<>();
         long startMillis = System.currentTimeMillis();
         long timeoutMillis = timeUnit.toMillis(timeout);
@@ -73,13 +73,13 @@ public final class SimpleMemoryPipelineChannel implements 
PipelineChannel {
     }
     
     @Override
-    public List<Record> peekRecords() {
+    public List<Record> peek() {
         List<Record> result = queue.peek();
         return null == result ? Collections.emptyList() : result;
     }
     
     @Override
-    public List<Record> pollRecords() {
+    public List<Record> poll() {
         List<Record> result = queue.poll();
         return null == result ? Collections.emptyList() : result;
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 6366244928e..edad97db01c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -129,7 +129,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
                 List<Record> dataRecords = new LinkedList<>();
                 while (resultSet.next()) {
                     if (dataRecords.size() >= batchSize) {
-                        channel.pushRecords(dataRecords);
+                        channel.push(dataRecords);
                         dataRecords = new LinkedList<>();
                     }
                     dataRecords.add(loadDataRecord(resultSet, 
resultSetMetaData, tableMetaData));
@@ -143,7 +143,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
                     }
                 }
                 dataRecords.add(new FinishedRecord(new 
IngestFinishedPosition()));
-                channel.pushRecords(dataRecords);
+                channel.push(dataRecords);
                 log.info("Inventory dump done, rowCount={}, dataSource={}, 
actualTable={}", rowCount, 
dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
             } finally {
                 runningStatement.set(null);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index 152b0f680b3..c412bda39ba 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -46,7 +46,7 @@ public final class DataRecordGroupEngine {
         Map<Key, Boolean> duplicateKeys = getDuplicateKeys(records);
         Collection<String> tableNames = new LinkedHashSet<>();
         Map<String, List<DataRecord>> nonBatchRecords = new LinkedHashMap<>();
-        Map<String, Map<PipelineSQLOperationType, List<DataRecord>>> 
batchDataRecords = new LinkedHashMap<>();
+        Map<String, Map<PipelineSQLOperationType, Collection<DataRecord>>> 
batchDataRecords = new LinkedHashMap<>();
         for (DataRecord each : records) {
             tableNames.add(each.getTableName());
             if (duplicateKeys.getOrDefault(each.getKey(), false)) {
@@ -68,7 +68,7 @@ public final class DataRecordGroupEngine {
         return result;
     }
     
-    private GroupedDataRecord getGroupedDataRecord(final String tableName, 
final Map<PipelineSQLOperationType, List<DataRecord>> batchRecords, final 
List<DataRecord> nonBatchRecords) {
+    private GroupedDataRecord getGroupedDataRecord(final String tableName, 
final Map<PipelineSQLOperationType, Collection<DataRecord>> batchRecords, final 
Collection<DataRecord> nonBatchRecords) {
         return new GroupedDataRecord(tableName, 
batchRecords.getOrDefault(PipelineSQLOperationType.INSERT, 
Collections.emptyList()),
                 batchRecords.getOrDefault(PipelineSQLOperationType.UPDATE, 
Collections.emptyList()), 
batchRecords.getOrDefault(PipelineSQLOperationType.DELETE, 
Collections.emptyList()),
                 nonBatchRecords);
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
index 8fb6b63b3ea..fdec9682362 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
@@ -71,7 +71,7 @@ class MultiplexMemoryPipelineChannelTest {
         CountDownLatch countDownLatch = new CountDownLatch(recordCount);
         MultiplexMemoryPipelineChannel memoryChannel = new 
MultiplexMemoryPipelineChannel(CHANNEL_NUMBER, 10000, ackCallback);
         fetchWithMultiThreads(memoryChannel, countDownLatch);
-        memoryChannel.pushRecords(Arrays.asList(records));
+        memoryChannel.push(Arrays.asList(records));
         boolean awaitResult = countDownLatch.await(10, TimeUnit.SECONDS);
         assertTrue(awaitResult, "await failed");
         memoryChannel.close();
@@ -86,7 +86,7 @@ class MultiplexMemoryPipelineChannelTest {
     private void fetch(final MultiplexMemoryPipelineChannel memoryChannel, 
final CountDownLatch countDownLatch) {
         int maxLoopCount = 10;
         for (int j = 1; j <= maxLoopCount; j++) {
-            List<Record> records = memoryChannel.fetchRecords(100, 1, 
TimeUnit.SECONDS);
+            List<Record> records = memoryChannel.fetch(100, 1, 
TimeUnit.SECONDS);
             memoryChannel.ack(records);
             records.forEach(each -> countDownLatch.countDown());
             if (!records.isEmpty() && records.get(records.size() - 1) 
instanceof FinishedRecord) {
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
index b316cd13eb2..cd5d988ce32 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
@@ -39,9 +39,9 @@ class SimpleMemoryPipelineChannelTest {
     void assertZeroQueueSizeWorks() {
         SimpleMemoryPipelineChannel channel = new 
SimpleMemoryPipelineChannel(0, new EmptyAckCallback());
         List<Record> records = Collections.singletonList(new 
PlaceholderRecord(new IngestFinishedPosition()));
-        Thread thread = new Thread(() -> channel.pushRecords(records));
+        Thread thread = new Thread(() -> channel.push(records));
         thread.start();
-        assertThat(channel.fetchRecords(1, 500, TimeUnit.MILLISECONDS), 
is(records));
+        assertThat(channel.fetch(1, 500, TimeUnit.MILLISECONDS), is(records));
         thread.join();
     }
     
@@ -49,11 +49,11 @@ class SimpleMemoryPipelineChannelTest {
     void assertFetchRecordsTimeoutCorrectly() {
         SimpleMemoryPipelineChannel channel = new 
SimpleMemoryPipelineChannel(10, new EmptyAckCallback());
         long startMillis = System.currentTimeMillis();
-        channel.fetchRecords(1, 1, TimeUnit.MILLISECONDS);
+        channel.fetch(1, 1, TimeUnit.MILLISECONDS);
         long delta = System.currentTimeMillis() - startMillis;
         assertTrue(delta >= 1 && delta < 50, "Delta is not in [1,50) : " + 
delta);
         startMillis = System.currentTimeMillis();
-        channel.fetchRecords(1, 500, TimeUnit.MILLISECONDS);
+        channel.fetch(1, 500, TimeUnit.MILLISECONDS);
         delta = System.currentTimeMillis() - startMillis;
         assertTrue(delta >= 500 && delta < 750, "Delta is not in [500,750) : " 
+ delta);
     }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 135aa115437..610bff78c44 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -122,7 +122,7 @@ public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunna
         if (dataRecords.isEmpty()) {
             return;
         }
-        channel.pushRecords(dataRecords);
+        channel.push(dataRecords);
     }
     
     private List<? extends Record> handleEvent(final AbstractBinlogEvent 
event) {
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index 0d8624dfb03..b556d51adcf 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -156,7 +156,7 @@ public final class OpenGaussWALDumper extends 
AbstractPipelineLifecycleRunnable
                 records.add(walEventConverter.convert(each));
             }
             records.add(walEventConverter.convert(event));
-            channel.pushRecords(records);
+            channel.push(records);
             rowEvents = new LinkedList<>();
         }
     }
@@ -165,7 +165,7 @@ public final class OpenGaussWALDumper extends 
AbstractPipelineLifecycleRunnable
         if (event instanceof BeginTXEvent) {
             return;
         }
-        
channel.pushRecords(Collections.singletonList(walEventConverter.convert(event)));
+        
channel.push(Collections.singletonList(walEventConverter.convert(event)));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 148b15063e8..6ceede7a6fc 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -148,7 +148,7 @@ public final class PostgreSQLWALDumper extends 
AbstractPipelineLifecycleRunnable
                 records.add(walEventConverter.convert(each));
             }
             records.add(walEventConverter.convert(event));
-            channel.pushRecords(records);
+            channel.push(records);
         }
     }
     
@@ -156,7 +156,7 @@ public final class PostgreSQLWALDumper extends 
AbstractPipelineLifecycleRunnable
         if (event instanceof BeginTXEvent) {
             return;
         }
-        
channel.pushRecords(Collections.singletonList(walEventConverter.convert(event)));
+        
channel.push(Collections.singletonList(walEventConverter.convert(event)));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 4347206d22f..7bc8c1cda9d 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -133,6 +133,6 @@ class PostgreSQLWALDumperTest {
             walDumper.start();
         } catch (final IngestException ignored) {
         }
-        assertThat(channel.fetchRecords(100, 0, TimeUnit.SECONDS).size(), 
is(1));
+        assertThat(channel.fetch(100, 0, TimeUnit.SECONDS).size(), is(1));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 70fb06e6d5c..790a8b8343c 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -96,7 +96,7 @@ public final class CDCImporter extends 
AbstractPipelineLifecycleRunnable impleme
     private void doWithoutSorting() {
         for (final CDCChannelProgressPair channelProgressPair : 
originalChannelProgressPairs) {
             PipelineChannel channel = channelProgressPair.getChannel();
-            List<Record> records = channel.fetchRecords(batchSize, timeout, 
timeUnit).stream().filter(each -> !(each instanceof 
PlaceholderRecord)).collect(Collectors.toList());
+            List<Record> records = channel.fetch(batchSize, timeout, 
timeUnit).stream().filter(each -> !(each instanceof 
PlaceholderRecord)).collect(Collectors.toList());
             if (records.isEmpty()) {
                 continue;
             }
@@ -180,7 +180,7 @@ public final class CDCImporter extends 
AbstractPipelineLifecycleRunnable impleme
     private void prepareWhenQueueIsEmpty(final List<CDCChannelProgressPair> 
channelProgressPairs) {
         for (CDCChannelProgressPair each : channelProgressPairs) {
             PipelineChannel channel = each.getChannel();
-            List<Record> records = channel.pollRecords();
+            List<Record> records = channel.poll();
             if (records.isEmpty()) {
                 continue;
             }
@@ -204,18 +204,18 @@ public final class CDCImporter extends 
AbstractPipelineLifecycleRunnable impleme
     private void prepareWhenQueueIsNotEmpty(final List<CDCChannelProgressPair> 
channelProgressPairs, final long oldestCSN) {
         for (CDCChannelProgressPair each : channelProgressPairs) {
             PipelineChannel channel = each.getChannel();
-            List<Record> records = channel.peekRecords();
+            List<Record> records = channel.peek();
             if (records.isEmpty()) {
                 continue;
             }
             if (0 == getDataRecordsCount(records)) {
-                records = channel.pollRecords();
+                records = channel.poll();
                 channel.ack(records);
                 continue;
             }
             long csn = findFirstDataRecord(records).getCsn();
             if (csn <= oldestCSN) {
-                records = channel.pollRecords();
+                records = channel.poll();
                 csnRecordsQueue.add(new CSNRecords(csn, each, records));
             }
         }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index 75aebe0fe1f..55ef59c476a 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -101,7 +101,7 @@ class PipelineDataSourceSinkTest {
     void assertWriteInsertDataRecord() throws SQLException {
         DataRecord insertRecord = 
getDataRecord(PipelineSQLOperationType.INSERT);
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-        when(channel.fetchRecords(anyInt(), anyLong(), 
any())).thenReturn(mockRecords(insertRecord));
+        when(channel.fetch(anyInt(), anyLong(), 
any())).thenReturn(mockRecords(insertRecord));
         importer.run();
         verify(preparedStatement).setObject(1, 1);
         verify(preparedStatement).setObject(2, 10);
@@ -113,7 +113,7 @@ class PipelineDataSourceSinkTest {
     void assertDeleteDataRecord() throws SQLException {
         DataRecord deleteRecord = 
getDataRecord(PipelineSQLOperationType.DELETE);
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-        when(channel.fetchRecords(anyInt(), anyLong(), 
any())).thenReturn(mockRecords(deleteRecord));
+        when(channel.fetch(anyInt(), anyLong(), 
any())).thenReturn(mockRecords(deleteRecord));
         when(preparedStatement.executeBatch()).thenReturn(new int[]{1});
         importer.run();
         verify(preparedStatement).setObject(1, 1);
@@ -125,7 +125,7 @@ class PipelineDataSourceSinkTest {
     void assertUpdateDataRecord() throws SQLException {
         DataRecord updateRecord = 
getDataRecord(PipelineSQLOperationType.UPDATE);
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-        when(channel.fetchRecords(anyInt(), anyLong(), 
any())).thenReturn(mockRecords(updateRecord));
+        when(channel.fetch(anyInt(), anyLong(), 
any())).thenReturn(mockRecords(updateRecord));
         importer.run();
         verify(preparedStatement).setObject(1, 20);
         verify(preparedStatement).setObject(2, 
PipelineSQLOperationType.UPDATE);
@@ -138,7 +138,7 @@ class PipelineDataSourceSinkTest {
     void assertUpdatePrimaryKeyDataRecord() throws SQLException {
         DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-        when(channel.fetchRecords(anyInt(), anyLong(), 
any())).thenReturn(mockRecords(updateRecord));
+        when(channel.fetch(anyInt(), anyLong(), 
any())).thenReturn(mockRecords(updateRecord));
         importer.run();
         InOrder inOrder = inOrder(preparedStatement);
         inOrder.verify(preparedStatement).setObject(1, 2);

Reply via email to