This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f76cdfa2d9f Add IntPkLargeOrderDAO for pipeline E2E (#37869)
f76cdfa2d9f is described below

commit f76cdfa2d9f504f5b958d1aa975f1336abbc7a99
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Jan 28 13:17:33 2026 +0800

    Add IntPkLargeOrderDAO for pipeline E2E (#37869)
    
    * Add IntPkLargeOrderDAO
    
    * Refactor CDCE2EIT.initSchemaAndTable
    
    * Refactor IntPkLargeOrderDAO to use qualifiedTableName
    
    * Improve IntPkLargeOrderDAO.insert: add buildPreparedSimpleInsertSQL
    
    * Update E2E to use IntPkLargeOrderDAO
    
    * Update code style
    
    * Recover e2e-operation.yml on: pull_request
---
 .github/workflows/e2e-operation.yml                |  2 +
 .../pipeline/cases/PipelineContainerComposer.java  | 10 ---
 .../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java | 62 ++++++++-------
 .../general/MySQLMigrationGeneralE2EIT.java        | 14 ++--
 .../general/PostgreSQLMigrationGeneralE2EIT.java   | 16 ++--
 .../migration/general/RollbackMigrationE2EIT.java  |  2 +-
 .../IntPkLargeOrderDAO.java}                       | 41 +++++-----
 .../sqlbuilder/IntPkLargeOrderSQLBuilder.java      | 49 ++++++++++++
 .../sqlbuilder/MySQLIntPkLargeOrderSQLBuilder.java | 78 +++++++++++++++++++
 .../OpenGaussIntPkLargeOrderSQLBuilder.java        | 88 ++++++++++++++++++++++
 .../PostgreSQLIntPkLargeOrderSQLBuilder.java       | 63 ++++++++++++++++
 .../dao/order/small/StringPkSmallOrderDAO.java     |  9 +--
 ...rder.large.sqlbuilder.IntPkLargeOrderSQLBuilder | 20 +++++
 13 files changed, 368 insertions(+), 86 deletions(-)
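
Net effect of the diff below: the pipeline tests stop assembling t_order DDL/DML strings inline (or via ExtraSQLCommand) and route everything through the new IntPkLargeOrderDAO, which resolves a dialect-specific IntPkLargeOrderSQLBuilder through SPI. A minimal usage sketch, with names taken from the diff (the DataSource wiring here is illustrative; in the tests it comes from PipelineContainerComposer):

    // Sketch only: dataSource and databaseType are assumed to be supplied by the test harness.
    IntPkLargeOrderDAO orderDAO = new IntPkLargeOrderDAO(dataSource, databaseType, new QualifiedTable(null, "t_order"));
    orderDAO.createTable();                                               // dialect-specific CREATE TABLE from the SPI-loaded SQL builder
    orderDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT); // generated rows covering all columns
    orderDAO.insert(10000L, 1, "OK");                                     // simple 3-column insert used by incremental checks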

diff --git a/.github/workflows/e2e-operation.yml b/.github/workflows/e2e-operation.yml
index ad2e6bc2398..127aebcc2bf 100644
--- a/.github/workflows/e2e-operation.yml
+++ b/.github/workflows/e2e-operation.yml
@@ -18,6 +18,8 @@
 name: E2E - Operation
 
 on:
+  pull_request:
+    branches: [ master ]
   workflow_dispatch:
 
 concurrency:
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 1a26cb74d13..530286b530e 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -326,16 +326,6 @@ public final class PipelineContainerComposer implements AutoCloseable {
         sleepSeconds(seconds);
     }
     
-    /**
-     * Create source order table.
-     *
-     * @param sourceTableName source table name
-     * @throws SQLException SQL exception
-     */
-    public void createSourceOrderTable(final String sourceTableName) throws SQLException {
-        sourceExecuteWithLog(extraSQLCommand.getCreateTableOrder(sourceTableName));
-    }
-    
     /**
      * Create source table index list.
      *
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index ff1f744a452..c183f6807fe 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -44,13 +44,12 @@ import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.test.e2e.env.container.constants.ProxyContainerConstants;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.cases.task.E2EIncrementalTask;
-import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.IntPkLargeOrderDAO;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ECondition;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
-import org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.awaitility.Awaitility;
@@ -59,9 +58,10 @@ import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Collections;
@@ -106,42 +106,38 @@ class CDCE2EIT {
             }
             createOrderTableRule(containerComposer);
             distSQLFacade.createBroadcastRule("t_address");
-            try (Connection connection = containerComposer.getProxyDataSource().getConnection()) {
-                initSchemaAndTable(containerComposer, connection, 3);
-            }
-            PipelineDataSource sourceDataSource = new PipelineDataSource(containerComposer.generateShardingSphereDataSourceFromProxy(), containerComposer.getDatabaseType());
-            List<Object[]> orderInsertData = PipelineCaseHelper.generateOrderInsertData(
-                    containerComposer.getDatabaseType(), new AutoIncrementKeyGenerateAlgorithm(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+            DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
+            QualifiedTable qualifiedOrderTable = dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()
+                    ? new QualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
+                    : new QualifiedTable(null, SOURCE_TABLE_NAME);
+            initSchemaAndTable(containerComposer, containerComposer.getProxyDataSource(), qualifiedOrderTable, 3);
+            PipelineDataSource jdbcDataSource = new PipelineDataSource(containerComposer.generateShardingSphereDataSourceFromProxy(), containerComposer.getDatabaseType());
             log.info("init data begin: {}", LocalDateTime.now());
-            DataSourceExecuteUtils.execute(sourceDataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), orderInsertData);
-            DataSourceExecuteUtils.execute(sourceDataSource, "INSERT INTO t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"}));
-            DataSourceExecuteUtils.execute(sourceDataSource, "INSERT INTO t_single(id) VALUES (?)", Arrays.asList(new Object[]{1}, new Object[]{2}, new Object[]{3}));
+            IntPkLargeOrderDAO orderDAO = new IntPkLargeOrderDAO(jdbcDataSource, containerComposer.getDatabaseType(), qualifiedOrderTable);
+            orderDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+            DataSourceExecuteUtils.execute(jdbcDataSource, "INSERT INTO t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"}));
+            DataSourceExecuteUtils.execute(jdbcDataSource, "INSERT INTO t_single(id) VALUES (?)", Arrays.asList(new Object[]{1}, new Object[]{2}, new Object[]{3}));
             log.info("init data end: {}", LocalDateTime.now());
-            try (
-                    Connection connection = DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
-                            containerComposer.getUsername(), containerComposer.getPassword())) {
-                initSchemaAndTable(containerComposer, connection, 0);
-            }
             PipelineDataSource targetDataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
+            initSchemaAndTable(containerComposer, targetDataSource, qualifiedOrderTable, 0);
             final CDCClient cdcClient = buildCDCClientAndStart(targetDataSource, containerComposer);
             Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
             String jobId = distSQLFacade.listJobIds().get(0);
             distSQLFacade.waitJobIncrementalStageFinished(jobId);
-            DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
-            String tableName = dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable() ? String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
-            new E2EIncrementalTask(sourceDataSource, tableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20).run();
+            String orderTableName = qualifiedOrderTable.format();
+            new E2EIncrementalTask(jdbcDataSource, orderTableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20).run();
             distSQLFacade.waitJobIncrementalStageFinished(jobId);
             for (int i = 1; i <= 4; i++) {
                 int orderId = 10000 + i;
-                containerComposer.proxyExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (%d, %d, 'OK')", tableName, orderId, i), 0);
-                containerComposer.assertRecordExists(targetDataSource, tableName, orderId);
+                orderDAO.insert(orderId, i, "OK");
+                containerComposer.assertRecordExists(targetDataSource, orderTableName, orderId);
             }
             QualifiedTable orderQualifiedTable = dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()
                     ? new QualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
                     : new QualifiedTable(null, SOURCE_TABLE_NAME);
-            assertDataMatched(sourceDataSource, targetDataSource, orderQualifiedTable);
-            assertDataMatched(sourceDataSource, targetDataSource, new QualifiedTable(null, "t_address"));
-            assertDataMatched(sourceDataSource, targetDataSource, new QualifiedTable(null, "t_single"));
+            assertDataMatched(jdbcDataSource, targetDataSource, orderQualifiedTable);
+            assertDataMatched(jdbcDataSource, targetDataSource, new QualifiedTable(null, "t_address"));
+            assertDataMatched(jdbcDataSource, targetDataSource, new QualifiedTable(null, "t_single"));
             cdcClient.close();
             Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS)
                     .until(() -> distSQLFacade.listJobs().stream().noneMatch(each -> Boolean.parseBoolean(each.get("active").toString())));
@@ -155,13 +151,15 @@ class CDCE2EIT {
         Awaitility.waitAtMost(20L, TimeUnit.SECONDS).pollInterval(2L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
     }
     
-    private void initSchemaAndTable(final PipelineContainerComposer containerComposer, final Connection connection, final int seconds) throws SQLException {
-        containerComposer.createSchema(connection, seconds);
-        String sql = containerComposer.getExtraSQLCommand().getCreateTableOrder(SOURCE_TABLE_NAME);
-        log.info("Create table sql: {}", sql);
-        connection.createStatement().execute(sql);
-        connection.createStatement().execute("CREATE TABLE t_address(id integer primary key, address_name varchar(255))");
-        connection.createStatement().execute("CREATE TABLE t_single(id integer primary key)");
+    private void initSchemaAndTable(final PipelineContainerComposer containerComposer, final DataSource dataSource, final QualifiedTable qualifiedOrderTable, final int seconds) throws SQLException {
+        try (
+                Connection connection = dataSource.getConnection();
+                Statement statement = connection.createStatement()) {
+            containerComposer.createSchema(connection, seconds);
+            new IntPkLargeOrderDAO(dataSource, containerComposer.getDatabaseType(), qualifiedOrderTable).createTable();
+            statement.execute("CREATE TABLE t_address(id integer primary key, address_name varchar(255))");
+            statement.execute("CREATE TABLE t_single(id integer primary key)");
+        }
         containerComposer.sleepSeconds(seconds);
     }
     
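Note on the refactoring above: the test now builds one QualifiedTable up front and reuses it for table creation, the DAO, the incremental task, and the assertions. qualifiedOrderTable.format() replaces the old String.join(".", ...) branch; assuming format() dot-joins schema and table (which the removed line implies), it is equivalent to:

    // Illustrative equivalent of the naming the test relies on; QualifiedTable.format() is the real API.
    String orderTableName = dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()
            ? PipelineContainerComposer.SCHEMA_NAME + "." + SOURCE_TABLE_NAME  // e.g. PostgreSQL/openGauss
            : SOURCE_TABLE_NAME;                                               // e.g. MySQL, no schema level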
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index cb33b7dff9c..87406f55170 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -21,18 +21,17 @@ import com.google.common.collect.ImmutableMap;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.AbstractMigrationE2EIT;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.cases.task.E2EIncrementalTask;
+import org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.IntPkLargeOrderDAO;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.IntPkOrderItemDAO;
-import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ECondition;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
-import org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.api.extension.ExtensionContext;
@@ -66,7 +65,8 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam)) {
             PipelineE2EDistSQLFacade distSQLFacade = new PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             distSQLFacade.alterPipelineRule();
-            containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
+            IntPkLargeOrderDAO orderDAO = new IntPkLargeOrderDAO(containerComposer.getSourceDataSource(), containerComposer.getDatabaseType(), new QualifiedTable(null, SOURCE_TABLE_NAME));
+            orderDAO.createTable();
             IntPkOrderItemDAO orderItemDAO = new IntPkOrderItemDAO(containerComposer.getSourceDataSource(), containerComposer.getDatabaseType());
             orderItemDAO.createTable();
             addMigrationSourceResource(containerComposer);
@@ -75,9 +75,7 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             createTargetOrderTableEncryptRule(containerComposer);
             createTargetOrderItemTableRule(containerComposer);
             log.info("init data begin: {}", LocalDateTime.now());
-            List<Object[]> orderInsertData = PipelineCaseHelper.generateOrderInsertData(
-                    containerComposer.getDatabaseType(), new AutoIncrementKeyGenerateAlgorithm(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
-            DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(), containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), orderInsertData);
+            orderDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             orderItemDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             log.info("init data end: {}", LocalDateTime.now());
             startMigration(containerComposer, SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
@@ -87,7 +85,7 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             containerComposer.startIncrementTask(
                     new E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
             TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
-            containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
+            orderDAO.insert(10000L, 1, "OK");
             orderItemDAO.insert(10000L, 10000L, 1, "OK");
             distSQLFacade.pauseJob(orderJobId);
             distSQLFacade.resumeJob(orderJobId);
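
As above, orderDAO.batchInsert(...) funnels generated rows through DataSourceExecuteUtils.execute(dataSource, sql, paramsList). That utility's implementation is not part of this diff; a plausible sketch of the batching it performs:

    // Assumed shape of DataSourceExecuteUtils.execute(DataSource, String, List<Object[]>), for orientation only.
    try (Connection connection = dataSource.getConnection(); PreparedStatement statement = connection.prepareStatement(sql)) {
        for (Object[] params : paramsList) {
            for (int i = 0; i < params.length; i++) {
                statement.setObject(i + 1, params[i]);
            }
            statement.addBatch();
        }
        statement.executeBatch();
    }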
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 49a688471d2..27836a574d5 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -20,18 +20,17 @@ package org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.ge
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.AbstractMigrationE2EIT;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.cases.task.E2EIncrementalTask;
+import org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.IntPkLargeOrderDAO;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.IntPkOrderItemDAO;
-import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ECondition;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
-import org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.condition.EnabledIf;
@@ -45,7 +44,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.LocalDateTime;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -68,7 +66,9 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             PipelineE2EDistSQLFacade distSQLFacade = new PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             distSQLFacade.alterPipelineRule();
             createSourceSchema(containerComposer, PipelineContainerComposer.SCHEMA_NAME);
-            containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
+            IntPkLargeOrderDAO orderDAO = new IntPkLargeOrderDAO(containerComposer.getSourceDataSource(), containerComposer.getDatabaseType(),
+                    new QualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME));
+            orderDAO.createTable();
             IntPkOrderItemDAO orderItemDAO = new IntPkOrderItemDAO(containerComposer.getSourceDataSource(), containerComposer.getDatabaseType());
             orderItemDAO.createTable();
             containerComposer.createSourceTableIndexList(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
@@ -78,9 +78,7 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             createTargetOrderTableRule(containerComposer);
             createTargetOrderItemTableRule(containerComposer);
             log.info("init data begin: {}", LocalDateTime.now());
-            List<Object[]> orderInsertData = PipelineCaseHelper.generateOrderInsertData(
-                    containerComposer.getDatabaseType(), new AutoIncrementKeyGenerateAlgorithm(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
-            DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(), containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), orderInsertData);
+            orderDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             orderItemDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             int replicationSlotsCount = getReplicationSlotsCount(containerComposer);
             log.info("init data end: {}, replication slots count: {}", LocalDateTime.now(), replicationSlotsCount);
@@ -92,7 +90,7 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             containerComposer.startIncrementTask(new E2EIncrementalTask(containerComposer.getSourceDataSource(), qualifiedTableName, new SnowflakeKeyGenerateAlgorithm(),
                     containerComposer.getDatabaseType(), 20));
             TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
-            containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", qualifiedTableName));
+            orderDAO.insert(10000L, 1, "OK");
             // TODO Insert new record in t_order_item
             DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
             containerComposer.assertRecordExists(jdbcDataSource, qualifiedTableName, 10000);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
index d173d3547c2..f903f37db20 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
@@ -42,7 +42,7 @@ class RollbackMigrationE2EIT extends AbstractMigrationE2EIT {
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
-    void assertIllegalTimeTypesValueMigrationSuccess(final PipelineTestParameter testParam) throws Exception {
+    void assertRollbackSuccess(final PipelineTestParameter testParam) throws Exception {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam)) {
             String sql = "CREATE TABLE t_order (order_id BIGINT PRIMARY KEY, user_id INT, status VARCHAR(50))";
             containerComposer.sourceExecuteWithLog(sql);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/IntPkLargeOrderDAO.java
similarity index 58%
copy from test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java
copy to test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/IntPkLargeOrderDAO.java
index 0e355f0f927..63e03975967 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/IntPkLargeOrderDAO.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.small;
+package org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.algorithm.keygen.uuid.UUIDKeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.small.sqlbuilder.StringPkSmallOrderSQLBuilder;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
+import org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.IntPkLargeOrderSQLBuilder;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
 
 import javax.sql.DataSource;
@@ -30,24 +31,24 @@ import java.sql.SQLException;
 import java.util.List;
 
 /**
- * String PK small order DAO. Small table means the table has few columns.
+ * Int PK large order DAO. Large table means the table has many columns.
  */
 @Slf4j
-public final class StringPkSmallOrderDAO {
+public final class IntPkLargeOrderDAO {
     
     private final DataSource dataSource;
     
     private final DatabaseType databaseType;
     
-    private final StringPkSmallOrderSQLBuilder sqlBuilder;
+    private final IntPkLargeOrderSQLBuilder sqlBuilder;
     
-    private final String tableName;
+    private final String qualifiedTableName;
     
-    public StringPkSmallOrderDAO(final DataSource dataSource, final DatabaseType databaseType, final String tableName) {
+    public IntPkLargeOrderDAO(final DataSource dataSource, final DatabaseType databaseType, final QualifiedTable qualifiedTable) {
         this.dataSource = dataSource;
         this.databaseType = databaseType;
-        this.sqlBuilder = DatabaseTypedSPILoader.getService(StringPkSmallOrderSQLBuilder.class, databaseType);
-        this.tableName = tableName;
+        this.sqlBuilder = DatabaseTypedSPILoader.getService(IntPkLargeOrderSQLBuilder.class, databaseType);
+        this.qualifiedTableName = qualifiedTable.format();
     }
     
     /**
@@ -56,21 +57,21 @@ public final class StringPkSmallOrderDAO {
      * @throws SQLException SQL exception
      */
     public void createTable() throws SQLException {
-        String sql = sqlBuilder.buildCreateTableSQL(tableName);
-        log.info("Create string pk small order table SQL: {}", sql);
+        String sql = sqlBuilder.buildCreateTableSQL(qualifiedTableName);
+        log.info("Create int pk large order table SQL: {}", sql);
         DataSourceExecuteUtils.execute(dataSource, sql);
     }
     
     /**
      * Batch insert orders.
      *
-     * @param insertRows insert rows
+     * @param recordCount record count
      * @throws SQLException SQL exception
      */
-    public void batchInsert(final int insertRows) throws SQLException {
-        List<Object[]> paramsList = PipelineCaseHelper.generateSmallOrderInsertData(new UUIDKeyGenerateAlgorithm(), insertRows);
-        String sql = sqlBuilder.buildPreparedInsertSQL(tableName);
-        log.info("Batch insert string pk small order SQL: {}, params list size: {}", sql, paramsList.size());
+    public void batchInsert(final int recordCount) throws SQLException {
+        List<Object[]> paramsList = PipelineCaseHelper.generateOrderInsertData(databaseType, new AutoIncrementKeyGenerateAlgorithm(), recordCount);
+        String sql = sqlBuilder.buildPreparedInsertSQL(qualifiedTableName);
+        log.info("Batch insert int pk large order SQL: {}, params list size: {}", sql, paramsList.size());
         DataSourceExecuteUtils.execute(dataSource, sql, paramsList);
     }
     
@@ -82,10 +83,10 @@ public final class StringPkSmallOrderDAO {
      * @param status status
      * @throws SQLException SQL exception
      */
-    public void insert(final String orderId, final int userId, final String status) throws SQLException {
-        String sql = sqlBuilder.buildPreparedInsertSQL(tableName);
+    public void insert(final long orderId, final int userId, final String status) throws SQLException {
+        String sql = sqlBuilder.buildPreparedSimpleInsertSQL(qualifiedTableName);
         Object[] params = new Object[]{orderId, userId, status};
-        log.info("Insert string pk small order SQL: {}, params: {}", sql, params);
+        log.info("Insert int pk large order simple SQL: {}, params: {}", sql, params);
         DataSourceExecuteUtils.execute(dataSource, sql, params);
     }
 }
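
The constructor above picks its SQL builder with DatabaseTypedSPILoader.getService(IntPkLargeOrderSQLBuilder.class, databaseType). Conceptually that is JDK ServiceLoader plus a match on getDatabaseType(); a simplified stand-in (the real loader is more involved, e.g. caching and trunk-type handling):

    // Hypothetical simplification of DatabaseTypedSPILoader.getService for this SPI.
    static IntPkLargeOrderSQLBuilder load(final String databaseTypeName) {
        for (IntPkLargeOrderSQLBuilder each : ServiceLoader.load(IntPkLargeOrderSQLBuilder.class)) {
            if (each.getDatabaseType().equalsIgnoreCase(databaseTypeName)) {
                return each;
            }
        }
        throw new IllegalStateException("No IntPkLargeOrderSQLBuilder for " + databaseTypeName);
    }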
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/IntPkLargeOrderSQLBuilder.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/IntPkLargeOrderSQLBuilder.java
new file mode 100644
index 00000000000..79f3dc2549f
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/IntPkLargeOrderSQLBuilder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder;
+
+import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPI;
+
+public interface IntPkLargeOrderSQLBuilder extends DatabaseTypedSPI {
+    
+    /**
+     * Build create table SQL.
+     *
+     * @param qualifiedTableName qualified table name
+     * @return create table SQL
+     */
+    String buildCreateTableSQL(String qualifiedTableName);
+    
+    /**
+     * Build prepared insert SQL.
+     *
+     * @param qualifiedTableName qualified table name
+     * @return prepared insert SQL
+     */
+    String buildPreparedInsertSQL(String qualifiedTableName);
+    
+    /**
+     * Build prepared simple insert SQL.
+     *
+     * @param qualifiedTableName qualified table name
+     * @return prepared simple insert SQL
+     */
+    default String buildPreparedSimpleInsertSQL(final String qualifiedTableName) {
+        return "INSERT INTO " + qualifiedTableName + " (order_id, user_id, status) VALUES (?, ?, ?)";
+    }
+}
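
The default buildPreparedSimpleInsertSQL can be shared by all three dialects because the three common columns need no dialect-specific quoting. For a schema-qualified table it produces, for example:

    String sql = sqlBuilder.buildPreparedSimpleInsertSQL("test.t_order");
    // -> INSERT INTO test.t_order (order_id, user_id, status) VALUES (?, ?, ?)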
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/MySQLIntPkLargeOrderSQLBuilder.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/MySQLIntPkLargeOrderSQLBuilder.java
new file mode 100644
index 00000000000..3f004070ba6
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/MySQLIntPkLargeOrderSQLBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder;
+
+public final class MySQLIntPkLargeOrderSQLBuilder implements IntPkLargeOrderSQLBuilder {
+    
+    @Override
+    public String buildCreateTableSQL(final String qualifiedTableName) {
+        return String.format("""
+                CREATE TABLE `%s` (
+                `order_id` bigint NOT NULL,
+                `user_id` int NOT NULL,
+                `status` varchar ( 255 ) NULL,
+                `t_mediumint` mediumint NULL,
+                `t_smallint` smallint NULL,
+                `t_tinyint` tinyint ( 3 ) NULL,
+                `t_unsigned_int` int UNSIGNED NULL,
+                `t_unsigned_mediumint` mediumint UNSIGNED NULL,
+                `t_unsigned_smallint` smallint UNSIGNED NULL,
+                `t_unsigned_tinyint` tinyint UNSIGNED NULL,
+                `t_float` float NULL,
+                `t_double` double NULL,
+                `t_decimal` decimal ( 10, 2 ) NULL,
+                `t_timestamp` timestamp(3) NULL,
+                `t_datetime` datetime(6) NULL,
+                `t_date` date NULL,
+                `t_time` time(1) NULL,
+                `t_year` year NULL,
+                `t_bit` bit(32) NULL,
+                `t_binary` binary(128) NULL,
+                `t_varbinary` varbinary(255) NULL,
+                `t_blob` blob NULL,
+                `t_mediumblob` mediumblob NULL,
+                `t_char` char ( 128 ) NULL,
+                `t_text` text NULL,
+                `t_mediumtext` mediumtext NULL,
+                `t_enum` enum ('1', '2', '3') NULL,
+                `t_set` set ('1', '2', '3') NULL,
+                `t_json` json NULL COMMENT 'json test',
+                PRIMARY KEY ( `order_id` ),
+                KEY `idx_user_id` (`user_id`),
+                KEY `idx_mulit` (`t_mediumint`,`t_unsigned_mediumint`)
+                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
+                """, qualifiedTableName);
+    }
+    
+    @Override
+    public String buildPreparedInsertSQL(final String qualifiedTableName) {
+        return String.format("""
+                INSERT INTO %s
+                (order_id, user_id, status, t_mediumint, t_smallint, t_tinyint, t_unsigned_int, t_unsigned_mediumint,
+                t_unsigned_smallint, t_unsigned_tinyint, t_float, t_double, t_decimal, t_timestamp, t_datetime, t_date, t_time, t_year,
+                t_bit, t_binary, t_varbinary, t_blob, t_mediumblob, t_char, t_text, t_mediumtext, t_enum, t_set, t_json)
+                VALUES
+                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                """, qualifiedTableName);
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "MySQL";
+    }
+}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/OpenGaussIntPkLargeOrderSQLBuilder.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/OpenGaussIntPkLargeOrderSQLBuilder.java
new file mode 100644
index 00000000000..dd73e553668
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/OpenGaussIntPkLargeOrderSQLBuilder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder;
+
+public final class OpenGaussIntPkLargeOrderSQLBuilder implements IntPkLargeOrderSQLBuilder {
+    
+    @Override
+    public String buildCreateTableSQL(final String qualifiedTableName) {
+        return String.format("""
+                create table %s (
+                order_id bigint,
+                user_id integer,
+                status character varying(50),
+                c_int integer,
+                c_smallint smallint,
+                c_float real,
+                c_double double precision,
+                c_numeric numeric(10,2),
+                c_boolean boolean,
+                c_char character(32),
+                c_text text,
+                c_bytea bytea,
+                c_raw bytea,
+                c_date date,
+                c_time time without time zone,
+                c_smalldatetime smalldatetime,
+                c_timestamp timestamp without time zone,
+                c_timestamptz timestamp with time zone,
+                c_interval interval,
+                c_array integer[],
+                c_json json,
+                c_jsonb jsonb,
+                c_uuid uuid,
+                c_hash32 hash32,
+                c_tsvector tsvector,
+                c_tsquery tsquery,
+                c_bit bit(4),
+                c_int4range int4range,
+                c_daterange daterange,
+                c_tsrange tsrange,
+                c_reltime reltime,
+                c_abstime abstime,
+                c_point point,
+                c_lseg lseg,
+                c_box box,
+                c_circle circle,
+                c_bitvarying bit varying(32),
+                c_cidr cidr,
+                c_inet inet,
+                c_macaddr macaddr,
+                c_hll hll(14,10,12,0),
+                c_money money,
+                PRIMARY KEY ( order_id )
+                )
+                """, qualifiedTableName);
+    }
+    
+    @Override
+    public String buildPreparedInsertSQL(final String qualifiedTableName) {
+        return String.format("""
+                INSERT INTO %s (
+                order_id, user_id, status, c_int, c_smallint, c_float, c_double, c_numeric, c_boolean, c_char, c_text, c_bytea, c_raw, c_date, c_time,
+                c_smalldatetime, c_timestamp, c_timestamptz, c_interval, c_array, c_json, c_jsonb, c_uuid, c_hash32, c_tsvector, c_tsquery, c_bit,
+                c_int4range, c_daterange, c_tsrange, c_reltime, c_abstime, c_point, c_lseg, c_box, c_circle, c_bitvarying, c_cidr, c_inet, c_macaddr, c_hll, c_money
+                ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                """, qualifiedTableName);
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "openGauss";
+    }
+}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/PostgreSQLIntPkLargeOrderSQLBuilder.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/PostgreSQLIntPkLargeOrderSQLBuilder.java
new file mode 100644
index 00000000000..0a0546e3c10
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/PostgreSQLIntPkLargeOrderSQLBuilder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder;
+
+public final class PostgreSQLIntPkLargeOrderSQLBuilder implements IntPkLargeOrderSQLBuilder {
+    
+    @Override
+    public String buildCreateTableSQL(final String qualifiedTableName) {
+        return String.format("""
+                CREATE TABLE %s (
+                order_id int8 NOT NULL,
+                user_id int4 NOT NULL,
+                status varchar ( 50 ) NULL,
+                t_int2 int2 NULL,
+                t_numeric numeric(10,2) NULL,
+                t_bool boolean NULL,
+                t_bytea bytea NULL,
+                t_char char(10) NULL,
+                t_varchar varchar(128) NULL,
+                t_float float4 NULL,
+                t_double float8 NULL,
+                t_json json NULL,
+                t_jsonb jsonb NULL,
+                t_text TEXT NULL,
+                t_date date NULL,
+                t_time TIME NULL,
+                t_timestamp timestamp NULL,
+                t_timestamptz timestamptz NULL,
+                PRIMARY KEY ( order_id )
+                )
+                """, qualifiedTableName);
+    }
+    
+    @Override
+    public String buildPreparedInsertSQL(final String qualifiedTableName) {
+        return String.format("""
+                INSERT INTO %s
+                (order_id, user_id, status, t_int2, t_numeric, t_bool, t_bytea, t_char, t_varchar,
+                t_float, t_double, t_json, t_jsonb, t_text, t_date, t_time, t_timestamp, t_timestamptz)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                """, qualifiedTableName);
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "PostgreSQL";
+    }
+}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java
index 0e355f0f927..281c0780b2c 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java
@@ -37,15 +37,12 @@ public final class StringPkSmallOrderDAO {
     
     private final DataSource dataSource;
     
-    private final DatabaseType databaseType;
-    
     private final StringPkSmallOrderSQLBuilder sqlBuilder;
     
     private final String tableName;
     
     public StringPkSmallOrderDAO(final DataSource dataSource, final DatabaseType databaseType, final String tableName) {
         this.dataSource = dataSource;
-        this.databaseType = databaseType;
         this.sqlBuilder = DatabaseTypedSPILoader.getService(StringPkSmallOrderSQLBuilder.class, databaseType);
         this.tableName = tableName;
     }
@@ -64,11 +61,11 @@ public final class StringPkSmallOrderDAO {
     /**
      * Batch insert orders.
      *
-     * @param insertRows insert rows
+     * @param recordCount record count
      * @throws SQLException SQL exception
      */
-    public void batchInsert(final int insertRows) throws SQLException {
-        List<Object[]> paramsList = PipelineCaseHelper.generateSmallOrderInsertData(new UUIDKeyGenerateAlgorithm(), insertRows);
+    public void batchInsert(final int recordCount) throws SQLException {
+        List<Object[]> paramsList = PipelineCaseHelper.generateSmallOrderInsertData(new UUIDKeyGenerateAlgorithm(), recordCount);
         String sql = sqlBuilder.buildPreparedInsertSQL(tableName);
         log.info("Batch insert string pk small order SQL: {}, params list size: {}", sql, paramsList.size());
         DataSourceExecuteUtils.execute(dataSource, sql, paramsList);
diff --git a/test/e2e/operation/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.IntPkLargeOrderSQLBuilder b/test/e2e/operation/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.IntPkLargeOrderSQLBuilder
new file mode 100644
index 00000000000..cf9689e0fb8
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.IntPkLargeOrderSQLBuilder
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.MySQLIntPkLargeOrderSQLBuilder
+org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.PostgreSQLIntPkLargeOrderSQLBuilder
+org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.OpenGaussIntPkLargeOrderSQLBuilder

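The file above is a standard JDK ServiceLoader provider-configuration file: its name is the interface's fully qualified name, and each line names one implementation, which is how DatabaseTypedSPILoader discovers the per-dialect builders. Supporting another dialect would mean one more implementation plus one more line here; a hypothetical example (not part of this commit):

    // Hypothetical fourth dialect; register it by appending the class name to the services file above.
    public final class MariaDBIntPkLargeOrderSQLBuilder implements IntPkLargeOrderSQLBuilder {
        
        @Override
        public String buildCreateTableSQL(final String qualifiedTableName) {
            return "CREATE TABLE " + qualifiedTableName + " (order_id bigint NOT NULL, user_id int NOT NULL, status varchar(255) NULL, PRIMARY KEY (order_id))";
        }
        
        @Override
        public String buildPreparedInsertSQL(final String qualifiedTableName) {
            return "INSERT INTO " + qualifiedTableName + " (order_id, user_id, status) VALUES (?, ?, ?)";
        }
        
        @Override
        public String getDatabaseType() {
            return "MariaDB";
        }
    }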