This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 55cbc7e0cb7 Add OrderItemDAO for pipeline E2E (#37859)
55cbc7e0cb7 is described below

commit 55cbc7e0cb7e10b9ac7542a9f5f9d9cb0a2b9023
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Jan 27 15:45:11 2026 +0800

    Add OrderItemDAO for pipeline E2E (#37859)
    
    * Refactor CreateTableSQLGeneratorIT scenarioFiles usage
    
    * Add OrderItemDAO and SQL builders
    
    * Improve OrderItemDAO: generate params in batchInsert
    
    * Refactor DataSourceExecuteUtils
    
    * Improve OrderItemDAO: reuse DataSourceExecuteUtils
    
    * Refactor E2E to not use PipelineCaseHelper.generateFullInsertData
    
    * Refactor E2E: replace PipelineCaseHelper.generateOrderItemInsertData with 
OrderItemDAO
    
    * Extract OrderItemDAO.insert
    
    * Rename assertOrderRecordExist to assertRecordExists
    
    * Rename OrderItem DAO related classes to add IntPk prefix
---
 .../pipeline/cases/PipelineContainerComposer.java  | 21 ++----
 .../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java |  9 +--
 .../general/MySQLMigrationGeneralE2EIT.java        | 19 ++---
 .../general/PostgreSQLMigrationGeneralE2EIT.java   | 16 +++--
 .../primarykey/IndexesMigrationE2EIT.java          |  8 +--
 .../primarykey/MariaDBMigrationE2EIT.java          |  2 +-
 .../pipeline/command/ExtraSQLCommand.java          |  1 +
 .../pipeline/dao/orderitem/IntPkOrderItemDAO.java  | 83 ++++++++++++++++++++++
 .../sqlbuilder/IntPkOrderItemSQLBuilder.java       | 37 ++++++++++
 .../sqlbuilder/MySQLIntPkOrderItemSQLBuilder.java  | 44 ++++++++++++
 .../OpenGaussIntPkOrderItemSQLBuilder.java         | 44 ++++++++++++
 .../PostgreSQLIntPkOrderItemSQLBuilder.java        | 44 ++++++++++++
 .../framework/helper/PipelineCaseHelper.java       | 11 ++-
 .../pipeline/util/DataSourceExecuteUtils.java      | 33 ++++-----
 ...o.orderitem.sqlbuilder.IntPkOrderItemSQLBuilder | 20 ++++++
 15 files changed, 337 insertions(+), 55 deletions(-)

diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 634f98fd71e..1a26cb74d13 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -362,15 +362,6 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         sourceExecuteWithLog(String.format("COMMENT ON COLUMN %s.%s.user_id IS 
'user id'", schema, sourceTableName));
     }
     
-    /**
-     * Create source order item table.
-     *
-     * @throws SQLException SQL exception
-     */
-    public void createSourceOrderItemTable() throws SQLException {
-        sourceExecuteWithLog(extraSQLCommand.getCreateTableOrderItem());
-    }
-    
     /**
      * Source execute with log.
      *
@@ -480,29 +471,29 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
     }
     
     /**
-     * Assert order record exists in proxy.
+     * Assert record exists.
      *
      * @param dataSource data source
      * @param tableName table name
      * @param orderId order id
      */
-    public void assertOrderRecordExist(final DataSource dataSource, final 
String tableName, final Object orderId) {
+    public void assertRecordExists(final DataSource dataSource, final String 
tableName, final Object orderId) {
         String sql;
         if (orderId instanceof String) {
             sql = String.format("SELECT 1 FROM %s WHERE order_id = '%s' AND 
user_id>0", tableName, orderId);
         } else {
             sql = String.format("SELECT 1 FROM %s WHERE order_id = %s AND 
user_id>0", tableName, orderId);
         }
-        assertOrderRecordExist(dataSource, sql);
+        assertRecordExists(dataSource, sql);
     }
     
     /**
-     * Assert proxy order record exist.
+     * Assert record exists.
      *
      * @param dataSource data source
      * @param sql SQL
      */
-    public void assertOrderRecordExist(final DataSource dataSource, final 
String sql) {
+    public void assertRecordExists(final DataSource dataSource, final String 
sql) {
         boolean recordExist = false;
         for (int i = 0; i < 5; i++) {
             List<Map<String, Object>> result = queryForListWithLog(dataSource, 
sql);
@@ -512,7 +503,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
             }
             sleepSeconds(2);
         }
-        assertTrue(recordExist, "Order record does not exist");
+        assertTrue(recordExist, "Record does not exist");
     }
     
     /**
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index 346f1600cd5..ff1f744a452 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.test.e2e.operation.pipeline.cases.cdc;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
@@ -51,6 +50,7 @@ import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.awaitility.Awaitility;
@@ -110,9 +110,10 @@ class CDCE2EIT {
                 initSchemaAndTable(containerComposer, connection, 3);
             }
             PipelineDataSource sourceDataSource = new 
PipelineDataSource(containerComposer.generateShardingSphereDataSourceFromProxy(),
 containerComposer.getDatabaseType());
-            Pair<List<Object[]>, List<Object[]>> dataPair = 
PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+            List<Object[]> orderInsertData = 
PipelineCaseHelper.generateOrderInsertData(
+                    containerComposer.getDatabaseType(), new 
AutoIncrementKeyGenerateAlgorithm(), 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             log.info("init data begin: {}", LocalDateTime.now());
-            DataSourceExecuteUtils.execute(sourceDataSource, 
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), 
dataPair.getLeft());
+            DataSourceExecuteUtils.execute(sourceDataSource, 
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), 
orderInsertData);
             DataSourceExecuteUtils.execute(sourceDataSource, "INSERT INTO 
t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, 
new Object[]{2, "b"}));
             DataSourceExecuteUtils.execute(sourceDataSource, "INSERT INTO 
t_single(id) VALUES (?)", Arrays.asList(new Object[]{1}, new Object[]{2}, new 
Object[]{3}));
             log.info("init data end: {}", LocalDateTime.now());
@@ -133,7 +134,7 @@ class CDCE2EIT {
             for (int i = 1; i <= 4; i++) {
                 int orderId = 10000 + i;
                 containerComposer.proxyExecuteWithLog(String.format("INSERT 
INTO %s (order_id, user_id, status) VALUES (%d, %d, 'OK')", tableName, orderId, 
i), 0);
-                containerComposer.assertOrderRecordExist(targetDataSource, 
tableName, orderId);
+                containerComposer.assertRecordExists(targetDataSource, 
tableName, orderId);
             }
             QualifiedTable orderQualifiedTable = 
dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()
                     ? new 
QualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 08311319194..cb33b7dff9c 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -19,18 +19,19 @@ package 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.ge
 
 import com.google.common.collect.ImmutableMap;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.AbstractMigrationE2EIT;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.task.E2EIncrementalTask;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.IntPkOrderItemDAO;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.helper.PipelineCaseHelper;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ECondition;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.junit.jupiter.api.condition.EnabledIf;
@@ -66,16 +67,18 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             distSQLFacade.alterPipelineRule();
             containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
-            containerComposer.createSourceOrderItemTable();
+            IntPkOrderItemDAO orderItemDAO = new 
IntPkOrderItemDAO(containerComposer.getSourceDataSource(), 
containerComposer.getDatabaseType());
+            orderItemDAO.createTable();
             addMigrationSourceResource(containerComposer);
             addMigrationTargetResource(containerComposer);
             createTargetOrderTableRule(containerComposer);
             createTargetOrderTableEncryptRule(containerComposer);
             createTargetOrderItemTableRule(containerComposer);
-            Pair<List<Object[]>, List<Object[]>> dataPair = 
PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             log.info("init data begin: {}", LocalDateTime.now());
-            
DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(), 
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), 
dataPair.getLeft());
-            
DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(), 
containerComposer.getExtraSQLCommand().getFullInsertOrderItem(), 
dataPair.getRight());
+            List<Object[]> orderInsertData = 
PipelineCaseHelper.generateOrderInsertData(
+                    containerComposer.getDatabaseType(), new 
AutoIncrementKeyGenerateAlgorithm(), 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+            
DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(), 
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), 
orderInsertData);
+            
orderItemDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             log.info("init data end: {}", LocalDateTime.now());
             startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
             startMigration(containerComposer, "t_order_item", "t_order_item");
@@ -85,12 +88,12 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
                     new 
E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, 
new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
             
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO 
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
-            containerComposer.sourceExecuteWithLog("INSERT INTO t_order_item 
(item_id, order_id, user_id, status) VALUES (10000, 10000, 1, 'OK')");
+            orderItemDAO.insert(10000L, 10000L, 1, "OK");
             distSQLFacade.pauseJob(orderJobId);
             distSQLFacade.resumeJob(orderJobId);
             DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
-            containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order", 10000);
-            containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order_item", 10000);
+            containerComposer.assertRecordExists(jdbcDataSource, "t_order", 
10000);
+            containerComposer.assertRecordExists(jdbcDataSource, 
"t_order_item", 10000);
             assertMigrationSuccessById(distSQLFacade, orderJobId, 
"DATA_MATCH", ImmutableMap.of("chunk-size", "300", "streaming-range-type", 
"SMALL"));
             String orderItemJobId = 
distSQLFacade.getJobIdByTableName("ds_0.t_order_item");
             assertMigrationSuccessById(distSQLFacade, orderItemJobId, 
"DATA_MATCH", ImmutableMap.of("chunk-size", "300", "streaming-range-type", 
"LARGE"));
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 7aa954e155a..49a688471d2 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -18,18 +18,19 @@
 package 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.general;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.AbstractMigrationE2EIT;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.task.E2EIncrementalTask;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.IntPkOrderItemDAO;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.helper.PipelineCaseHelper;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ECondition;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.awaitility.Awaitility;
@@ -68,17 +69,19 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             distSQLFacade.alterPipelineRule();
             createSourceSchema(containerComposer, 
PipelineContainerComposer.SCHEMA_NAME);
             containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
-            containerComposer.createSourceOrderItemTable();
+            IntPkOrderItemDAO orderItemDAO = new 
IntPkOrderItemDAO(containerComposer.getSourceDataSource(), 
containerComposer.getDatabaseType());
+            orderItemDAO.createTable();
             
containerComposer.createSourceTableIndexList(PipelineContainerComposer.SCHEMA_NAME,
 SOURCE_TABLE_NAME);
             
containerComposer.createSourceCommentOnList(PipelineContainerComposer.SCHEMA_NAME,
 SOURCE_TABLE_NAME);
             addMigrationSourceResource(containerComposer);
             addMigrationTargetResource(containerComposer);
             createTargetOrderTableRule(containerComposer);
             createTargetOrderItemTableRule(containerComposer);
-            Pair<List<Object[]>, List<Object[]>> dataPair = 
PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             log.info("init data begin: {}", LocalDateTime.now());
-            
DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(), 
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), 
dataPair.getLeft());
-            
DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(), 
containerComposer.getExtraSQLCommand().getFullInsertOrderItem(), 
dataPair.getRight());
+            List<Object[]> orderInsertData = 
PipelineCaseHelper.generateOrderInsertData(
+                    containerComposer.getDatabaseType(), new 
AutoIncrementKeyGenerateAlgorithm(), 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+            
DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(), 
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), 
orderInsertData);
+            
orderItemDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             int replicationSlotsCount = 
getReplicationSlotsCount(containerComposer);
             log.info("init data end: {}, replication slots count: {}", 
LocalDateTime.now(), replicationSlotsCount);
             startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME, 
"t_order");
@@ -90,8 +93,9 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
                     containerComposer.getDatabaseType(), 20));
             
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO 
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", qualifiedTableName));
+            // TODO Insert new record in t_order_item
             DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
-            containerComposer.assertOrderRecordExist(jdbcDataSource, 
qualifiedTableName, 10000);
+            containerComposer.assertRecordExists(jdbcDataSource, 
qualifiedTableName, 10000);
             checkOrderMigration(distSQLFacade, jobId);
             startMigrationWithSchema(containerComposer, "t_order_item", 
"t_order_item");
             checkOrderItemMigration(distSQLFacade);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 022b9882dae..90fa1c2d7ee 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -99,7 +99,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
                 }
                 Object orderId = 
keyGenerateAlgorithm.generateKeys(mock(AlgorithmSQLContext.class), 
1).iterator().next();
                 insertOneOrder(containerComposer, orderId);
-                containerComposer.assertOrderRecordExist(dataSource, 
"t_order", orderId);
+                containerComposer.assertRecordExists(dataSource, "t_order", 
orderId);
             };
             assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, incrementalTaskFn);
         }
@@ -176,7 +176,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
                 insertOneOrder(containerComposer, uniqueKey);
                 doCreateUpdateDelete(containerComposer, 
keyGenerateAlgorithm.generateKeys(mock(AlgorithmSQLContext.class), 
1).iterator().next());
-                containerComposer.assertOrderRecordExist(dataSource, 
"t_order", uniqueKey);
+                containerComposer.assertRecordExists(dataSource, "t_order", 
uniqueKey);
             });
         }
     }
@@ -199,7 +199,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
                 insertOneOrder(containerComposer, uniqueKey);
                 doCreateUpdateDelete(containerComposer, 
keyGenerateAlgorithm.generateKeys(mock(AlgorithmSQLContext.class), 
1).iterator().next());
-                containerComposer.assertOrderRecordExist(dataSource, 
"t_order", uniqueKey);
+                containerComposer.assertRecordExists(dataSource, "t_order", 
uniqueKey);
             });
         }
     }
@@ -224,7 +224,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             assertMigrationSuccess(containerComposer, sql, "order_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
                 insertOneOrder(containerComposer, uniqueKey);
                 // TODO Select by byte[] from proxy doesn't work, so unhex 
function is used for now
-                containerComposer.assertOrderRecordExist(dataSource, 
String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", 
Hex.encodeHexString(uniqueKey)));
+                containerComposer.assertRecordExists(dataSource, 
String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", 
Hex.encodeHexString(uniqueKey)));
             });
         }
     }
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index b39b782e606..37a58ddb55e 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -73,7 +73,7 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
             distSQLFacade.waitJobPreparingStageFinished(jobId);
             containerComposer.sourceExecuteWithLog("INSERT INTO t_order 
(order_id, user_id, status) VALUES ('a1', 1, 'OK')");
             DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
-            containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order", "a1");
+            containerComposer.assertRecordExists(jdbcDataSource, "t_order", 
"a1");
             distSQLFacade.waitJobIncrementalStageFinished(jobId);
             distSQLFacade.startCheckAndVerify(jobId, "CRC32_MATCH");
             distSQLFacade.commit(jobId);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/command/ExtraSQLCommand.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/command/ExtraSQLCommand.java
index ce94c52856c..34abbce949b 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/command/ExtraSQLCommand.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/command/ExtraSQLCommand.java
@@ -42,6 +42,7 @@ public final class ExtraSQLCommand {
     
     @XmlElement(name = "full-insert-order-item")
     @Getter
+    // TODO Delete
     private String fullInsertOrderItem;
     
     /**
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/IntPkOrderItemDAO.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/IntPkOrderItemDAO.java
new file mode 100644
index 00000000000..c4824b737d0
--- /dev/null
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/IntPkOrderItemDAO.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem;
+
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.sqlbuilder.IntPkOrderItemSQLBuilder;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.helper.PipelineCaseHelper;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.List;
+
+@Slf4j
+public final class IntPkOrderItemDAO {
+    
+    private final DataSource dataSource;
+    
+    private final IntPkOrderItemSQLBuilder sqlBuilder;
+    
+    public IntPkOrderItemDAO(final DataSource dataSource, final DatabaseType 
databaseType) {
+        this.dataSource = dataSource;
+        sqlBuilder = 
DatabaseTypedSPILoader.getService(IntPkOrderItemSQLBuilder.class, databaseType);
+    }
+    
+    /**
+     * Create order_item table.
+     *
+     * @throws SQLException SQL exception
+     */
+    public void createTable() throws SQLException {
+        String sql = sqlBuilder.buildCreateTableSQL();
+        log.info("Create order_item table SQL: {}", sql);
+        DataSourceExecuteUtils.execute(dataSource, sql);
+    }
+    
+    /**
+     * Batch insert order items.
+     *
+     * @param insertRows insert rows
+     * @throws SQLException SQL exception
+     */
+    public void batchInsert(final int insertRows) throws SQLException {
+        List<Object[]> params = 
PipelineCaseHelper.generateOrderItemInsertData(new 
AutoIncrementKeyGenerateAlgorithm(), insertRows);
+        String sql = sqlBuilder.buildPreparedInsertSQL();
+        log.info("Batch insert order_item SQL: {}, params size: {}", sql, 
params.size());
+        DataSourceExecuteUtils.execute(dataSource, sql, params);
+    }
+    
+    /**
+     * Insert order item.
+     *
+     * @param itemId item id
+     * @param orderId order id
+     * @param userId user id
+     * @param status status
+     * @throws SQLException SQL exception
+     */
+    public void insert(final long itemId, final long orderId, final int 
userId, final String status) throws SQLException {
+        String sql = sqlBuilder.buildPreparedInsertSQL();
+        Object[] params = new Object[]{itemId, orderId, userId, status};
+        log.info("Insert order_item SQL: {}, params: {}", sql, params);
+        DataSourceExecuteUtils.execute(dataSource, sql, params);
+    }
+}
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/sqlbuilder/IntPkOrderItemSQLBuilder.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/sqlbuilder/IntPkOrderItemSQLBuilder.java
new file mode 100644
index 00000000000..99b68fe99d3
--- /dev/null
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/sqlbuilder/IntPkOrderItemSQLBuilder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.sqlbuilder;
+
+import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPI;
+
/**
 * Int primary key order item SQL builder.
 */
public interface IntPkOrderItemSQLBuilder extends DatabaseTypedSPI {
    
    /**
     * Build create table SQL.
     *
     * @return create table SQL
     */
    String buildCreateTableSQL();
    
    /**
     * Build prepared insert SQL.
     *
     * @return prepared insert SQL
     */
    String buildPreparedInsertSQL();
}
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/sqlbuilder/MySQLIntPkOrderItemSQLBuilder.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/sqlbuilder/MySQLIntPkOrderItemSQLBuilder.java
new file mode 100644
index 00000000000..7c2b50e021d
--- /dev/null
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/sqlbuilder/MySQLIntPkOrderItemSQLBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.sqlbuilder;
+
+public final class MySQLIntPkOrderItemSQLBuilder implements 
IntPkOrderItemSQLBuilder {
+    
+    @Override
+    public String buildCreateTableSQL() {
+        return """
+                CREATE TABLE t_order_item (
+                item_id bigint NOT NULL,
+                order_id bigint NOT NULL,
+                user_id int NOT NULL,
+                status varchar(50) DEFAULT NULL,
+                PRIMARY KEY (item_id)
+                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci
+                """;
+    }
+    
+    @Override
+    public String buildPreparedInsertSQL() {
+        return "INSERT INTO t_order_item (item_id, order_id, user_id, status) 
VALUES (?, ?, ?, ?)";
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "MySQL";
+    }
+}
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/sqlbuilder/OpenGaussIntPkOrderItemSQLBuilder.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/sqlbuilder/OpenGaussIntPkOrderItemSQLBuilder.java
new file mode 100644
index 00000000000..a9412129f93
--- /dev/null
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/sqlbuilder/OpenGaussIntPkOrderItemSQLBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.sqlbuilder;
+
+public final class OpenGaussIntPkOrderItemSQLBuilder implements 
IntPkOrderItemSQLBuilder {
+    
+    @Override
+    public String buildCreateTableSQL() {
+        return """
+                CREATE TABLE test.t_order_item (
+                item_id int8 NOT NULL,
+                order_id int8 NOT NULL,
+                user_id int4 NOT NULL,
+                status varchar(50),
+                PRIMARY KEY (item_id)
+                )
+                """;
+    }
+    
+    @Override
+    public String buildPreparedInsertSQL() {
+        return "INSERT INTO test.t_order_item (item_id, order_id, user_id, 
status) VALUES (?, ?, ?, ?)";
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "openGauss";
+    }
+}
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/sqlbuilder/PostgreSQLIntPkOrderItemSQLBuilder.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/sqlbuilder/PostgreSQLIntPkOrderItemSQLBuilder.java
new file mode 100644
index 00000000000..951295785d3
--- /dev/null
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/orderitem/sqlbuilder/PostgreSQLIntPkOrderItemSQLBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.sqlbuilder;
+
+public final class PostgreSQLIntPkOrderItemSQLBuilder implements 
IntPkOrderItemSQLBuilder {
+    
+    @Override
+    public String buildCreateTableSQL() {
+        return """
+                CREATE TABLE test.t_order_item (
+                item_id int8 NOT NULL,
+                order_id int8 NOT NULL,
+                user_id int4 NOT NULL,
+                status varchar(50),
+                PRIMARY KEY (item_id)
+                )
+                """;
+    }
+    
+    @Override
+    public String buildPreparedInsertSQL() {
+        return "INSERT INTO test.t_order_item (item_id, order_id, user_id, 
status) VALUES (?, ?, ?, ?)";
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "PostgreSQL";
+    }
+}
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/framework/helper/PipelineCaseHelper.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/framework/helper/PipelineCaseHelper.java
index 858ccdfe0f4..8401957e287 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/framework/helper/PipelineCaseHelper.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/framework/helper/PipelineCaseHelper.java
@@ -61,6 +61,7 @@ public final class PipelineCaseHelper {
      * @param insertRows insert rows
      * @return insert data list
      */
+    // TODO Delete
     public static Pair<List<Object[]>, List<Object[]>> 
generateFullInsertData(final DatabaseType databaseType, final int insertRows) {
         if (insertRows < 0) {
             return Pair.of(null, null);
@@ -81,6 +82,7 @@ public final class PipelineCaseHelper {
      * @return order insert data
      * @throws UnsupportedOperationException Unsupported operation exception
      */
+    // TODO Refactor to use SPI
     public static List<Object[]> generateOrderInsertData(final DatabaseType 
databaseType, final KeyGenerateAlgorithm keyGenerateAlgorithm, final int 
insertRows) {
         List<Object[]> result = new ArrayList<>(insertRows);
         String emojiText = "☠️x☺️x✋x☹️";
@@ -167,7 +169,14 @@ public final class PipelineCaseHelper {
         return ThreadLocalRandom.current().nextInt(-1000000000, 1000000000) / 
1000000.0D;
     }
     
-    private static List<Object[]> generateOrderItemInsertData(final 
KeyGenerateAlgorithm keyGenerateAlgorithm, final int insertRows) {
+    /**
+     * Generate order item insert data.
+     *
+     * @param keyGenerateAlgorithm key generate algorithm
+     * @param insertRows insert rows
+     * @return order item insert data
+     */
+    public static List<Object[]> generateOrderItemInsertData(final 
KeyGenerateAlgorithm keyGenerateAlgorithm, final int insertRows) {
         List<Object[]> result = new ArrayList<>(insertRows);
         for (int i = 0; i < insertRows; i++) {
             Object orderId = 
keyGenerateAlgorithm.generateKeys(mock(AlgorithmSQLContext.class), 
1).iterator().next();
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/DataSourceExecuteUtils.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/DataSourceExecuteUtils.java
index 2a98a5630dc..40df562db8b 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/DataSourceExecuteUtils.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/DataSourceExecuteUtils.java
@@ -25,6 +25,7 @@ import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.List;
 
 /**
@@ -38,13 +39,13 @@ public final class DataSourceExecuteUtils {
      *
      * @param dataSource data source
      * @param sql SQL
-     * @throws SQLWrapperException SQL wrapper exception
+     * @throws SQLException SQL exception
      */
-    public static void execute(final DataSource dataSource, final String sql) {
-        try (Connection connection = dataSource.getConnection()) {
-            connection.createStatement().execute(sql);
-        } catch (final SQLException ex) {
-            throw new SQLWrapperException(ex);
+    public static void execute(final DataSource dataSource, final String sql) 
throws SQLException {
+        try (
+                Connection connection = dataSource.getConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(sql);
         }
     }
     
@@ -53,14 +54,15 @@ public final class DataSourceExecuteUtils {
      *
      * @param dataSource data source
      * @param sql SQL
-     * @param parameters parameters
+     * @param params parameters
      * @throws SQLWrapperException SQL wrapper exception
      */
-    public static void execute(final DataSource dataSource, final String sql, 
final Object[] parameters) {
+    // TODO Throw SQLException
+    public static void execute(final DataSource dataSource, final String sql, 
final Object[] params) {
         try (Connection connection = dataSource.getConnection()) {
             PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
-            for (int i = 0; i < parameters.length; i++) {
-                preparedStatement.setObject(i + 1, parameters[i]);
+            for (int i = 0; i < params.length; i++) {
+                preparedStatement.setObject(i + 1, params[i]);
             }
             preparedStatement.execute();
         } catch (final SQLException ex) {
@@ -73,15 +75,16 @@ public final class DataSourceExecuteUtils {
      *
      * @param dataSource data source
      * @param sql SQL
-     * @param parameters parameters
-     * @throws SQLWrapperException SQL wrapper exception
+     * @param params parameters
+     * @throws SQLException SQL exception
      */
-    public static void execute(final DataSource dataSource, final String sql, 
final List<Object[]> parameters) {
+    // TODO Rename executeBatch
+    public static void execute(final DataSource dataSource, final String sql, 
final List<Object[]> params) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
             PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
             int batchSize = 1000;
             int count = 0;
-            for (Object[] each : parameters) {
+            for (Object[] each : params) {
                 for (int i = 0; i < each.length; i++) {
                     preparedStatement.setObject(i + 1, each[i]);
                 }
@@ -94,8 +97,6 @@ public final class DataSourceExecuteUtils {
             if (count % batchSize > 0) {
                 preparedStatement.executeBatch();
             }
-        } catch (final SQLException ex) {
-            throw new SQLWrapperException(ex);
         }
     }
 }
diff --git 
a/test/e2e/operation/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.sqlbuilder.IntPkOrderItemSQLBuilder
 
b/test/e2e/operation/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.sqlbuilder.IntPkOrderItemSQLBuilder
new file mode 100644
index 00000000000..6b58325a79b
--- /dev/null
+++ 
b/test/e2e/operation/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.sqlbuilder.IntPkOrderItemSQLBuilder
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.sqlbuilder.MySQLIntPkOrderItemSQLBuilder
+org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.sqlbuilder.PostgreSQLIntPkOrderItemSQLBuilder
+org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.sqlbuilder.OpenGaussIntPkOrderItemSQLBuilder


Reply via email to