This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b2607b56e73 Refactor PipelineChannel.fetch() (#29547)
b2607b56e73 is described below
commit b2607b56e73c3f5726bee04468cefff2aa7e58c9
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Dec 26 13:55:00 2023 +0800
Refactor PipelineChannel.fetch() (#29547)
* Refactor PipelineChannel.fetch()
* Refactor PipelineChannel.fetch()
---
.../core/channel/MultiplexPipelineChannel.java | 5 ++--
.../pipeline/core/channel/PipelineChannel.java | 6 ++---
.../core/channel/memory/MemoryPipelineChannel.java | 3 +--
.../importer/SingleChannelConsumerImporter.java | 11 +++-----
.../preparer/inventory/InventoryTaskSplitter.java | 31 +++++++++++-----------
.../channel/memory/MemoryPipelineChannelTest.java | 7 +++--
.../memory/MultiplexMemoryPipelineChannelTest.java | 2 +-
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 3 +--
.../pipeline/cdc/core/importer/CDCImporter.java | 8 +++---
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 12 ++++-----
.../migration/preparer/MigrationJobPreparer.java | 11 ++++----
.../core/importer/PipelineDataSourceSinkTest.java | 13 +++++----
12 files changed, 48 insertions(+), 64 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
index a43b048de3a..346af22746a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
@@ -27,7 +27,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -80,8 +79,8 @@ public final class MultiplexPipelineChannel implements
PipelineChannel {
}
@Override
- public List<Record> fetch(final int batchSize, final long timeout, final
TimeUnit timeUnit) {
- return findChannel().fetch(batchSize, timeout, timeUnit);
+ public List<Record> fetch(final int batchSize, final long timeoutMills) {
+ return findChannel().fetch(batchSize, timeoutMills);
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannel.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannel.java
index 684f5e4d633..b45d5d3dab6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannel.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannel.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.channel;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import java.util.List;
-import java.util.concurrent.TimeUnit;
/**
* Pipeline channel.
@@ -40,11 +39,10 @@ public interface PipelineChannel {
* It might be blocked at most timeout milliseconds if available records count
doesn't reach batch size.
*
* @param batchSize record batch size
- * @param timeout timeout
- * @param timeUnit time unit
+ * @param timeoutMillis timeout millis
* @return records of transactions
*/
- List<Record> fetch(int batchSize, long timeout, TimeUnit timeUnit);
+ List<Record> fetch(int batchSize, long timeoutMillis);
/**
* Peek {@code Record} list from channel.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannel.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannel.java
index 2425a61cf78..a1d2cf2dd13 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannel.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannel.java
@@ -52,10 +52,9 @@ public final class MemoryPipelineChannel implements
PipelineChannel {
@SneakyThrows(InterruptedException.class)
@Override
- public List<Record> fetch(final int batchSize, final long timeout, final
TimeUnit timeUnit) {
+ public List<Record> fetch(final int batchSize, final long timeoutMillis) {
List<Record> result = new LinkedList<>();
long startMillis = System.currentTimeMillis();
- long timeoutMillis = timeUnit.toMillis(timeout);
int recordsCount = 0;
do {
List<Record> records = queue.poll();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
index 5d7d14d1d5b..fec3620b32e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
@@ -18,18 +18,17 @@
package org.apache.shardingsphere.data.pipeline.core.importer;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
-import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@@ -42,9 +41,7 @@ public final class SingleChannelConsumerImporter extends
AbstractPipelineLifecyc
private final int batchSize;
- private final int timeout;
-
- private final TimeUnit timeUnit;
+ private final long timeoutMillis;
private final PipelineSink sink;
@@ -53,7 +50,7 @@ public final class SingleChannelConsumerImporter extends
AbstractPipelineLifecyc
@Override
protected void runBlocking() {
while (isRunning()) {
- List<Record> records = channel.fetch(batchSize, timeout,
timeUnit).stream().filter(each -> !(each instanceof
PlaceholderRecord)).collect(Collectors.toList());
+ List<Record> records = channel.fetch(batchSize,
timeoutMillis).stream().filter(each -> !(each instanceof
PlaceholderRecord)).collect(Collectors.toList());
if (records.isEmpty()) {
continue;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
index fd8f292b132..464b0ec048f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
@@ -21,31 +21,31 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Range;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
+import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
+import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
+import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
-import
org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
-import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
-import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
-import
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
-import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -56,7 +56,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -86,7 +85,7 @@ public final class InventoryTaskSplitter {
AtomicReference<IngestPosition> position = new
AtomicReference<>(each.getCommonContext().getPosition());
PipelineChannel channel =
PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(),
importerConfig.getBatchSize(), position);
Dumper dumper = new InventoryDumper(each, channel,
sourceDataSource, jobItemContext.getSourceMetaDataLoader());
- Importer importer = new SingleChannelConsumerImporter(channel,
importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(),
jobItemContext);
+ Importer importer = new SingleChannelConsumerImporter(channel,
importerConfig.getBatchSize(), 3000L, jobItemContext.getSink(), jobItemContext);
result.add(new
InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
processContext.getInventoryDumperExecuteEngine(),
processContext.getInventoryImporterExecuteEngine(),
dumper, importer, position));
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
index cc90f6c5018..be6efd7ff45 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
@@ -25,7 +25,6 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -42,7 +41,7 @@ class MemoryPipelineChannelTest {
List<Record> records = Collections.singletonList(new
PlaceholderRecord(new IngestFinishedPosition()));
Thread thread = new Thread(() -> channel.push(records));
thread.start();
- assertThat(channel.fetch(1, 500, TimeUnit.MILLISECONDS), is(records));
+ assertThat(channel.fetch(1, 500L), is(records));
thread.join();
}
@@ -52,11 +51,11 @@ class MemoryPipelineChannelTest {
});
long startMillis = System.currentTimeMillis();
- channel.fetch(1, 1, TimeUnit.MILLISECONDS);
+ channel.fetch(1, 1L);
long delta = System.currentTimeMillis() - startMillis;
assertTrue(delta >= 1 && delta < 50, "Delta is not in [1,50) : " +
delta);
startMillis = System.currentTimeMillis();
- channel.fetch(1, 500, TimeUnit.MILLISECONDS);
+ channel.fetch(1, 500L);
delta = System.currentTimeMillis() - startMillis;
assertTrue(delta >= 500 && delta < 750, "Delta is not in [500,750) : "
+ delta);
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java
index 0cebdcc4565..41c59c7c35e 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java
@@ -100,7 +100,7 @@ class MultiplexMemoryPipelineChannelTest {
private void fetch(final MultiplexPipelineChannel memoryChannel, final
CountDownLatch countDownLatch) {
int maxLoopCount = 10;
for (int j = 1; j <= maxLoopCount; j++) {
- List<Record> records = memoryChannel.fetch(100, 1,
TimeUnit.SECONDS);
+ List<Record> records = memoryChannel.fetch(100, 1000L);
memoryChannel.ack(records);
records.forEach(each -> countDownLatch.countDown());
if (!records.isEmpty() && records.get(records.size() - 1)
instanceof FinishedRecord) {
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 862d7cdcc6b..053462ebd95 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -48,7 +48,6 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
-import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -134,6 +133,6 @@ class PostgreSQLWALDumperTest {
walDumper.start();
} catch (final IngestException ignored) {
}
- assertThat(channel.fetch(100, 0, TimeUnit.SECONDS).size(), is(1));
+ assertThat(channel.fetch(100, 0L).size(), is(1));
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 10884286169..bc65fee2ff0 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -61,9 +61,7 @@ public final class CDCImporter extends
AbstractPipelineLifecycleRunnable impleme
private final int batchSize;
- private final long timeout;
-
- private final TimeUnit timeUnit;
+ private final long timeoutMillis;
private final PipelineSink sink;
@@ -96,7 +94,7 @@ public final class CDCImporter extends
AbstractPipelineLifecycleRunnable impleme
private void doWithoutSorting() {
for (final CDCChannelProgressPair channelProgressPair :
originalChannelProgressPairs) {
PipelineChannel channel = channelProgressPair.getChannel();
- List<Record> records = channel.fetch(batchSize, timeout,
timeUnit).stream().filter(each -> !(each instanceof
PlaceholderRecord)).collect(Collectors.toList());
+ List<Record> records = channel.fetch(batchSize,
timeoutMillis).stream().filter(each -> !(each instanceof
PlaceholderRecord)).collect(Collectors.toList());
if (records.isEmpty()) {
continue;
}
@@ -139,7 +137,7 @@ public final class CDCImporter extends
AbstractPipelineLifecycleRunnable impleme
}
}
if (csnRecordsList.isEmpty()) {
- timeUnit.sleep(timeout);
+ TimeUnit.MILLISECONDS.sleep(timeoutMillis);
return;
}
// TODO Combine small transactions into a large transaction, to
improve transformation performance.
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index d0e2295631f..4b5fd0e8301 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -25,24 +25,24 @@ import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgr
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
import
org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCInventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DialectIncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DialectIncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
@@ -54,7 +54,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -125,7 +124,7 @@ public final class CDCJobPreparer {
}
Dumper dumper = new InventoryDumper(each, channel,
jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
Importer importer = importerUsed.get() ? null
- : new CDCImporter(channelProgressPairs,
importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS,
jobItemContext.getSink(), false, importerConfig.getRateLimitAlgorithm());
+ : new CDCImporter(channelProgressPairs,
importerConfig.getBatchSize(), 100L, jobItemContext.getSink(), false,
importerConfig.getRateLimitAlgorithm());
jobItemContext.getInventoryTasks().add(new
CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
processContext.getInventoryDumperExecuteEngine(),
processContext.getInventoryImporterExecuteEngine(),
dumper, importer, position));
if (!(position.get() instanceof IngestFinishedPosition)) {
@@ -146,8 +145,7 @@ public final class CDCJobPreparer {
.createIncrementalDumper(dumperContext,
dumperContext.getCommonContext().getPosition(), channel,
jobItemContext.getSourceMetaDataLoader());
boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
Importer importer = importerUsed.get() ? null
- : new CDCImporter(channelProgressPairs,
importerConfig.getBatchSize(), 300, TimeUnit.MILLISECONDS,
- jobItemContext.getSink(), needSorting,
importerConfig.getRateLimitAlgorithm());
+ : new CDCImporter(channelProgressPairs,
importerConfig.getBatchSize(), 300L, jobItemContext.getSink(), needSorting,
importerConfig.getRateLimitAlgorithm());
PipelineTask incrementalTask = new CDCIncrementalTask(
dumperContext.getCommonContext().getDataSourceName(),
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper,
importer, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 3dc564bbd52..5c2be177179 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -20,6 +20,8 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
@@ -30,8 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import
org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DialectIncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
@@ -46,14 +47,13 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DialectIncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -79,7 +79,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
/**
* Migration job preparer.
@@ -218,7 +217,7 @@ public final class MigrationJobPreparer {
final
PipelineJobProgressListener jobProgressListener) {
Collection<Importer> result = new LinkedList<>();
for (int i = 0; i < importerConfig.getConcurrency(); i++) {
- result.add(new SingleChannelConsumerImporter(channel,
importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, sink, jobProgressListener));
+ result.add(new SingleChannelConsumerImporter(channel,
importerConfig.getBatchSize(), 3000L, sink, jobProgressListener));
}
return result;
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index ec38e1e0360..6a43461f107 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -19,13 +19,13 @@ package
org.apache.shardingsphere.test.it.data.pipeline.core.importer;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import
org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
@@ -50,7 +50,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -83,7 +82,7 @@ class PipelineDataSourceSinkTest {
@BeforeEach
void setUp() throws SQLException {
PipelineSink pipelineSink = new
PipelineDataSourceSink(mockImporterConfiguration(),
mockPipelineDataSourceManager());
- importer = new SingleChannelConsumerImporter(channel, 100, 1,
TimeUnit.SECONDS, pipelineSink, new FixtureTransmissionJobItemContext());
+ importer = new SingleChannelConsumerImporter(channel, 100, 1000L,
pipelineSink, new FixtureTransmissionJobItemContext());
}
private ImporterConfiguration mockImporterConfiguration() {
@@ -101,7 +100,7 @@ class PipelineDataSourceSinkTest {
void assertWriteInsertDataRecord() throws SQLException {
DataRecord insertRecord =
getDataRecord(PipelineSQLOperationType.INSERT);
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
- when(channel.fetch(anyInt(), anyLong(),
any())).thenReturn(mockRecords(insertRecord));
+ when(channel.fetch(anyInt(),
anyLong())).thenReturn(mockRecords(insertRecord));
importer.run();
verify(preparedStatement).setObject(1, 1);
verify(preparedStatement).setObject(2, 10);
@@ -113,7 +112,7 @@ class PipelineDataSourceSinkTest {
void assertDeleteDataRecord() throws SQLException {
DataRecord deleteRecord =
getDataRecord(PipelineSQLOperationType.DELETE);
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
- when(channel.fetch(anyInt(), anyLong(),
any())).thenReturn(mockRecords(deleteRecord));
+ when(channel.fetch(anyInt(),
anyLong())).thenReturn(mockRecords(deleteRecord));
when(preparedStatement.executeBatch()).thenReturn(new int[]{1});
importer.run();
verify(preparedStatement).setObject(1, 1);
@@ -125,7 +124,7 @@ class PipelineDataSourceSinkTest {
void assertUpdateDataRecord() throws SQLException {
DataRecord updateRecord =
getDataRecord(PipelineSQLOperationType.UPDATE);
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
- when(channel.fetch(anyInt(), anyLong(),
any())).thenReturn(mockRecords(updateRecord));
+ when(channel.fetch(anyInt(),
anyLong())).thenReturn(mockRecords(updateRecord));
importer.run();
verify(preparedStatement).setObject(1, 20);
verify(preparedStatement).setObject(2,
PipelineSQLOperationType.UPDATE);
@@ -138,7 +137,7 @@ class PipelineDataSourceSinkTest {
void assertUpdatePrimaryKeyDataRecord() throws SQLException {
DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
- when(channel.fetch(anyInt(), anyLong(),
any())).thenReturn(mockRecords(updateRecord));
+ when(channel.fetch(anyInt(),
anyLong())).thenReturn(mockRecords(updateRecord));
importer.run();
InOrder inOrder = inOrder(preparedStatement);
inOrder.verify(preparedStatement).setObject(1, 2);