This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 539a763c87a Refactor MultiplexMemoryPipelineChannel's channels from
array to list (#23982)
539a763c87a is described below
commit 539a763c87a60e77723f6b95e52025bfb711f0bc
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Feb 3 23:01:29 2023 +0800
Refactor MultiplexMemoryPipelineChannel's channels from array to list
(#23982)
---
.../channel/memory/MultiplexMemoryPipelineChannel.java | 15 +++++++--------
1 file changed, 7 insertions(+), 8 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
index 67d769f1700..a9e3cd49514 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
@@ -27,6 +27,8 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* Multiplex memory pipeline channel.
@@ -35,16 +37,13 @@ public final class MultiplexMemoryPipelineChannel implements PipelineChannel {
private final int channelNumber;
- private final PipelineChannel[] channels;
+ private final List<PipelineChannel> channels;
private final Map<String, Integer> channelAssignment = new HashMap<>();
public MultiplexMemoryPipelineChannel(final int channelNumber, final int
blockQueueSize, final AckCallback ackCallback) {
this.channelNumber = channelNumber;
- channels = new PipelineChannel[channelNumber];
- for (int i = 0; i < channelNumber; i++) {
- channels[i] = new SimpleMemoryPipelineChannel(blockQueueSize,
ackCallback);
- }
+ channels = IntStream.range(0, channelNumber).mapToObj(each -> new
SimpleMemoryPipelineChannel(blockQueueSize,
ackCallback)).collect(Collectors.toList());
}
@Override
@@ -63,7 +62,7 @@ public final class MultiplexMemoryPipelineChannel implements PipelineChannel {
}
private void pushRecord(final Record record, final int channelIndex) {
- PipelineChannel channel = channels[channelIndex];
+ PipelineChannel channel = channels.get(channelIndex);
channel.pushRecord(record);
}
@@ -80,7 +79,7 @@ public final class MultiplexMemoryPipelineChannel implements PipelineChannel {
private PipelineChannel findChannel() {
String threadId = Long.toString(Thread.currentThread().getId());
checkAssignment(threadId);
- return channels[channelAssignment.get(threadId)];
+ return channels.get(channelAssignment.get(threadId));
}
private void checkAssignment(final String threadId) {
@@ -94,7 +93,7 @@ public final class MultiplexMemoryPipelineChannel implements PipelineChannel {
}
private void assignmentChannel(final String threadId) {
- for (int i = 0; i < channels.length; i++) {
+ for (int i = 0; i < channels.size(); i++) {
if (!channelAssignment.containsValue(i)) {
channelAssignment.put(threadId, i);
return;