This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 72c1f654a06 Refactor pipeline channel (#30319)
72c1f654a06 is described below
commit 72c1f654a064e75d258d3d3231223cb32a95e08c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Feb 27 20:00:35 2024 +0800
Refactor pipeline channel (#30319)
* Improve MemoryPipelineChannelCreatorTest
* Rename and move MultiplexMemoryPipelineChannelTest
* Disable MultiplexPipelineChannel
* Remove concurrency param of PipelineTaskUtils.createIncrementalChannel
---
.../core/channel/MultiplexPipelineChannel.java | 3 +++
.../memory/MemoryPipelineChannelCreator.java | 2 +-
.../core/importer/ImporterConfiguration.java | 1 +
.../data/pipeline/core/task/PipelineTaskUtils.java | 8 ++------
...Test.java => MultiplexPipelineChannelTest.java} | 7 ++-----
.../memory/MemoryPipelineChannelCreatorTest.java | 23 +++++++++++++++++++---
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 10 ++++------
.../migration/preparer/MigrationJobPreparer.java | 21 +++-----------------
8 files changed, 36 insertions(+), 39 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
index 346af22746a..e2708ae730c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
@@ -32,7 +32,10 @@ import java.util.stream.IntStream;
/**
* Multiplex pipeline channel.
+ *
+ * @deprecated It's not used for now since possible ack issue.
*/
+@Deprecated
public final class MultiplexPipelineChannel implements PipelineChannel {
private final int channelCount;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreator.java
index a034645a529..0ce1421fc6f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreator.java
@@ -42,7 +42,7 @@ public final class MemoryPipelineChannelCreator implements
PipelineChannelCreato
@Override
public PipelineChannel newInstance(final int importerBatchSize, final
PipelineChannelAckCallback ackCallback) {
int queueSize = this.queueSize / importerBatchSize;
- return new MemoryPipelineChannel(0 == queueSize ? 1 : queueSize,
ackCallback);
+ return new MemoryPipelineChannel(queueSize, ackCallback);
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
index 5d6a3796b1b..d2da7cf2c2f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
@@ -56,6 +56,7 @@ public final class ImporterConfiguration {
private final int retryTimes;
+ // TODO Remove concurrency
private final int concurrency;
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index 3e1855f9340..99481541350 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.core.channel.MultiplexPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
@@ -80,15 +79,12 @@ public final class PipelineTaskUtils {
/**
* Create pipeline channel for incremental task.
*
- * @param concurrency output concurrency
* @param channelConfig pipeline channel configuration
* @param progress incremental task progress
* @return created pipeline channel
*/
- public static PipelineChannel createIncrementalChannel(final int
concurrency, final AlgorithmConfiguration channelConfig, final
IncrementalTaskProgress progress) {
+ public static PipelineChannel createIncrementalChannel(final
AlgorithmConfiguration channelConfig, final IncrementalTaskProgress progress) {
PipelineChannelCreator channelCreator =
TypedSPILoader.getService(PipelineChannelCreator.class,
channelConfig.getType(), channelConfig.getProps());
- return 1 == concurrency
- ? channelCreator.newInstance(5, new
IncrementalTaskAckCallback(progress))
- : new MultiplexPipelineChannel(concurrency, channelCreator, 5,
new IncrementalTaskAckCallback(progress));
+ return channelCreator.newInstance(5, new
IncrementalTaskAckCallback(progress));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannelTest.java
similarity index 92%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannelTest.java
index 41c59c7c35e..22e1292d10e 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannelTest.java
@@ -15,14 +15,11 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.channel.memory;
+package org.apache.shardingsphere.data.pipeline.core.channel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
-import
org.apache.shardingsphere.data.pipeline.core.channel.MultiplexPipelineChannel;
-import
org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback;
-import
org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
@@ -45,7 +42,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
-class MultiplexMemoryPipelineChannelTest {
+class MultiplexPipelineChannelTest {
private static final int CHANNEL_NUMBER = 2;
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreatorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreatorTest.java
index 51bad419155..21040ee8a61 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreatorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreatorTest.java
@@ -17,22 +17,39 @@
package org.apache.shardingsphere.data.pipeline.core.channel.memory;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
+import
org.apache.shardingsphere.data.pipeline.core.task.InventoryTaskAckCallback;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.test.util.PropertiesBuilder;
import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.internal.configuration.plugins.Plugins;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
class MemoryPipelineChannelCreatorTest {
@Test
- void assertInitWithBlockQueueSize() throws Exception {
- PipelineChannelCreator creator =
TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY",
PropertiesBuilder.build(new Property("block-queue-size", "200")));
-
assertThat(Plugins.getMemberAccessor().get(MemoryPipelineChannelCreator.class.getDeclaredField("queueSize"),
creator), is(200));
+ void assertInitWithNonZeroBlockQueueSize() throws Exception {
+ PipelineChannelCreator creator =
TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY",
PropertiesBuilder.build(new Property("block-queue-size", "2000")));
+
assertThat(Plugins.getMemberAccessor().get(MemoryPipelineChannelCreator.class.getDeclaredField("queueSize"),
creator), is(2000));
+ PipelineChannel channel = creator.newInstance(1000, new
InventoryTaskAckCallback(new AtomicReference<>()));
+ Assertions.assertInstanceOf(ArrayBlockingQueue.class,
Plugins.getMemberAccessor().get(MemoryPipelineChannel.class.getDeclaredField("queue"),
channel));
+ }
+
+ @Test
+ void assertInitWithZeroBlockQueueSize() throws Exception {
+ PipelineChannelCreator creator =
TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY",
PropertiesBuilder.build(new Property("block-queue-size", "0")));
+
assertThat(Plugins.getMemberAccessor().get(MemoryPipelineChannelCreator.class.getDeclaredField("queueSize"),
creator), is(0));
+ PipelineChannel channel = creator.newInstance(1000, new
InventoryTaskAckCallback(new AtomicReference<>()));
+ Assertions.assertInstanceOf(SynchronousQueue.class,
Plugins.getMemberAccessor().get(MemoryPipelineChannel.class.getDeclaredField("queue"),
channel));
}
@Test
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index fab205426af..d3bdb426d83 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -30,10 +30,10 @@ import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessC
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
@@ -137,16 +137,14 @@ public final class CDCJobPreparer {
private void initIncrementalTask(final CDCJobItemContext jobItemContext,
final AtomicBoolean importerUsed, final List<CDCChannelProgressPair>
channelProgressPairs) {
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
- ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
jobItemContext.getInitProgress());
- PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(
- importerConfig.getConcurrency(),
jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(),
taskProgress);
+ PipelineChannel channel =
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(),
taskProgress);
channelProgressPairs.add(new CDCChannelProgressPair(channel,
jobItemContext));
Dumper dumper =
DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class,
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
.createIncrementalDumper(dumperContext,
dumperContext.getCommonContext().getPosition(), channel,
jobItemContext.getSourceMetaDataLoader());
boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
Importer importer = importerUsed.get() ? null
- : new CDCImporter(channelProgressPairs, 1, 100L,
jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
+ : new CDCImporter(channelProgressPairs, 1, 100L,
jobItemContext.getSink(), needSorting,
taskConfig.getImporterConfig().getRateLimitAlgorithm());
PipelineTask incrementalTask = new CDCIncrementalTask(
dumperContext.getCommonContext().getDataSourceName(),
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper,
importer, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 930fd9a9a9f..67a864b71b5 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -28,11 +28,9 @@ import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
-import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
-import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
@@ -43,7 +41,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
@@ -76,7 +73,6 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.Map.Entry;
/**
@@ -200,27 +196,16 @@ public final class MigrationJobPreparer {
MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
PipelineTableMetaDataLoader sourceMetaDataLoader =
jobItemContext.getSourceMetaDataLoader();
IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
- ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
ExecuteEngine incrementalExecuteEngine =
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
jobItemContext.getInitProgress());
- PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(
- importerConfig.getConcurrency(),
jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(),
taskProgress);
+ PipelineChannel channel =
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(),
taskProgress);
Dumper dumper =
DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class,
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
.createIncrementalDumper(dumperContext,
dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader);
- Collection<Importer> importers =
createIncrementalImporters(importerConfig, jobItemContext.getSink(), channel,
jobItemContext);
+ Collection<Importer> importers = Collections.singletonList(new
SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(),
jobItemContext));
PipelineTask incrementalTask = new
IncrementalTask(dumperContext.getCommonContext().getDataSourceName(),
incrementalExecuteEngine, dumper, importers, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
}
- private Collection<Importer> createIncrementalImporters(final
ImporterConfiguration importerConfig, final PipelineSink sink, final
PipelineChannel channel,
- final
PipelineJobProgressListener jobProgressListener) {
- Collection<Importer> result = new LinkedList<>();
- for (int i = 0; i < importerConfig.getConcurrency(); i++) {
- result.add(new SingleChannelConsumerImporter(channel, 1, 5L, sink,
jobProgressListener));
- }
- return result;
- }
-
/**
* Do cleanup work.
*