This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6da7c4c3a58 Add more unit test of CDC module (#29678)
6da7c4c3a58 is described below
commit 6da7c4c3a5874aa1a51ba5a4f31c7467774710d4
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Jan 9 14:52:46 2024 +0800
Add more unit test of CDC module (#29678)
* Add more unit test of CDC core
* Add more unit test
* Remove E2E Covered unit test
---
.../data/pipeline/cdc/client/CDCClientTest.java | 32 ++++++++++
.../client/util/ProtobufAnyValueConverterTest.java | 7 +++
.../YamlCDCJobConfigurationSwapperTest.java | 18 +++++-
.../importer/sink/PipelineCDCSocketSinkTest.java | 53 +++++++++++++++++
.../pipeline/cdc/util/CDCDataNodeUtilsTest.java | 69 ++++++++++++++++++++++
5 files changed, 177 insertions(+), 2 deletions(-)
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClientTest.java b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClientTest.java
new file mode 100644
index 00000000000..796726249c1
--- /dev/null
+++ b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClientTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client;
+
+import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class CDCClientTest {
+
+ @Test
+ void assertInvalidParameters() {
+ CDCClientConfiguration config = new CDCClientConfiguration("localhost", -1, 1000);
+ assertThrows(IllegalArgumentException.class, () -> new CDCClient(config));
+ }
+}
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java
index ee6f19f6310..d5b460d1874 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java
@@ -22,6 +22,7 @@ import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.DoubleValue;
+import com.google.protobuf.Empty;
import com.google.protobuf.FloatValue;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
@@ -31,6 +32,7 @@ import com.google.protobuf.NullValue;
import com.google.protobuf.StringValue;
import com.google.protobuf.Struct;
import com.google.protobuf.Struct.Builder;
+import com.google.protobuf.UInt64Value;
import com.google.protobuf.Value;
import com.google.protobuf.util.JsonFormat;
import org.junit.jupiter.api.Test;
@@ -40,6 +42,7 @@ import java.time.OffsetDateTime;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class ProtobufAnyValueConverterTest {
@@ -63,6 +66,10 @@ class ProtobufAnyValueConverterTest {
assertTrue((Boolean) actual);
actual = ProtobufAnyValueConverter.convertToObject(Any.pack(BytesValue.of(ByteString.copyFrom(new byte[]{1, 2, 3}))));
assertThat(actual, is(new byte[]{1, 2, 3}));
+ actual = ProtobufAnyValueConverter.convertToObject(Any.pack(UInt64Value.of(101010L)));
+ assertThat(actual, is(101010L));
+ actual = ProtobufAnyValueConverter.convertToObject(Any.pack(Empty.getDefaultInstance()));
+ assertNull(actual);
actual = Struct.newBuilder().putFields("str", Value.newBuilder().setStringValue("ABC defg").build())
.putFields("null", Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build())
.putFields("number", Value.newBuilder().setNumberValue(123.45D).build())
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/config/yaml/swapper/YamlCDCJobConfigurationSwapperTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/config/yaml/swapper/YamlCDCJobConfigurationSwapperTest.java
index fe4501eca6f..5dd4d8cdbf1 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/config/yaml/swapper/YamlCDCJobConfigurationSwapperTest.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/config/yaml/swapper/YamlCDCJobConfigurationSwapperTest.java
@@ -18,12 +18,15 @@
package org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration.SinkConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
+import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -34,7 +37,7 @@ class YamlCDCJobConfigurationSwapperTest {
@Test
void assertSwapToObject() {
YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
- yamlJobConfig.setJobId("j51017f973ac82cb1edea4f5238a258c25e89");
+ yamlJobConfig.setJobId("j0302p00007a8bf46da145dc155ba25c710b550220");
yamlJobConfig.setDatabaseName("test_db");
yamlJobConfig.setSchemaTableNames(Arrays.asList("test.t_order", "t_order_item"));
yamlJobConfig.setFull(true);
@@ -43,9 +46,20 @@ class YamlCDCJobConfigurationSwapperTest {
sinkConfig.setSinkType(CDCSinkType.SOCKET.name());
yamlJobConfig.setSinkConfig(sinkConfig);
CDCJobConfiguration actual = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
- assertThat(actual.getJobId(), is("j51017f973ac82cb1edea4f5238a258c25e89"));
+ assertThat(actual.getJobId(), is("j0302p00007a8bf46da145dc155ba25c710b550220"));
assertThat(actual.getDatabaseName(), is("test_db"));
assertThat(actual.getSchemaTableNames(), is(Arrays.asList("test.t_order", "t_order_item")));
assertTrue(actual.isFull());
}
+
+ @Test
+ void assertSwapToYamlConfig() {
+ CDCJobConfiguration jobConfig = new CDCJobConfiguration("j0302p00007a8bf46da145dc155ba25c710b550220", "test_db", Arrays.asList("t_order", "t_order_item"), true, new MySQLDatabaseType(),
+ null, null, null, true, new SinkConfiguration(CDCSinkType.SOCKET, new Properties()), 1, 1);
+ YamlCDCJobConfiguration actual = new YamlCDCJobConfigurationSwapper().swapToYamlConfiguration(jobConfig);
+ assertThat(actual.getJobId(), is("j0302p00007a8bf46da145dc155ba25c710b550220"));
+ assertThat(actual.getDatabaseName(), is("test_db"));
+ assertThat(actual.getSchemaTableNames(), is(Arrays.asList("t_order", "t_order_item")));
+ assertTrue(actual.isFull());
+ }
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
new file mode 100644
index 00000000000..b52b3494ea2
--- /dev/null
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;
+
+import io.netty.channel.Channel;
+import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class PipelineCDCSocketSinkTest {
+
+ @Test
+ void assertWrite() throws IOException {
+ Channel mockChannel = mock(Channel.class);
+ when(mockChannel.isWritable()).thenReturn(false, true);
+ when(mockChannel.isActive()).thenReturn(true);
+ ShardingSphereDatabase mockDatabase = mock(ShardingSphereDatabase.class);
+ when(mockDatabase.getName()).thenReturn("test");
+ try (PipelineCDCSocketSink sink = new PipelineCDCSocketSink(mockChannel, mockDatabase, Collections.singletonList("test.t_order"))) {
+ PipelineJobProgressUpdatedParameter actual = sink.write("ack", Collections.singletonList(new FinishedRecord(new IngestPlaceholderPosition())));
+ assertThat(actual.getProcessedRecordsCount(), is(0));
+ actual = sink.write("ack", Collections.singletonList(new DataRecord(PipelineSQLOperationType.DELETE, "t_order", new IngestPlaceholderPosition(), 1)));
+ assertThat(actual.getProcessedRecordsCount(), is(1));
+ }
+ }
+}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtilsTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtilsTest.java
new file mode 100644
index 00000000000..f034868a628
--- /dev/null
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtilsTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import org.apache.shardingsphere.broadcast.rule.BroadcastRule;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.sharding.rule.TableRule;
+import org.apache.shardingsphere.single.rule.SingleRule;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class CDCDataNodeUtilsTest {
+
+ @Test
+ void assertBuildDataNodesMap() {
+ ShardingSphereDatabase mockDatabase = mock(ShardingSphereDatabase.class);
+ RuleMetaData mockRuleMetaData = mock(RuleMetaData.class);
+ ShardingRule mockShardingRule = mock(ShardingRule.class);
+ TableRule mockTableRule = mock(TableRule.class);
+ when(mockTableRule.getActualDataNodes()).thenReturn(Collections.singletonList(new DataNode("ds_0.t_order")));
+ when(mockShardingRule.findTableRule("t_order")).thenReturn(Optional.of(mockTableRule));
+ when(mockShardingRule.getTableRule("t_order")).thenReturn(mockTableRule);
+ when(mockRuleMetaData.findSingleRule(ShardingRule.class)).thenReturn(Optional.of(mockShardingRule));
+ SingleRule mockSingleRule = mock(SingleRule.class);
+ when(mockRuleMetaData.findSingleRule(SingleRule.class)).thenReturn(Optional.of(mockSingleRule));
+ when(mockSingleRule.getAllDataNodes()).thenReturn(Collections.singletonMap("t_order_item", Collections.singletonList(new DataNode("single.t_order_item"))));
+ when(mockDatabase.getRuleMetaData()).thenReturn(mockRuleMetaData);
+ BroadcastRule mockBroadcastRule = mock(BroadcastRule.class);
+ when(mockRuleMetaData.findSingleRule(BroadcastRule.class)).thenReturn(Optional.of(mockBroadcastRule));
+ when(mockBroadcastRule.findFirstActualTable("t_address")).thenReturn(Optional.of("broadcast.t_address"));
+ when(mockBroadcastRule.getTableDataNodes()).thenReturn(Collections.singletonMap("t_address", Collections.singletonList(new DataNode("broadcast.t_address"))));
+ Map<String, List<DataNode>> actual = CDCDataNodeUtils.buildDataNodesMap(mockDatabase, Arrays.asList("t_order", "t_order_item", "t_address"));
+ assertTrue(actual.containsKey("t_order"));
+ assertTrue(actual.containsKey("t_order_item"));
+ assertTrue(actual.containsKey("t_address"));
+ assertThat(actual.get("t_order").get(0).getDataSourceName(), is("ds_0"));
+ assertThat(actual.get("t_order_item").get(0).getDataSourceName(), is("single"));
+ assertThat(actual.get("t_address").get(0).getDataSourceName(), is("broadcast"));
+ }
+}