This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 72c1f654a06 Refactor pipeline channel (#30319)
72c1f654a06 is described below

commit 72c1f654a064e75d258d3d3231223cb32a95e08c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Feb 27 20:00:35 2024 +0800

    Refactor pipeline channel (#30319)
    
    * Improve MemoryPipelineChannelCreatorTest
    
    * Rename and move MultiplexMemoryPipelineChannelTest
    
    * Disable MultiplexPipelineChannel
    
    * Remove concurrency param of PipelineTaskUtils.createIncrementalChannel
---
 .../core/channel/MultiplexPipelineChannel.java     |  3 +++
 .../memory/MemoryPipelineChannelCreator.java       |  2 +-
 .../core/importer/ImporterConfiguration.java       |  1 +
 .../data/pipeline/core/task/PipelineTaskUtils.java |  8 ++------
 ...Test.java => MultiplexPipelineChannelTest.java} |  7 ++-----
 .../memory/MemoryPipelineChannelCreatorTest.java   | 23 +++++++++++++++++++---
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  | 10 ++++------
 .../migration/preparer/MigrationJobPreparer.java   | 21 +++-----------------
 8 files changed, 36 insertions(+), 39 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
index 346af22746a..e2708ae730c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
@@ -32,7 +32,10 @@ import java.util.stream.IntStream;
 
 /**
  * Multiplex pipeline channel.
+ *
+ * @deprecated It's not used for now since possible ack issue.
  */
+@Deprecated
 public final class MultiplexPipelineChannel implements PipelineChannel {
     
     private final int channelCount;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreator.java
index a034645a529..0ce1421fc6f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreator.java
@@ -42,7 +42,7 @@ public final class MemoryPipelineChannelCreator implements 
PipelineChannelCreato
     @Override
     public PipelineChannel newInstance(final int importerBatchSize, final 
PipelineChannelAckCallback ackCallback) {
         int queueSize = this.queueSize / importerBatchSize;
-        return new MemoryPipelineChannel(0 == queueSize ? 1 : queueSize, 
ackCallback);
+        return new MemoryPipelineChannel(queueSize, ackCallback);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
index 5d6a3796b1b..d2da7cf2c2f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
@@ -56,6 +56,7 @@ public final class ImporterConfiguration {
     
     private final int retryTimes;
     
+    // TODO Remove concurrency
     private final int concurrency;
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index 3e1855f9340..99481541350 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.core.channel.MultiplexPipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
@@ -80,15 +79,12 @@ public final class PipelineTaskUtils {
     /**
      * Create pipeline channel for incremental task.
      *
-     * @param concurrency output concurrency
      * @param channelConfig pipeline channel configuration
      * @param progress incremental task progress
      * @return created pipeline channel
      */
-    public static PipelineChannel createIncrementalChannel(final int 
concurrency, final AlgorithmConfiguration channelConfig, final 
IncrementalTaskProgress progress) {
+    public static PipelineChannel createIncrementalChannel(final 
AlgorithmConfiguration channelConfig, final IncrementalTaskProgress progress) {
         PipelineChannelCreator channelCreator = 
TypedSPILoader.getService(PipelineChannelCreator.class, 
channelConfig.getType(), channelConfig.getProps());
-        return 1 == concurrency
-                ? channelCreator.newInstance(5, new 
IncrementalTaskAckCallback(progress))
-                : new MultiplexPipelineChannel(concurrency, channelCreator, 5, 
new IncrementalTaskAckCallback(progress));
+        return channelCreator.newInstance(5, new 
IncrementalTaskAckCallback(progress));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannelTest.java
similarity index 92%
rename from 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java
rename to 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannelTest.java
index 41c59c7c35e..22e1292d10e 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannelTest.java
@@ -15,14 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.channel.memory;
+package org.apache.shardingsphere.data.pipeline.core.channel;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
-import 
org.apache.shardingsphere.data.pipeline.core.channel.MultiplexPipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback;
-import 
org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
@@ -45,7 +42,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-class MultiplexMemoryPipelineChannelTest {
+class MultiplexPipelineChannelTest {
     
     private static final int CHANNEL_NUMBER = 2;
     
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreatorTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreatorTest.java
index 51bad419155..21040ee8a61 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreatorTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreatorTest.java
@@ -17,22 +17,39 @@
 
 package org.apache.shardingsphere.data.pipeline.core.channel.memory;
 
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.task.InventoryTaskAckCallback;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.test.util.PropertiesBuilder;
 import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.internal.configuration.plugins.Plugins;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 class MemoryPipelineChannelCreatorTest {
     
     @Test
-    void assertInitWithBlockQueueSize() throws Exception {
-        PipelineChannelCreator creator = 
TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY", 
PropertiesBuilder.build(new Property("block-queue-size", "200")));
-        
assertThat(Plugins.getMemberAccessor().get(MemoryPipelineChannelCreator.class.getDeclaredField("queueSize"),
 creator), is(200));
+    void assertInitWithNonZeroBlockQueueSize() throws Exception {
+        PipelineChannelCreator creator = 
TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY", 
PropertiesBuilder.build(new Property("block-queue-size", "2000")));
+        
assertThat(Plugins.getMemberAccessor().get(MemoryPipelineChannelCreator.class.getDeclaredField("queueSize"),
 creator), is(2000));
+        PipelineChannel channel = creator.newInstance(1000, new 
InventoryTaskAckCallback(new AtomicReference<>()));
+        Assertions.assertInstanceOf(ArrayBlockingQueue.class, 
Plugins.getMemberAccessor().get(MemoryPipelineChannel.class.getDeclaredField("queue"),
 channel));
+    }
+    
+    @Test
+    void assertInitWithZeroBlockQueueSize() throws Exception {
+        PipelineChannelCreator creator = 
TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY", 
PropertiesBuilder.build(new Property("block-queue-size", "0")));
+        
assertThat(Plugins.getMemberAccessor().get(MemoryPipelineChannelCreator.class.getDeclaredField("queueSize"),
 creator), is(0));
+        PipelineChannel channel = creator.newInstance(1000, new 
InventoryTaskAckCallback(new AtomicReference<>()));
+        Assertions.assertInstanceOf(SynchronousQueue.class, 
Plugins.getMemberAccessor().get(MemoryPipelineChannel.class.getDeclaredField("queue"),
 channel));
     }
     
     @Test
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index fab205426af..d3bdb426d83 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -30,10 +30,10 @@ import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessC
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
@@ -137,16 +137,14 @@ public final class CDCJobPreparer {
     private void initIncrementalTask(final CDCJobItemContext jobItemContext, 
final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> 
channelProgressPairs) {
         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
-        ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
         IncrementalTaskProgress taskProgress = 
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
 jobItemContext.getInitProgress());
-        PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(
-                importerConfig.getConcurrency(), 
jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(), 
taskProgress);
+        PipelineChannel channel = 
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(),
 taskProgress);
         channelProgressPairs.add(new CDCChannelProgressPair(channel, 
jobItemContext));
         Dumper dumper = 
DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, 
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
                 .createIncrementalDumper(dumperContext, 
dumperContext.getCommonContext().getPosition(), channel, 
jobItemContext.getSourceMetaDataLoader());
         boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
         Importer importer = importerUsed.get() ? null
-                : new CDCImporter(channelProgressPairs, 1, 100L, 
jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
+                : new CDCImporter(channelProgressPairs, 1, 100L, 
jobItemContext.getSink(), needSorting, 
taskConfig.getImporterConfig().getRateLimitAlgorithm());
         PipelineTask incrementalTask = new CDCIncrementalTask(
                 dumperContext.getCommonContext().getDataSourceName(), 
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, 
importer, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 930fd9a9a9f..67a864b71b5 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -28,11 +28,9 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
-import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
-import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
@@ -43,7 +41,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
@@ -76,7 +73,6 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.Map.Entry;
 
 /**
@@ -200,27 +196,16 @@ public final class MigrationJobPreparer {
         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         PipelineTableMetaDataLoader sourceMetaDataLoader = 
jobItemContext.getSourceMetaDataLoader();
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
-        ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
         ExecuteEngine incrementalExecuteEngine = 
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
         IncrementalTaskProgress taskProgress = 
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
 jobItemContext.getInitProgress());
-        PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(
-                importerConfig.getConcurrency(), 
jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(), 
taskProgress);
+        PipelineChannel channel = 
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(),
 taskProgress);
         Dumper dumper = 
DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, 
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
                 .createIncrementalDumper(dumperContext, 
dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader);
-        Collection<Importer> importers = 
createIncrementalImporters(importerConfig, jobItemContext.getSink(), channel, 
jobItemContext);
+        Collection<Importer> importers = Collections.singletonList(new 
SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(), 
jobItemContext));
         PipelineTask incrementalTask = new 
IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), 
incrementalExecuteEngine, dumper, importers, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
     }
     
-    private Collection<Importer> createIncrementalImporters(final 
ImporterConfiguration importerConfig, final PipelineSink sink, final 
PipelineChannel channel,
-                                                            final 
PipelineJobProgressListener jobProgressListener) {
-        Collection<Importer> result = new LinkedList<>();
-        for (int i = 0; i < importerConfig.getConcurrency(); i++) {
-            result.add(new SingleChannelConsumerImporter(channel, 1, 5L, sink, 
jobProgressListener));
-        }
-        return result;
-    }
-    
     /**
      * Do cleanup work.
      *

Reply via email to