This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b6d11feb7c3 Add single table streaming case at CDC E2E (#29022)
b6d11feb7c3 is described below
commit b6d11feb7c3417132c6bea1e1b3079e4735f01a0
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Nov 13 21:07:01 2023 +0800
Add single table streaming case at CDC E2E (#29022)
---
.../data/pipeline/cdc/util/CDCDataNodeUtils.java | 2 +-
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 14 +++++++-------
2 files changed, 8 insertions(+), 8 deletions(-)
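In short: the E2E case now creates and seeds a plain single table (t_single) next to the sharded and broadcast tables, and the CDC client subscribes with a schema/table wildcard instead of enumerating tables, so single tables are streamed without dialect-specific schema handling. Below is a minimal sketch of the new subscription, reusing only the client calls visible in the diff; imports, the record consumer, and the logger are omitted, and the host, port, and credentials are placeholders rather than values from the commit:

    // Hedged sketch of the wildcard subscription added in CDCE2EIT; "localhost", 33071,
    // and "root"/"root" are placeholder connection values, not taken from the commit.
    CDCClient client = new CDCClient(new CDCClientConfiguration("localhost", 33071, 5000));
    client.connect(recordConsumer, new RetryStreamingExceptionHandler(client, 5, 5000),
            (ctx, serverErrorResult) -> log.error("Server error: {}", serverErrorResult.getErrorMessage()));
    client.login(new CDCLoginParameter("root", "root"));
    // setSchema("*").setTable("*") matches every schema and table, including single tables such as t_single.
    client.startStreaming(new StartStreamingParameter("sharding_db",
            Collections.singleton(SchemaTable.newBuilder().setTable("*").setSchema("*").build()), true));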
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java
index 7231d64ecce..f4a5aee5c80 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java
@@ -52,8 +52,8 @@ public final class CDCDataNodeUtils {
     public static Map<String, List<DataNode>> buildDataNodesMap(final ShardingSphereDatabase database, final Collection<String> tableNames) {
         Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
         Optional<SingleRule> singleRule = database.getRuleMetaData().findSingleRule(SingleRule.class);
-        Map<String, List<DataNode>> result = new HashMap<>();
         Optional<BroadcastRule> broadcastRule = database.getRuleMetaData().findSingleRule(BroadcastRule.class);
+        Map<String, List<DataNode>> result = new HashMap<>();
         // TODO support virtual data source name
         for (String each : tableNames) {
             if (singleRule.isPresent() && singleRule.get().getAllDataNodes().containsKey(each)) {
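The hunk above is a pure reordering: the result map is now declared after the three rule lookups so the lookups read as one block, with no behavior change. For orientation, here is a hedged sketch of the per-table resolution pattern the method uses; only the singleRule check is confirmed by this hunk, while the copy into the result map and the later branches are illustrative assumptions:

    // Each table name is resolved against the first rule that knows it (sketch).
    for (String each : tableNames) {
        if (singleRule.isPresent() && singleRule.get().getAllDataNodes().containsKey(each)) {
            // A single table maps to exactly one data node; copy its node list over (assumed).
            result.put(each, new ArrayList<>(singleRule.get().getAllDataNodes().get(each)));
            continue;
        }
        // Broadcast and sharding rules are consulted next, in that order (assumed).
    }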
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 34f49f7ddb5..86f7bc9b658 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -65,7 +65,6 @@ import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -112,17 +111,18 @@ class CDCE2EIT {
         log.info("init data begin: {}", LocalDateTime.now());
         DataSourceExecuteUtils.execute(dataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
         DataSourceExecuteUtils.execute(dataSource, "INSERT INTO t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"}));
+        DataSourceExecuteUtils.execute(dataSource, "INSERT INTO t_single(id) VALUES (?)", Arrays.asList(new Object[]{1}, new Object[]{2}, new Object[]{3}));
         log.info("init data end: {}", LocalDateTime.now());
         try (
                 Connection connection = DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
                         containerComposer.getUsername(), containerComposer.getPassword())) {
             initSchemaAndTable(containerComposer, connection, 0);
         }
-        DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
-        final CDCClient cdcClient = buildCDCClientAndStart(containerComposer, dialectDatabaseMetaData);
+        final CDCClient cdcClient = buildCDCClientAndStart(containerComposer);
         Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
         String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
         containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
+        DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
         String tableName = dialectDatabaseMetaData.isSchemaAvailable() ? String.join(".", "test", SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
         containerComposer.startIncrementTask(new E2EIncrementalTask(dataSource, tableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
         containerComposer.getIncreaseTaskThread().join(10000L);
@@ -141,6 +141,7 @@ class CDCE2EIT {
                     containerComposer.getDatabaseType());
             assertDataMatched(sourceDataSource, targetDataSource, orderSchemaTableName);
             assertDataMatched(sourceDataSource, targetDataSource, new CaseInsensitiveQualifiedTable(null, "t_address"));
+            assertDataMatched(sourceDataSource, targetDataSource, new CaseInsensitiveQualifiedTable(null, "t_single"));
             cdcClient.close();
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> containerComposer.queryForListWithLog("SHOW STREAMING LIST")
                     .stream().noneMatch(each -> Boolean.parseBoolean(each.get("active").toString())));
@@ -164,6 +165,7 @@ class CDCE2EIT {
         log.info("Create table sql: {}", sql);
         connection.createStatement().execute(sql);
         connection.createStatement().execute("CREATE TABLE t_address(id integer primary key, address_name varchar(255))");
+        connection.createStatement().execute("CREATE TABLE t_single(id integer primary key)");
         if (sleepSeconds > 0) {
             Awaitility.await().pollDelay(sleepSeconds, TimeUnit.SECONDS).until(() -> true);
         }
@@ -174,16 +176,14 @@ class CDCE2EIT {
                 containerComposer.getUsername(), containerComposer.getPassword()));
     }
 
-    private CDCClient buildCDCClientAndStart(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
+    private CDCClient buildCDCClientAndStart(final PipelineContainerComposer containerComposer) {
         DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
         DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
-        String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : "";
         CDCClient result = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
         result.connect(recordConsumer, new RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) -> log.error("Server error: {}", serverErrorResult.getErrorMessage()));
         result.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD));
         // TODO add full=false test case later
-        result.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
-                SchemaTable.newBuilder().setTable("t_address").build())), true));
+        result.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("*").setSchema("*").build()), true));
         return result;
     }
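A note on the collection change in the last hunk: with a single wildcard SchemaTable there is no longer a list of tables to collect, so the immutable Collections.singleton replaces the mutable HashSet, which is also why the java.util.HashSet import is removed earlier in this file. A self-contained illustration in plain Java, with no ShardingSphere types involved:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public final class SingletonVsHashSet {

        public static void main(final String[] args) {
            // Before: a mutable set built from an explicit table list.
            Set<String> before = new HashSet<>(Arrays.asList("t_order", "t_address"));
            // After: one wildcard element, so an immutable singleton set suffices.
            Set<String> after = Collections.singleton("*");
            System.out.printf("before=%s, after=%s%n", before, after);
        }
    }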