This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b2607b56e73 Refactor PipelineChannel.fetch() (#29547)
b2607b56e73 is described below

commit b2607b56e73c3f5726bee04468cefff2aa7e58c9
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Dec 26 13:55:00 2023 +0800

    Refactor PipelineChannel.fetch() (#29547)
    
    * Refactor PipelineChannel.fetch()
    
    * Refactor PipelineChannel.fetch()
---
 .../core/channel/MultiplexPipelineChannel.java     |  5 ++--
 .../pipeline/core/channel/PipelineChannel.java     |  6 ++---
 .../core/channel/memory/MemoryPipelineChannel.java |  3 +--
 .../importer/SingleChannelConsumerImporter.java    | 11 +++-----
 .../preparer/inventory/InventoryTaskSplitter.java  | 31 +++++++++++-----------
 .../channel/memory/MemoryPipelineChannelTest.java  |  7 +++--
 .../memory/MultiplexMemoryPipelineChannelTest.java |  2 +-
 .../postgresql/ingest/PostgreSQLWALDumperTest.java |  3 +--
 .../pipeline/cdc/core/importer/CDCImporter.java    |  8 +++---
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  | 12 ++++-----
 .../migration/preparer/MigrationJobPreparer.java   | 11 ++++----
 .../core/importer/PipelineDataSourceSinkTest.java  | 13 +++++----
 12 files changed, 48 insertions(+), 64 deletions(-)
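
The signature change at the heart of this refactor replaces the (timeout, TimeUnit) pair with a
single millisecond value. A minimal before/after sketch of a call site (channel here stands for
any PipelineChannel implementation):

    // Before: the timeout is an amount plus an explicit TimeUnit
    List<Record> records = channel.fetch(100, 1, TimeUnit.SECONDS);

    // After: the timeout is expressed directly in milliseconds
    List<Record> records = channel.fetch(100, 1000L);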

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
index a43b048de3a..346af22746a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
@@ -27,7 +27,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -80,8 +79,8 @@ public final class MultiplexPipelineChannel implements PipelineChannel {
     }
     
     @Override
-    public List<Record> fetch(final int batchSize, final long timeout, final TimeUnit timeUnit) {
-        return findChannel().fetch(batchSize, timeout, timeUnit);
+    public List<Record> fetch(final int batchSize, final long timeoutMillis) {
+        return findChannel().fetch(batchSize, timeoutMillis);
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannel.java
index 684f5e4d633..b45d5d3dab6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannel.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.channel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Pipeline channel.
@@ -40,11 +39,10 @@ public interface PipelineChannel {
     * It might be blocked at most timeout seconds if available records count doesn't reach batch size.
      *
      * @param batchSize record batch size
-     * @param timeout timeout
-     * @param timeUnit time unit
+     * @param timeoutMillis timeout millis
      * @return records of transactions
      */
-    List<Record> fetch(int batchSize, long timeout, TimeUnit timeUnit);
+    List<Record> fetch(int batchSize, long timeoutMillis);
     
     /**
      * Peek {@code Record} list from channel.
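
Callers that still hold a (timeout, TimeUnit) pair can convert once at the call site; this is the
same conversion MemoryPipelineChannel used to perform internally (a sketch, assuming timeout and
timeUnit are the caller's existing values):

    long timeoutMillis = timeUnit.toMillis(timeout);
    List<Record> records = channel.fetch(batchSize, timeoutMillis);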
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannel.java
index 2425a61cf78..a1d2cf2dd13 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannel.java
@@ -52,10 +52,9 @@ public final class MemoryPipelineChannel implements PipelineChannel {
     
     @SneakyThrows(InterruptedException.class)
     @Override
-    public List<Record> fetch(final int batchSize, final long timeout, final TimeUnit timeUnit) {
+    public List<Record> fetch(final int batchSize, final long timeoutMillis) {
         List<Record> result = new LinkedList<>();
         long startMillis = System.currentTimeMillis();
-        long timeoutMillis = timeUnit.toMillis(timeout);
         int recordsCount = 0;
         do {
             List<Record> records = queue.poll();
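
With the conversion line removed, the fetch loop's deadline arithmetic uses the parameter as-is.
A sketch of the resulting shape (the loop condition below is inferred from the visible context,
not copied from the file):

    long startMillis = System.currentTimeMillis();
    int recordsCount = 0;
    do {
        List<Record> records = queue.poll();
        // ... accumulate polled records into result, updating recordsCount ...
    } while (recordsCount < batchSize && System.currentTimeMillis() - startMillis < timeoutMillis);
    return result;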
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
index 5d7d14d1d5b..fec3620b32e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
@@ -18,18 +18,17 @@
 package org.apache.shardingsphere.data.pipeline.core.importer;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
-import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -42,9 +41,7 @@ public final class SingleChannelConsumerImporter extends AbstractPipelineLifecyc
     
     private final int batchSize;
     
-    private final int timeout;
-    
-    private final TimeUnit timeUnit;
+    private final long timeoutMillis;
     
     private final PipelineSink sink;
     
@@ -53,7 +50,7 @@ public final class SingleChannelConsumerImporter extends AbstractPipelineLifecyc
     @Override
     protected void runBlocking() {
         while (isRunning()) {
-            List<Record> records = channel.fetch(batchSize, timeout, timeUnit).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+            List<Record> records = channel.fetch(batchSize, timeoutMillis).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
             if (records.isEmpty()) {
                 continue;
             }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
index fd8f292b132..464b0ec048f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
@@ -21,31 +21,31 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.Range;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
+import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
-import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
-import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -56,7 +56,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -86,7 +85,7 @@ public final class InventoryTaskSplitter {
             AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
             PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
             Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
-            Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(), jobItemContext);
+            Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3000L, jobItemContext.getSink(), jobItemContext);
             result.add(new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
                     processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
         }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
index cc90f6c5018..be6efd7ff45 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
@@ -25,7 +25,6 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -42,7 +41,7 @@ class MemoryPipelineChannelTest {
         List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
         Thread thread = new Thread(() -> channel.push(records));
         thread.start();
-        assertThat(channel.fetch(1, 500, TimeUnit.MILLISECONDS), is(records));
+        assertThat(channel.fetch(1, 500L), is(records));
         thread.join();
     }
     
@@ -52,11 +51,11 @@ class MemoryPipelineChannelTest {
             
         });
         long startMillis = System.currentTimeMillis();
-        channel.fetch(1, 1, TimeUnit.MILLISECONDS);
+        channel.fetch(1, 1L);
         long delta = System.currentTimeMillis() - startMillis;
         assertTrue(delta >= 1 && delta < 50, "Delta is not in [1,50) : " + delta);
         startMillis = System.currentTimeMillis();
-        channel.fetch(1, 500, TimeUnit.MILLISECONDS);
+        channel.fetch(1, 500L);
         delta = System.currentTimeMillis() - startMillis;
         assertTrue(delta >= 500 && delta < 750, "Delta is not in [500,750) : " + delta);
     }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java
index 0cebdcc4565..41c59c7c35e 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java
@@ -100,7 +100,7 @@ class MultiplexMemoryPipelineChannelTest {
     private void fetch(final MultiplexPipelineChannel memoryChannel, final CountDownLatch countDownLatch) {
         int maxLoopCount = 10;
         for (int j = 1; j <= maxLoopCount; j++) {
-            List<Record> records = memoryChannel.fetch(100, 1, TimeUnit.SECONDS);
+            List<Record> records = memoryChannel.fetch(100, 1000L);
             memoryChannel.ack(records);
             records.forEach(each -> countDownLatch.countDown());
             if (!records.isEmpty() && records.get(records.size() - 1) instanceof FinishedRecord) {
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 862d7cdcc6b..053462ebd95 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -48,7 +48,6 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
-import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -134,6 +133,6 @@ class PostgreSQLWALDumperTest {
             walDumper.start();
         } catch (final IngestException ignored) {
         }
-        assertThat(channel.fetch(100, 0, TimeUnit.SECONDS).size(), is(1));
+        assertThat(channel.fetch(100, 0L).size(), is(1));
     }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 10884286169..bc65fee2ff0 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -61,9 +61,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     
     private final int batchSize;
     
-    private final long timeout;
-    
-    private final TimeUnit timeUnit;
+    private final long timeoutMillis;
     
     private final PipelineSink sink;
     
@@ -96,7 +94,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     private void doWithoutSorting() {
         for (final CDCChannelProgressPair channelProgressPair : originalChannelProgressPairs) {
             PipelineChannel channel = channelProgressPair.getChannel();
-            List<Record> records = channel.fetch(batchSize, timeout, timeUnit).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+            List<Record> records = channel.fetch(batchSize, timeoutMillis).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
             if (records.isEmpty()) {
                 continue;
             }
@@ -139,7 +137,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
             }
         }
         if (csnRecordsList.isEmpty()) {
-            timeUnit.sleep(timeout);
+            TimeUnit.MILLISECONDS.sleep(timeoutMillis);
             return;
         }
         // TODO Combine small transactions into a large transaction, to improve transformation performance.
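
With the TimeUnit field gone, the idle wait in the sorting path is pinned to milliseconds. The two
forms below are equivalent here (the commit uses the first; the second would avoid the TimeUnit
import entirely):

    TimeUnit.MILLISECONDS.sleep(timeoutMillis);
    // Equivalent, without TimeUnit:
    Thread.sleep(timeoutMillis);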
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index d0e2295631f..4b5fd0e8301 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -25,24 +25,24 @@ import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgr
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCInventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DialectIncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DialectIncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
@@ -54,7 +54,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -125,7 +124,7 @@ public final class CDCJobPreparer {
             }
             Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
             Importer importer = importerUsed.get() ? null
-                    : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS, jobItemContext.getSink(), false, importerConfig.getRateLimitAlgorithm());
+                    : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100L, jobItemContext.getSink(), false, importerConfig.getRateLimitAlgorithm());
             jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
                     processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
             if (!(position.get() instanceof IngestFinishedPosition)) {
@@ -146,8 +145,7 @@ public final class CDCJobPreparer {
                 .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
         boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
         Importer importer = importerUsed.get() ? null
-                : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 300, TimeUnit.MILLISECONDS,
-                        jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
+                : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 300L, jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
         PipelineTask incrementalTask = new CDCIncrementalTask(
                 dumperContext.getCommonContext().getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 3dc564bbd52..5c2be177179 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
@@ -30,8 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DialectIncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
@@ -46,14 +47,13 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
+import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DialectIncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -79,7 +79,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Migration job preparer.
@@ -218,7 +217,7 @@ public final class MigrationJobPreparer {
                                                  final PipelineJobProgressListener jobProgressListener) {
         Collection<Importer> result = new LinkedList<>();
         for (int i = 0; i < importerConfig.getConcurrency(); i++) {
-            result.add(new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, sink, jobProgressListener));
+            result.add(new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3000L, sink, jobProgressListener));
         }
         return result;
     }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index ec38e1e0360..6a43461f107 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -19,13 +19,13 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.importer;
 
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
@@ -50,7 +50,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -83,7 +82,7 @@ class PipelineDataSourceSinkTest {
     @BeforeEach
     void setUp() throws SQLException {
         PipelineSink pipelineSink = new PipelineDataSourceSink(mockImporterConfiguration(), mockPipelineDataSourceManager());
-        importer = new SingleChannelConsumerImporter(channel, 100, 1, TimeUnit.SECONDS, pipelineSink, new FixtureTransmissionJobItemContext());
+        importer = new SingleChannelConsumerImporter(channel, 100, 1000L, pipelineSink, new FixtureTransmissionJobItemContext());
     }
     
     private ImporterConfiguration mockImporterConfiguration() {
@@ -101,7 +100,7 @@ class PipelineDataSourceSinkTest {
     void assertWriteInsertDataRecord() throws SQLException {
         DataRecord insertRecord = getDataRecord(PipelineSQLOperationType.INSERT);
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-        when(channel.fetch(anyInt(), anyLong(), any())).thenReturn(mockRecords(insertRecord));
+        when(channel.fetch(anyInt(), anyLong())).thenReturn(mockRecords(insertRecord));
         importer.run();
         verify(preparedStatement).setObject(1, 1);
         verify(preparedStatement).setObject(2, 10);
@@ -113,7 +112,7 @@ class PipelineDataSourceSinkTest {
     void assertDeleteDataRecord() throws SQLException {
         DataRecord deleteRecord = getDataRecord(PipelineSQLOperationType.DELETE);
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-        when(channel.fetch(anyInt(), anyLong(), any())).thenReturn(mockRecords(deleteRecord));
+        when(channel.fetch(anyInt(), anyLong())).thenReturn(mockRecords(deleteRecord));
         when(preparedStatement.executeBatch()).thenReturn(new int[]{1});
         importer.run();
         verify(preparedStatement).setObject(1, 1);
@@ -125,7 +124,7 @@ class PipelineDataSourceSinkTest {
     void assertUpdateDataRecord() throws SQLException {
         DataRecord updateRecord = getDataRecord(PipelineSQLOperationType.UPDATE);
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-        when(channel.fetch(anyInt(), anyLong(), any())).thenReturn(mockRecords(updateRecord));
+        when(channel.fetch(anyInt(), anyLong())).thenReturn(mockRecords(updateRecord));
         importer.run();
         verify(preparedStatement).setObject(1, 20);
         verify(preparedStatement).setObject(2, PipelineSQLOperationType.UPDATE);
@@ -138,7 +137,7 @@ class PipelineDataSourceSinkTest {
     void assertUpdatePrimaryKeyDataRecord() throws SQLException {
         DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-        when(channel.fetch(anyInt(), anyLong(), any())).thenReturn(mockRecords(updateRecord));
+        when(channel.fetch(anyInt(), anyLong())).thenReturn(mockRecords(updateRecord));
         importer.run();
         InOrder inOrder = inOrder(preparedStatement);
         inOrder.verify(preparedStatement).setObject(1, 2);

