This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6da7c4c3a58 Add more unit test of CDC module (#29678)
6da7c4c3a58 is described below

commit 6da7c4c3a5874aa1a51ba5a4f31c7467774710d4
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Jan 9 14:52:46 2024 +0800

    Add more unit test of CDC module (#29678)
    
    * Add more unit test of CDC core
    
    * Add more unit test
    
    * Remove E2E Covered unit test
---
 .../data/pipeline/cdc/client/CDCClientTest.java    | 32 ++++++++++
 .../client/util/ProtobufAnyValueConverterTest.java |  7 +++
 .../YamlCDCJobConfigurationSwapperTest.java        | 18 +++++-
 .../importer/sink/PipelineCDCSocketSinkTest.java   | 53 +++++++++++++++++
 .../pipeline/cdc/util/CDCDataNodeUtilsTest.java    | 69 ++++++++++++++++++++++
 5 files changed, 177 insertions(+), 2 deletions(-)

diff --git a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClientTest.java b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClientTest.java
new file mode 100644
index 00000000000..796726249c1
--- /dev/null
+++ b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClientTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client;
+
+import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class CDCClientTest {
+    
+    @Test
+    void assertInvalidParameters() {
+        CDCClientConfiguration config = new CDCClientConfiguration("localhost", -1, 1000);
+        assertThrows(IllegalArgumentException.class, () -> new CDCClient(config));
+    }
+}
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java
index ee6f19f6310..d5b460d1874 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java
@@ -22,6 +22,7 @@ import com.google.protobuf.BoolValue;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import com.google.protobuf.DoubleValue;
+import com.google.protobuf.Empty;
 import com.google.protobuf.FloatValue;
 import com.google.protobuf.Int32Value;
 import com.google.protobuf.Int64Value;
@@ -31,6 +32,7 @@ import com.google.protobuf.NullValue;
 import com.google.protobuf.StringValue;
 import com.google.protobuf.Struct;
 import com.google.protobuf.Struct.Builder;
+import com.google.protobuf.UInt64Value;
 import com.google.protobuf.Value;
 import com.google.protobuf.util.JsonFormat;
 import org.junit.jupiter.api.Test;
@@ -40,6 +42,7 @@ import java.time.OffsetDateTime;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class ProtobufAnyValueConverterTest {
@@ -63,6 +66,10 @@ class ProtobufAnyValueConverterTest {
         assertTrue((Boolean) actual);
         actual = ProtobufAnyValueConverter.convertToObject(Any.pack(BytesValue.of(ByteString.copyFrom(new byte[]{1, 2, 3}))));
         assertThat(actual, is(new byte[]{1, 2, 3}));
+        actual = ProtobufAnyValueConverter.convertToObject(Any.pack(UInt64Value.of(101010L)));
+        assertThat(actual, is(101010L));
+        actual = ProtobufAnyValueConverter.convertToObject(Any.pack(Empty.getDefaultInstance()));
+        assertNull(actual);
         actual = Struct.newBuilder().putFields("str", Value.newBuilder().setStringValue("ABC defg").build())
                 .putFields("null", Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build())
                 .putFields("number", Value.newBuilder().setNumberValue(123.45D).build())
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/config/yaml/swapper/YamlCDCJobConfigurationSwapperTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/config/yaml/swapper/YamlCDCJobConfigurationSwapperTest.java
index fe4501eca6f..5dd4d8cdbf1 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/config/yaml/swapper/YamlCDCJobConfigurationSwapperTest.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/config/yaml/swapper/YamlCDCJobConfigurationSwapperTest.java
@@ -18,12 +18,15 @@
 package org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper;
 
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration.SinkConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
+import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -34,7 +37,7 @@ class YamlCDCJobConfigurationSwapperTest {
     @Test
     void assertSwapToObject() {
         YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
-        yamlJobConfig.setJobId("j51017f973ac82cb1edea4f5238a258c25e89");
+        yamlJobConfig.setJobId("j0302p00007a8bf46da145dc155ba25c710b550220");
         yamlJobConfig.setDatabaseName("test_db");
         yamlJobConfig.setSchemaTableNames(Arrays.asList("test.t_order", "t_order_item"));
         yamlJobConfig.setFull(true);
@@ -43,9 +46,20 @@ class YamlCDCJobConfigurationSwapperTest {
         sinkConfig.setSinkType(CDCSinkType.SOCKET.name());
         yamlJobConfig.setSinkConfig(sinkConfig);
         CDCJobConfiguration actual = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
-        assertThat(actual.getJobId(), is("j51017f973ac82cb1edea4f5238a258c25e89"));
+        assertThat(actual.getJobId(), is("j0302p00007a8bf46da145dc155ba25c710b550220"));
         assertThat(actual.getDatabaseName(), is("test_db"));
         assertThat(actual.getSchemaTableNames(), is(Arrays.asList("test.t_order", "t_order_item")));
         assertTrue(actual.isFull());
     }
+    
+    @Test
+    void assertSwapToYamlConfig() {
+        CDCJobConfiguration jobConfig = new CDCJobConfiguration("j0302p00007a8bf46da145dc155ba25c710b550220", "test_db", Arrays.asList("t_order", "t_order_item"), true, new MySQLDatabaseType(),
+                null, null, null, true, new SinkConfiguration(CDCSinkType.SOCKET, new Properties()), 1, 1);
+        YamlCDCJobConfiguration actual = new YamlCDCJobConfigurationSwapper().swapToYamlConfiguration(jobConfig);
+        assertThat(actual.getJobId(), is("j0302p00007a8bf46da145dc155ba25c710b550220"));
+        assertThat(actual.getDatabaseName(), is("test_db"));
+        assertThat(actual.getSchemaTableNames(), is(Arrays.asList("t_order", "t_order_item")));
+        assertTrue(actual.isFull());
+    }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
new file mode 100644
index 00000000000..b52b3494ea2
--- /dev/null
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;
+
+import io.netty.channel.Channel;
+import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class PipelineCDCSocketSinkTest {
+    
+    @Test
+    void assertWrite() throws IOException {
+        Channel mockChannel = mock(Channel.class);
+        when(mockChannel.isWritable()).thenReturn(false, true);
+        when(mockChannel.isActive()).thenReturn(true);
+        ShardingSphereDatabase mockDatabase = mock(ShardingSphereDatabase.class);
+        when(mockDatabase.getName()).thenReturn("test");
+        try (PipelineCDCSocketSink sink = new PipelineCDCSocketSink(mockChannel, mockDatabase, Collections.singletonList("test.t_order"))) {
+            PipelineJobProgressUpdatedParameter actual = sink.write("ack", Collections.singletonList(new FinishedRecord(new IngestPlaceholderPosition())));
+            assertThat(actual.getProcessedRecordsCount(), is(0));
+            actual = sink.write("ack", Collections.singletonList(new DataRecord(PipelineSQLOperationType.DELETE, "t_order", new IngestPlaceholderPosition(), 1)));
+            assertThat(actual.getProcessedRecordsCount(), is(1));
+        }
+    }
+}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtilsTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtilsTest.java
new file mode 100644
index 00000000000..f034868a628
--- /dev/null
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtilsTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import org.apache.shardingsphere.broadcast.rule.BroadcastRule;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.sharding.rule.TableRule;
+import org.apache.shardingsphere.single.rule.SingleRule;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class CDCDataNodeUtilsTest {
+    
+    @Test
+    void assertBuildDataNodesMap() {
+        ShardingSphereDatabase mockDatabase = mock(ShardingSphereDatabase.class);
+        RuleMetaData mockRuleMetaData = mock(RuleMetaData.class);
+        ShardingRule mockShardingRule = mock(ShardingRule.class);
+        TableRule mockTableRule = mock(TableRule.class);
+        when(mockTableRule.getActualDataNodes()).thenReturn(Collections.singletonList(new DataNode("ds_0.t_order")));
+        when(mockShardingRule.findTableRule("t_order")).thenReturn(Optional.of(mockTableRule));
+        when(mockShardingRule.getTableRule("t_order")).thenReturn(mockTableRule);
+        when(mockRuleMetaData.findSingleRule(ShardingRule.class)).thenReturn(Optional.of(mockShardingRule));
+        SingleRule mockSingleRule = mock(SingleRule.class);
+        when(mockRuleMetaData.findSingleRule(SingleRule.class)).thenReturn(Optional.of(mockSingleRule));
+        when(mockSingleRule.getAllDataNodes()).thenReturn(Collections.singletonMap("t_order_item", Collections.singletonList(new DataNode("single.t_order_item"))));
+        when(mockDatabase.getRuleMetaData()).thenReturn(mockRuleMetaData);
+        BroadcastRule mockBroadcastRule = mock(BroadcastRule.class);
+        when(mockRuleMetaData.findSingleRule(BroadcastRule.class)).thenReturn(Optional.of(mockBroadcastRule));
+        when(mockBroadcastRule.findFirstActualTable("t_address")).thenReturn(Optional.of("broadcast.t_address"));
+        when(mockBroadcastRule.getTableDataNodes()).thenReturn(Collections.singletonMap("t_address", Collections.singletonList(new DataNode("broadcast.t_address"))));
+        Map<String, List<DataNode>> actual = CDCDataNodeUtils.buildDataNodesMap(mockDatabase, Arrays.asList("t_order", "t_order_item", "t_address"));
+        assertTrue(actual.containsKey("t_order"));
+        assertTrue(actual.containsKey("t_order_item"));
+        assertTrue(actual.containsKey("t_address"));
+        assertThat(actual.get("t_order").get(0).getDataSourceName(), is("ds_0"));
+        assertThat(actual.get("t_order_item").get(0).getDataSourceName(), is("single"));
+        assertThat(actual.get("t_address").get(0).getDataSourceName(), is("broadcast"));
+    }
+}

Reply via email to