This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4350123e87a Remove useless constructors of
MultiplexMemoryPipelineChannel and SimpleMemoryPipelineChannel (#23979)
4350123e87a is described below
commit 4350123e87af29bec1dcf9ebaacf027315e618b4
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Feb 3 20:41:39 2023 +0800
Remove useless constructors of MultiplexMemoryPipelineChannel and
SimpleMemoryPipelineChannel (#23979)
* Rename AlgorithmDescription to SPIDescription
* Remove useless constructors of MultiplexMemoryPipelineChannel and
SimpleMemoryPipelineChannel
---
.../channel/memory/MultiplexMemoryPipelineChannel.java | 15 ---------------
.../channel/memory/SimpleMemoryPipelineChannel.java | 7 -------
.../pipeline/mysql/ingest/MySQLIncrementalDumperTest.java | 3 ++-
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 3 ++-
4 files changed, 4 insertions(+), 24 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
index 61d0e9de301..67d769f1700 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
@@ -23,7 +23,6 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
import java.util.HashMap;
import java.util.List;
@@ -34,26 +33,12 @@ import java.util.Map;
*/
public final class MultiplexMemoryPipelineChannel implements PipelineChannel {
- private static final EmptyAckCallback EMPTY_ACK_CALLBACK = new
EmptyAckCallback();
-
private final int channelNumber;
private final PipelineChannel[] channels;
private final Map<String, Integer> channelAssignment = new HashMap<>();
- public MultiplexMemoryPipelineChannel() {
- this(EMPTY_ACK_CALLBACK);
- }
-
- public MultiplexMemoryPipelineChannel(final AckCallback ackCallback) {
- this(10000, ackCallback);
- }
-
- public MultiplexMemoryPipelineChannel(final int blockQueueSize, final
AckCallback ackCallback) {
- this(1, blockQueueSize, ackCallback);
- }
-
public MultiplexMemoryPipelineChannel(final int channelNumber, final int
blockQueueSize, final AckCallback ackCallback) {
this.channelNumber = channelNumber;
channels = new PipelineChannel[channelNumber];
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
index 84fdda91c46..723e976f75d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
@@ -20,7 +20,6 @@ package
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import java.util.ArrayList;
@@ -33,16 +32,10 @@ import java.util.concurrent.BlockingQueue;
*/
public final class SimpleMemoryPipelineChannel implements PipelineChannel {
- private static final EmptyAckCallback EMPTY_ACK_CALLBACK = new
EmptyAckCallback();
-
private final BlockingQueue<Record> queue;
private final AckCallback ackCallback;
- public SimpleMemoryPipelineChannel(final int blockQueueSize) {
- this(blockQueueSize, EMPTY_ACK_CALLBACK);
- }
-
public SimpleMemoryPipelineChannel(final int blockQueueSize, final
AckCallback ackCallback) {
this.queue = new ArrayBlockingQueue<>(blockQueueSize);
this.ackCallback = ackCallback;
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 0f5397b9e4d..4dc226ee4b7 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
@@ -79,7 +80,7 @@ public final class MySQLIncrementalDumperTest {
DumperConfiguration dumperConfig = mockDumperConfiguration();
initTableData(dumperConfig);
dumperConfig.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0",
"root", "root"));
- channel = new MultiplexMemoryPipelineChannel();
+ channel = new MultiplexMemoryPipelineChannel(1, 10000, new
EmptyAckCallback());
PipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new
BinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
when(pipelineTableMetaData.getColumnMetaData(anyInt())).thenReturn(new
PipelineColumnMetaData(1, "test", Types.INTEGER, "INTEGER", true, true, true));
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 6294bf6c92e..08a049eb0d4 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
@@ -83,7 +84,7 @@ public final class PostgreSQLWALDumperTest {
@Before
public void setUp() {
position = new WALPosition(new
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
- channel = new MultiplexMemoryPipelineChannel();
+ channel = new MultiplexMemoryPipelineChannel(1, 10000, new
EmptyAckCallback());
dumperConfig = mockDumperConfiguration();
PipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
walDumper = new PostgreSQLWALDumper(dumperConfig, position, channel,
metaDataLoader);