This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a113a961287 Refactor CDCE2EIT (#37871)
a113a961287 is described below

commit a113a9612876cc64780681d98619e8834afd1eb5
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Jan 28 16:03:48 2026 +0800

    Refactor CDCE2EIT (#37871)
    
    * Clean CDCE2EIT
    
    * Extract PipelineContainerComposer.createQualifiedTable
    
    * Extract DataSourceTestUtils.createStandardDataSource
---
 .../pipeline/cases/PipelineContainerComposer.java  | 12 ++++++
 .../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java | 37 +++++------------
 .../pipeline/util/DataSourceTestUtils.java         | 46 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 27 deletions(-)

diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 530286b530e..90c3b8f5b02 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -35,6 +35,7 @@ import org.apache.shardingsphere.database.connector.mysql.type.MySQLDatabaseType
 import org.apache.shardingsphere.database.connector.opengauss.type.OpenGaussDatabaseType;
 import org.apache.shardingsphere.database.connector.postgresql.type.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.util.props.PropertiesBuilder;
 import org.apache.shardingsphere.infra.util.props.PropertiesBuilder.Property;
@@ -326,6 +327,17 @@ public final class PipelineContainerComposer implements AutoCloseable {
         sleepSeconds(seconds);
     }
     
+    /**
+     * Create qualified table.
+     *
+     * @param tableName table name
+     * @return qualified table
+     */
+    public QualifiedTable createQualifiedTable(final String tableName) {
+        String schemaName = new DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().getSchemaOption().isSchemaAvailable() ? SCHEMA_NAME : null;
+        return new QualifiedTable(schemaName, tableName);
+    }
+    
     /**
      * Create source table index list.
      *
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index c183f6807fe..36c9c191965 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.test.e2e.operation.pipeline.cases.cdc;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
 import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
@@ -36,8 +35,6 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.Uniq
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
-import org.apache.shardingsphere.database.connector.core.metadata.database.metadata.DialectDatabaseMetaData;
-import org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -51,6 +48,7 @@ import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
+import org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceTestUtils;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.condition.EnabledIf;
@@ -65,9 +63,7 @@ import java.sql.Statement;
 import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -106,25 +102,23 @@ class CDCE2EIT {
             }
             createOrderTableRule(containerComposer);
             distSQLFacade.createBroadcastRule("t_address");
-            DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
-            QualifiedTable qualifiedOrderTable = dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()
-                    ? new QualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
-                    : new QualifiedTable(null, SOURCE_TABLE_NAME);
-            initSchemaAndTable(containerComposer, containerComposer.getProxyDataSource(), qualifiedOrderTable, 3);
+            QualifiedTable orderQualifiedTable = containerComposer.createQualifiedTable(SOURCE_TABLE_NAME);
+            initSchemaAndTable(containerComposer, containerComposer.getProxyDataSource(), orderQualifiedTable, 3);
             PipelineDataSource jdbcDataSource = new PipelineDataSource(containerComposer.generateShardingSphereDataSourceFromProxy(), containerComposer.getDatabaseType());
             log.info("init data begin: {}", LocalDateTime.now());
-            IntPkLargeOrderDAO orderDAO = new IntPkLargeOrderDAO(jdbcDataSource, containerComposer.getDatabaseType(), qualifiedOrderTable);
+            IntPkLargeOrderDAO orderDAO = new IntPkLargeOrderDAO(jdbcDataSource, containerComposer.getDatabaseType(), orderQualifiedTable);
             orderDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             DataSourceExecuteUtils.execute(jdbcDataSource, "INSERT INTO t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"}));
             DataSourceExecuteUtils.execute(jdbcDataSource, "INSERT INTO t_single(id) VALUES (?)", Arrays.asList(new Object[]{1}, new Object[]{2}, new Object[]{3}));
             log.info("init data end: {}", LocalDateTime.now());
-            PipelineDataSource targetDataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
-            initSchemaAndTable(containerComposer, targetDataSource, qualifiedOrderTable, 0);
+            PipelineDataSource targetDataSource = DataSourceTestUtils.createStandardDataSource(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
+                    containerComposer.getUsername(), containerComposer.getPassword());
+            initSchemaAndTable(containerComposer, targetDataSource, orderQualifiedTable, 0);
             final CDCClient cdcClient = buildCDCClientAndStart(targetDataSource, containerComposer);
             Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
             String jobId = distSQLFacade.listJobIds().get(0);
             distSQLFacade.waitJobIncrementalStageFinished(jobId);
-            String orderTableName = qualifiedOrderTable.format();
+            String orderTableName = orderQualifiedTable.format();
             new E2EIncrementalTask(jdbcDataSource, orderTableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20).run();
             distSQLFacade.waitJobIncrementalStageFinished(jobId);
             for (int i = 1; i <= 4; i++) {
@@ -132,9 +126,6 @@ class CDCE2EIT {
                 orderDAO.insert(orderId, i, "OK");
                 containerComposer.assertRecordExists(targetDataSource, orderTableName, orderId);
             }
-            QualifiedTable orderQualifiedTable = dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()
-                    ? new QualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
-                    : new QualifiedTable(null, SOURCE_TABLE_NAME);
             assertDataMatched(jdbcDataSource, targetDataSource, orderQualifiedTable);
             assertDataMatched(jdbcDataSource, targetDataSource, new QualifiedTable(null, "t_address"));
             assertDataMatched(jdbcDataSource, targetDataSource, new QualifiedTable(null, "t_single"));
@@ -151,26 +142,18 @@ class CDCE2EIT {
         Awaitility.waitAtMost(20L, TimeUnit.SECONDS).pollInterval(2L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
     }
     
-    private void initSchemaAndTable(final PipelineContainerComposer containerComposer, final DataSource dataSource, final QualifiedTable qualifiedOrderTable, final int seconds) throws SQLException {
+    private void initSchemaAndTable(final PipelineContainerComposer containerComposer, final DataSource dataSource, final QualifiedTable orderQualifiedTable, final int seconds) throws SQLException {
         try (
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             containerComposer.createSchema(connection, seconds);
-            new IntPkLargeOrderDAO(dataSource, containerComposer.getDatabaseType(), qualifiedOrderTable).createTable();
+            new IntPkLargeOrderDAO(dataSource, containerComposer.getDatabaseType(), orderQualifiedTable).createTable();
             statement.execute("CREATE TABLE t_address(id integer primary key, address_name varchar(255))");
             statement.execute("CREATE TABLE t_single(id integer primary key)");
         }
         containerComposer.sleepSeconds(seconds);
     }
     
-    private PipelineDataSource createStandardDataSource(final PipelineContainerComposer containerComposer, final String storageUnitName) {
-        Map<String, Object> poolProps = new HashMap<>(3, 1F);
-        poolProps.put("url", containerComposer.getActualJdbcUrlTemplate(storageUnitName, false));
-        poolProps.put("username", containerComposer.getUsername());
-        poolProps.put("password", containerComposer.getPassword());
-        return new PipelineDataSource(new StandardPipelineDataSourceConfiguration(poolProps));
-    }
-    
     private CDCClient buildCDCClientAndStart(final PipelineDataSource dataSource, final PipelineContainerComposer containerComposer) {
         DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
         CDCClient result = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 10000));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/DataSourceTestUtils.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/DataSourceTestUtils.java
new file mode 100644
index 00000000000..887bbecf2c7
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/DataSourceTestUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.operation.pipeline.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@NoArgsConstructor(access = AccessLevel.NONE)
+public final class DataSourceTestUtils {
+    
+    /**
+     * Create standard data source.
+     *
+     * @param url URL
+     * @param username username
+     * @param password password
+     * @return standard data source
+     */
+    public static PipelineDataSource createStandardDataSource(final String url, final String username, final String password) {
+        Map<String, Object> poolProps = new HashMap<>(3, 1F);
+        poolProps.put("url", url);
+        poolProps.put("username", username);
+        poolProps.put("password", password);
+        return new PipelineDataSource(new StandardPipelineDataSourceConfiguration(poolProps));
+    }
+}

Reply via email to