This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b6d11feb7c3 Add single table streaming case at CDC E2E (#29022)
b6d11feb7c3 is described below

commit b6d11feb7c3417132c6bea1e1b3079e4735f01a0
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Nov 13 21:07:01 2023 +0800

    Add single table streaming case at CDC E2E (#29022)
---
 .../data/pipeline/cdc/util/CDCDataNodeUtils.java           |  2 +-
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java         | 14 +++++++-------
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java
index 7231d64ecce..f4a5aee5c80 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java
@@ -52,8 +52,8 @@ public final class CDCDataNodeUtils {
     public static Map<String, List<DataNode>> buildDataNodesMap(final 
ShardingSphereDatabase database, final Collection<String> tableNames) {
         Optional<ShardingRule> shardingRule = 
database.getRuleMetaData().findSingleRule(ShardingRule.class);
         Optional<SingleRule> singleRule = 
database.getRuleMetaData().findSingleRule(SingleRule.class);
-        Map<String, List<DataNode>> result = new HashMap<>();
         Optional<BroadcastRule> broadcastRule = 
database.getRuleMetaData().findSingleRule(BroadcastRule.class);
+        Map<String, List<DataNode>> result = new HashMap<>();
         // TODO support virtual data source name
         for (String each : tableNames) {
             if (singleRule.isPresent() && 
singleRule.get().getAllDataNodes().containsKey(each)) {
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 34f49f7ddb5..86f7bc9b658 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -65,7 +65,6 @@ import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -112,17 +111,18 @@ class CDCE2EIT {
             log.info("init data begin: {}", LocalDateTime.now());
             DataSourceExecuteUtils.execute(dataSource, 
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), 
dataPair.getLeft());
             DataSourceExecuteUtils.execute(dataSource, "INSERT INTO 
t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, 
new Object[]{2, "b"}));
+            DataSourceExecuteUtils.execute(dataSource, "INSERT INTO 
t_single(id) VALUES (?)", Arrays.asList(new Object[]{1}, new Object[]{2}, new 
Object[]{3}));
             log.info("init data end: {}", LocalDateTime.now());
             try (
                     Connection connection = 
DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4,
 false),
                             containerComposer.getUsername(), 
containerComposer.getPassword())) {
                 initSchemaAndTable(containerComposer, connection, 0);
             }
-            DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
-            final CDCClient cdcClient = 
buildCDCClientAndStart(containerComposer, dialectDatabaseMetaData);
+            final CDCClient cdcClient = 
buildCDCClientAndStart(containerComposer);
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW 
STREAMING LIST").isEmpty());
             String jobId = containerComposer.queryForListWithLog("SHOW 
STREAMING LIST").get(0).get("id").toString();
             containerComposer.waitIncrementTaskFinished(String.format("SHOW 
STREAMING STATUS '%s'", jobId));
+            DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
             String tableName = dialectDatabaseMetaData.isSchemaAvailable() ? 
String.join(".", "test", SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
             containerComposer.startIncrementTask(new 
E2EIncrementalTask(dataSource, tableName, new SnowflakeKeyGenerateAlgorithm(), 
containerComposer.getDatabaseType(), 20));
             containerComposer.getIncreaseTaskThread().join(10000L);
@@ -141,6 +141,7 @@ class CDCE2EIT {
                     containerComposer.getDatabaseType());
             assertDataMatched(sourceDataSource, targetDataSource, 
orderSchemaTableName);
             assertDataMatched(sourceDataSource, targetDataSource, new 
CaseInsensitiveQualifiedTable(null, "t_address"));
+            assertDataMatched(sourceDataSource, targetDataSource, new 
CaseInsensitiveQualifiedTable(null, "t_single"));
             cdcClient.close();
             Awaitility.await().atMost(10L, 
TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> 
containerComposer.queryForListWithLog("SHOW STREAMING LIST")
                     .stream().noneMatch(each -> 
Boolean.parseBoolean(each.get("active").toString())));
@@ -164,6 +165,7 @@ class CDCE2EIT {
         log.info("Create table sql: {}", sql);
         connection.createStatement().execute(sql);
         connection.createStatement().execute("CREATE TABLE t_address(id 
integer primary key, address_name varchar(255))");
+        connection.createStatement().execute("CREATE TABLE t_single(id integer 
primary key)");
         if (sleepSeconds > 0) {
             Awaitility.await().pollDelay(sleepSeconds, 
TimeUnit.SECONDS).until(() -> true);
         }
@@ -174,16 +176,14 @@ class CDCE2EIT {
                 containerComposer.getUsername(), 
containerComposer.getPassword()));
     }
     
-    private CDCClient buildCDCClientAndStart(final PipelineContainerComposer 
containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
+    private CDCClient buildCDCClientAndStart(final PipelineContainerComposer 
containerComposer) {
         DataSource dataSource = createStandardDataSource(containerComposer, 
PipelineContainerComposer.DS_4);
         DataSourceRecordConsumer recordConsumer = new 
DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
-        String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : 
"";
         CDCClient result = new CDCClient(new 
CDCClientConfiguration("localhost", 
containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
         result.connect(recordConsumer, new 
RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) -> 
log.error("Server error: {}", serverErrorResult.getErrorMessage()));
         result.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, 
ProxyContainerConstants.PASSWORD));
         // TODO add full=false test case later
-        result.startStreaming(new StartStreamingParameter("sharding_db", new 
HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
-                SchemaTable.newBuilder().setTable("t_address").build())), 
true));
+        result.startStreaming(new StartStreamingParameter("sharding_db", 
Collections.singleton(SchemaTable.newBuilder().setTable("*").setSchema("*").build()),
 true));
         return result;
     }
     

Reply via email to