This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 539a763c87a Refactor MultiplexMemoryPipelineChannel's channels from array to list (#23982)
539a763c87a is described below

commit 539a763c87a60e77723f6b95e52025bfb711f0bc
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Feb 3 23:01:29 2023 +0800

    Refactor MultiplexMemoryPipelineChannel's channels from array to list (#23982)
---
 .../channel/memory/MultiplexMemoryPipelineChannel.java    | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
index 67d769f1700..a9e3cd49514 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
@@ -27,6 +27,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Multiplex memory pipeline channel.
@@ -35,16 +37,13 @@ public final class MultiplexMemoryPipelineChannel 
implements PipelineChannel {
     
     private final int channelNumber;
     
-    private final PipelineChannel[] channels;
+    private final List<PipelineChannel> channels;
     
     private final Map<String, Integer> channelAssignment = new HashMap<>();
     
     public MultiplexMemoryPipelineChannel(final int channelNumber, final int 
blockQueueSize, final AckCallback ackCallback) {
         this.channelNumber = channelNumber;
-        channels = new PipelineChannel[channelNumber];
-        for (int i = 0; i < channelNumber; i++) {
-            channels[i] = new SimpleMemoryPipelineChannel(blockQueueSize, 
ackCallback);
-        }
+        channels = IntStream.range(0, channelNumber).mapToObj(each -> new 
SimpleMemoryPipelineChannel(blockQueueSize, 
ackCallback)).collect(Collectors.toList());
     }
     
     @Override
@@ -63,7 +62,7 @@ public final class MultiplexMemoryPipelineChannel implements 
PipelineChannel {
     }
     
     private void pushRecord(final Record record, final int channelIndex) {
-        PipelineChannel channel = channels[channelIndex];
+        PipelineChannel channel = channels.get(channelIndex);
         channel.pushRecord(record);
     }
     
@@ -80,7 +79,7 @@ public final class MultiplexMemoryPipelineChannel implements 
PipelineChannel {
     private PipelineChannel findChannel() {
         String threadId = Long.toString(Thread.currentThread().getId());
         checkAssignment(threadId);
-        return channels[channelAssignment.get(threadId)];
+        return channels.get(channelAssignment.get(threadId));
     }
     
     private void checkAssignment(final String threadId) {
@@ -94,7 +93,7 @@ public final class MultiplexMemoryPipelineChannel implements 
PipelineChannel {
     }
     
     private void assignmentChannel(final String threadId) {
-        for (int i = 0; i < channels.length; i++) {
+        for (int i = 0; i < channels.size(); i++) {
             if (!channelAssignment.containsValue(i)) {
                 channelAssignment.put(threadId, i);
                 return;

Reply via email to