This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4f5884ae8d1 Remove deprecated MultiplexPipelineChannel (#32583)
4f5884ae8d1 is described below
commit 4f5884ae8d19bc23b9a7911a5bafd7d40ce03d88
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Aug 18 16:49:54 2024 +0800
Remove deprecated MultiplexPipelineChannel (#32583)
* Move PostgreSQLJDBCStreamQueryBuilder
* Move PostgreSQLJDBCStreamQueryBuilder
* Remove deprecated MultiplexPipelineChannel
---
.../core/channel/MultiplexPipelineChannel.java | 128 ---------------------
.../core/channel/MultiplexPipelineChannelTest.java | 112 ------------------
2 files changed, 240 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
deleted file mode 100644
index e2708ae730c..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.channel;
-
-import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-/**
- * Multiplex pipeline channel.
- *
- * @deprecated It's not used for now since possible ack issue.
- */
-@Deprecated
-public final class MultiplexPipelineChannel implements PipelineChannel {
-
- private final int channelCount;
-
- private final List<PipelineChannel> channels;
-
- private final Map<String, Integer> channelAssignment = new HashMap<>();
-
- public MultiplexPipelineChannel(final int channelCount, final PipelineChannelCreator channelCreator, final int importerBatchSize, final PipelineChannelAckCallback ackCallback) {
- this.channelCount = channelCount;
- channels = IntStream.range(0, channelCount).mapToObj(each -> channelCreator.newInstance(importerBatchSize, ackCallback)).collect(Collectors.toList());
- }
-
- @Override
- public void push(final List<Record> records) {
- Record firstRecord = records.get(0);
- if (1 == records.size()) {
- push(firstRecord);
- return;
- }
- long insertDataRecordsCount = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count();
- if (insertDataRecordsCount == records.size()) {
- channels.get(Math.abs(firstRecord.hashCode() % channelCount)).push(records);
- return;
- }
- for (Record each : records) {
- push(each);
- }
- }
-
- private void push(final Record ingestedRecord) {
- List<Record> records = Collections.singletonList(ingestedRecord);
- if (ingestedRecord instanceof FinishedRecord) {
- for (int i = 0; i < channelCount; i++) {
- channels.get(i).push(records);
- }
- } else if (DataRecord.class.equals(ingestedRecord.getClass())) {
- channels.get(Math.abs(ingestedRecord.hashCode() % channelCount)).push(records);
- } else if (PlaceholderRecord.class.equals(ingestedRecord.getClass())) {
- channels.get(0).push(records);
- } else {
- throw new UnsupportedOperationException("Unsupported record type: " + ingestedRecord.getClass().getName());
- }
- }
-
- @Override
- public List<Record> fetch(final int batchSize, final long timeoutMills) {
- return findChannel().fetch(batchSize, timeoutMills);
- }
-
- @Override
- public List<Record> peek() {
- return findChannel().peek();
- }
-
- @Override
- public List<Record> poll() {
- return findChannel().poll();
- }
-
- @Override
- public void ack(final List<Record> records) {
- findChannel().ack(records);
- }
-
- private PipelineChannel findChannel() {
- String threadId = Long.toString(Thread.currentThread().getId());
- checkAssignment(threadId);
- return channels.get(channelAssignment.get(threadId));
- }
-
- private void checkAssignment(final String threadId) {
- if (!channelAssignment.containsKey(threadId)) {
- synchronized (this) {
- if (!channelAssignment.containsKey(threadId)) {
- assignmentChannel(threadId);
- }
- }
- }
- }
-
- private void assignmentChannel(final String threadId) {
- for (int i = 0; i < channels.size(); i++) {
- if (!channelAssignment.containsValue(i)) {
- channelAssignment.put(threadId, i);
- return;
- }
- }
- }
-}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannelTest.java
deleted file mode 100644
index 3df5f438115..00000000000
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannelTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.channel;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-class MultiplexPipelineChannelTest {
-
- private static final int CHANNEL_NUMBER = 2;
-
- @Test
- void assertAckCallbackResultSortable() {
- Record[] records = mockRecords();
- execute(ackRecords -> {
- AtomicInteger lastId = new AtomicInteger();
- for (Record each : ackRecords) {
- int currentId = ((IntPosition) each.getPosition()).getId();
- assertTrue(currentId > lastId.get());
- lastId.set(currentId);
- }
- }, countDataRecord(records), records);
- }
-
- private Record[] mockRecords() {
- Record[] result = new Record[100];
- for (int i = 1; i <= result.length; i++) {
- result[i - 1] = ThreadLocalRandom.current().nextBoolean() ? new DataRecord(PipelineSQLOperationType.INSERT, "t1", new IntPosition(i), 0) : new PlaceholderRecord(new IntPosition(i));
- }
- return result;
- }
-
- private int countDataRecord(final Record[] records) {
- return (int) Arrays.stream(records).filter(each -> each instanceof DataRecord).count();
- }
-
- @Test
- void assertBroadcastFinishedRecord() {
- execute(records -> assertThat(records.size(), is(1)), 2, new FinishedRecord(new IngestPlaceholderPosition()));
- }
-
- @SneakyThrows(InterruptedException.class)
- private void execute(final PipelineChannelAckCallback ackCallback, final int recordCount, final Record... records) {
- CountDownLatch countDownLatch = new CountDownLatch(recordCount);
- MultiplexPipelineChannel channel = new MultiplexPipelineChannel(CHANNEL_NUMBER, TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY"), 10000, ackCallback);
- fetchWithMultiThreads(channel, countDownLatch);
- channel.push(Arrays.asList(records));
- boolean awaitResult = countDownLatch.await(10L, TimeUnit.SECONDS);
- assertTrue(awaitResult, "await failed");
- }
-
- private void fetchWithMultiThreads(final MultiplexPipelineChannel memoryChannel, final CountDownLatch countDownLatch) {
- for (int i = 0; i < CHANNEL_NUMBER; i++) {
- new Thread(() -> fetch(memoryChannel, countDownLatch)).start();
- }
- }
-
- private void fetch(final MultiplexPipelineChannel memoryChannel, final CountDownLatch countDownLatch) {
- int maxLoopCount = 10;
- for (int j = 1; j <= maxLoopCount; j++) {
- List<Record> records = memoryChannel.fetch(100, 1000L);
- memoryChannel.ack(records);
- records.forEach(each -> countDownLatch.countDown());
- if (!records.isEmpty() && records.get(records.size() - 1) instanceof FinishedRecord) {
- break;
- }
- }
- }
-
- @RequiredArgsConstructor
- @Getter
- private static final class IntPosition implements IngestPosition {
-
- private final int id;
- }
-}