This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f5884ae8d1 Remove deprecated MultiplexPipelineChannel (#32583)
4f5884ae8d1 is described below

commit 4f5884ae8d19bc23b9a7911a5bafd7d40ce03d88
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Aug 18 16:49:54 2024 +0800

    Remove deprecated MultiplexPipelineChannel (#32583)
    
    * Move PostgreSQLJDBCStreamQueryBuilder
    
    * Move PostgreSQLJDBCStreamQueryBuilder
    
    * Remove deprecated MultiplexPipelineChannel
---
 .../core/channel/MultiplexPipelineChannel.java     | 128 ---------------------
 .../core/channel/MultiplexPipelineChannelTest.java | 112 ------------------
 2 files changed, 240 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
deleted file mode 100644
index e2708ae730c..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannel.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.channel;
-
-import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-/**
- * Multiplex pipeline channel.
- *
- * @deprecated It's not used for now since possible ack issue.
- */
-@Deprecated
-public final class MultiplexPipelineChannel implements PipelineChannel {
-    
-    private final int channelCount;
-    
-    private final List<PipelineChannel> channels;
-    
-    private final Map<String, Integer> channelAssignment = new HashMap<>();
-    
-    public MultiplexPipelineChannel(final int channelCount, final PipelineChannelCreator channelCreator, final int importerBatchSize, final PipelineChannelAckCallback ackCallback) {
-        this.channelCount = channelCount;
-        channels = IntStream.range(0, channelCount).mapToObj(each -> channelCreator.newInstance(importerBatchSize, ackCallback)).collect(Collectors.toList());
-    }
-    
-    @Override
-    public void push(final List<Record> records) {
-        Record firstRecord = records.get(0);
-        if (1 == records.size()) {
-            push(firstRecord);
-            return;
-        }
-        long insertDataRecordsCount = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count();
-        if (insertDataRecordsCount == records.size()) {
-            channels.get(Math.abs(firstRecord.hashCode() % channelCount)).push(records);
-            return;
-        }
-        for (Record each : records) {
-            push(each);
-        }
-    }
-    
-    private void push(final Record ingestedRecord) {
-        List<Record> records = Collections.singletonList(ingestedRecord);
-        if (ingestedRecord instanceof FinishedRecord) {
-            for (int i = 0; i < channelCount; i++) {
-                channels.get(i).push(records);
-            }
-        } else if (DataRecord.class.equals(ingestedRecord.getClass())) {
-            channels.get(Math.abs(ingestedRecord.hashCode() % channelCount)).push(records);
-        } else if (PlaceholderRecord.class.equals(ingestedRecord.getClass())) {
-            channels.get(0).push(records);
-        } else {
-            throw new UnsupportedOperationException("Unsupported record type: " + ingestedRecord.getClass().getName());
-        }
-    }
-    
-    @Override
-    public List<Record> fetch(final int batchSize, final long timeoutMills) {
-        return findChannel().fetch(batchSize, timeoutMills);
-    }
-    
-    @Override
-    public List<Record> peek() {
-        return findChannel().peek();
-    }
-    
-    @Override
-    public List<Record> poll() {
-        return findChannel().poll();
-    }
-    
-    @Override
-    public void ack(final List<Record> records) {
-        findChannel().ack(records);
-    }
-    
-    private PipelineChannel findChannel() {
-        String threadId = Long.toString(Thread.currentThread().getId());
-        checkAssignment(threadId);
-        return channels.get(channelAssignment.get(threadId));
-    }
-    
-    private void checkAssignment(final String threadId) {
-        if (!channelAssignment.containsKey(threadId)) {
-            synchronized (this) {
-                if (!channelAssignment.containsKey(threadId)) {
-                    assignmentChannel(threadId);
-                }
-            }
-        }
-    }
-    
-    private void assignmentChannel(final String threadId) {
-        for (int i = 0; i < channels.size(); i++) {
-            if (!channelAssignment.containsValue(i)) {
-                channelAssignment.put(threadId, i);
-                return;
-            }
-        }
-    }
-}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannelTest.java
deleted file mode 100644
index 3df5f438115..00000000000
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/MultiplexPipelineChannelTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.channel;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-class MultiplexPipelineChannelTest {
-    
-    private static final int CHANNEL_NUMBER = 2;
-    
-    @Test
-    void assertAckCallbackResultSortable() {
-        Record[] records = mockRecords();
-        execute(ackRecords -> {
-            AtomicInteger lastId = new AtomicInteger();
-            for (Record each : ackRecords) {
-                int currentId = ((IntPosition) each.getPosition()).getId();
-                assertTrue(currentId > lastId.get());
-                lastId.set(currentId);
-            }
-        }, countDataRecord(records), records);
-    }
-    
-    private Record[] mockRecords() {
-        Record[] result = new Record[100];
-        for (int i = 1; i <= result.length; i++) {
-            result[i - 1] = ThreadLocalRandom.current().nextBoolean() ? new DataRecord(PipelineSQLOperationType.INSERT, "t1", new IntPosition(i), 0) : new PlaceholderRecord(new IntPosition(i));
-        }
-        return result;
-    }
-    
-    private int countDataRecord(final Record[] records) {
-        return (int) Arrays.stream(records).filter(each -> each instanceof DataRecord).count();
-    }
-    
-    @Test
-    void assertBroadcastFinishedRecord() {
-        execute(records -> assertThat(records.size(), is(1)), 2, new FinishedRecord(new IngestPlaceholderPosition()));
-    }
-    
-    @SneakyThrows(InterruptedException.class)
-    private void execute(final PipelineChannelAckCallback ackCallback, final int recordCount, final Record... records) {
-        CountDownLatch countDownLatch = new CountDownLatch(recordCount);
-        MultiplexPipelineChannel channel = new MultiplexPipelineChannel(CHANNEL_NUMBER, TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY"), 10000, ackCallback);
-        fetchWithMultiThreads(channel, countDownLatch);
-        channel.push(Arrays.asList(records));
-        boolean awaitResult = countDownLatch.await(10L, TimeUnit.SECONDS);
-        assertTrue(awaitResult, "await failed");
-    }
-    
-    private void fetchWithMultiThreads(final MultiplexPipelineChannel memoryChannel, final CountDownLatch countDownLatch) {
-        for (int i = 0; i < CHANNEL_NUMBER; i++) {
-            new Thread(() -> fetch(memoryChannel, countDownLatch)).start();
-        }
-    }
-    
-    private void fetch(final MultiplexPipelineChannel memoryChannel, final CountDownLatch countDownLatch) {
-        int maxLoopCount = 10;
-        for (int j = 1; j <= maxLoopCount; j++) {
-            List<Record> records = memoryChannel.fetch(100, 1000L);
-            memoryChannel.ack(records);
-            records.forEach(each -> countDownLatch.countDown());
-            if (!records.isEmpty() && records.get(records.size() - 1) instanceof FinishedRecord) {
-                break;
-            }
-        }
-    }
-    
-    @RequiredArgsConstructor
-    @Getter
-    private static final class IntPosition implements IngestPosition {
-        
-        private final int id;
-    }
-}
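
Editor's note (not part of the commit): with the multiplex wrapper removed, a caller obtains a single channel straight from PipelineChannelCreator instead of fanning records out by hash. The sketch below is a minimal, hypothetical usage example; the package names, the "MEMORY" creator type, and the newInstance/fetch/ack calls are taken from the deleted sources above, while the class name, main method, and the no-op callback body are illustrative assumptions only.

import java.util.List;

import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

// Hypothetical example class; it does not exist in the repository.
public final class SingleChannelUsageSketch {
    
    public static void main(final String[] args) {
        // Load the memory-backed channel creator via SPI, as the removed test did.
        PipelineChannelCreator creator = TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY");
        // Callback invoked with acknowledged records; left as a no-op for illustration.
        PipelineChannelAckCallback ackCallback = ackedRecords -> {
        };
        // One channel replaces the hash-based fan-out the multiplex wrapper used to provide.
        PipelineChannel channel = creator.newInstance(10000, ackCallback);
        // Fetch up to 100 records with a 1000 ms timeout, then acknowledge them.
        List<Record> records = channel.fetch(100, 1000L);
        channel.ack(records);
    }
}

The deleted class hashed DataRecords across channels and broadcast FinishedRecords to all of them, and its own deprecation note cites a possible ack issue with that scheme, which is why the plain single-channel path shown above is what remains.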
