This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3052a14f108 Improve table both have primary key and unique key at inventory task (#20111)
3052a14f108 is described below

commit 3052a14f1085cfe3613371fde98e422b697b2839
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Aug 12 21:01:01 2022 +0800

    Improve table both have primary key and unique key at inventory task (#20111)
    
    * Improve table both have primary key and unique key at scaling.
    
    * Add IT case
    
    * Improve
    
    * Fix problem
    
    * Improve performance
    
    * Improve
---
 .../core/ingest/dumper/AbstractInventoryDumper.java        | 12 +++++-------
 .../data/pipeline/cases/general/MySQLGeneralScalingIT.java |  3 ++-
 .../pipeline/cases/general/PostgreSQLGeneralScalingIT.java |  3 ++-
 .../data/pipeline/cases/task/MySQLIncrementTask.java       | 11 +++++++----
 .../data/pipeline/cases/task/PostgreSQLIncrementTask.java  | 11 +++++++----
 .../data/pipeline/framework/helper/ScalingCaseHelper.java  | 14 +++++++++++++-
 .../src/test/resources/env/scenario/general/mysql.xml      |  6 +++---
 .../src/test/resources/env/scenario/general/postgresql.xml |  6 +++---
 8 files changed, 42 insertions(+), 24 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 654aa1d7d3d..6ad8c60b23e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -118,7 +118,7 @@ public abstract class AbstractInventoryDumper extends 
AbstractLifecycleExecutor
         try (Connection conn = dataSource.getConnection()) {
             int round = 1;
             Optional<Object> maxUniqueKeyValue;
-            while ((maxUniqueKeyValue = dump0(conn, 1 == round ? firstSQL : 
laterSQL, uniqueKeyDataType, startUniqueKeyValue, round++)).isPresent()) {
+            while ((maxUniqueKeyValue = dump0(conn, 1 == round ? firstSQL : 
laterSQL, dumperConfig.getUniqueKey(), uniqueKeyDataType, startUniqueKeyValue, 
round++)).isPresent()) {
                 startUniqueKeyValue = maxUniqueKeyValue.get();
                 if (!isRunning()) {
                     log.info("inventory dump, running is false, break");
@@ -140,7 +140,8 @@ public abstract class AbstractInventoryDumper extends 
AbstractLifecycleExecutor
         return tableMetaDataLazyInitializer.get();
     }
     
-    private Optional<Object> dump0(final Connection conn, final String sql, 
final int uniqueKeyDataType, final Object startUniqueKeyValue, final int round) 
throws SQLException {
+    private Optional<Object> dump0(final Connection conn, final String sql, 
final String uniqueKey, final int uniqueKeyDataType, final Object 
startUniqueKeyValue, final int round)
+            throws SQLException {
         if (null != rateLimitAlgorithm) {
             rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
         }
@@ -166,13 +167,10 @@ public abstract class AbstractInventoryDumper extends 
AbstractLifecycleExecutor
                     DataRecord record = new DataRecord(newPosition(resultSet), 
metaData.getColumnCount());
                     record.setType(IngestDataChangeType.INSERT);
                     record.setTableName(logicTableName);
+                    maxUniqueKeyValue = readValue(resultSet, 
tableMetaData.getColumnMetaData(uniqueKey).getOrdinalPosition());
                     for (int i = 1; i <= metaData.getColumnCount(); i++) {
                         boolean isUniqueKey = tableMetaData.isUniqueKey(i - 1);
-                        Object value = readValue(resultSet, i);
-                        if (isUniqueKey) {
-                            maxUniqueKeyValue = value;
-                        }
-                        record.addColumn(new Column(metaData.getColumnName(i), 
value, true, isUniqueKey));
+                        record.addColumn(new Column(metaData.getColumnName(i), 
readValue(resultSet, i), true, isUniqueKey));
                     }
                     pushRecord(record);
                     rowCount++;
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
index 8ba56a5aa49..b2dbaf06509 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
@@ -89,7 +89,8 @@ public final class MySQLGeneralScalingIT extends 
BaseExtraSQLITCase {
         String jobId = getScalingJobId();
         waitScalingFinished(jobId);
         stopScaling(jobId);
-        getJdbcTemplate().update("INSERT INTO t_order 
(id,order_id,user_id,status,t_json) VALUES (?, ?, ?, ?, ?)", 
keyGenerateAlgorithm.generateKey(), 1, 1, "afterStopScaling", "{}");
+        getJdbcTemplate().update("INSERT INTO t_order 
(id,order_id,user_id,status,t_json) VALUES (?, ?, ?, ?, ?)", 
keyGenerateAlgorithm.generateKey(), keyGenerateAlgorithm.generateKey(),
+                1, "afterStopScaling", "{}");
         startScaling(jobId);
         assertCheckScalingSuccess(jobId);
         assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "");
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java
index 34f5d34429a..c4358a615ef 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java
@@ -93,7 +93,8 @@ public final class PostgreSQLGeneralScalingIT extends 
BaseExtraSQLITCase {
         String jobId = getScalingJobId();
         waitScalingFinished(jobId);
         stopScaling(jobId);
-        executeWithLog(String.format("INSERT INTO test.t_order 
(id,order_id,user_id,status) VALUES (%s, %s, %s, '%s')", 
keyGenerateAlgorithm.generateKey(), 1, 1, "afterStopScaling"));
+        executeWithLog(String.format("INSERT INTO test.t_order 
(id,order_id,user_id,status) VALUES (%s, %s, %s, '%s')", 
keyGenerateAlgorithm.generateKey(), System.currentTimeMillis(),
+                1, "afterStopScaling"));
         startScaling(jobId);
         assertCheckScalingSuccess(jobId);
         assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "test");
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
index 5229a66eb13..b31846a4992 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.integration.data.pipeline.cases.task;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseIncrementTask;
+import 
org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.springframework.jdbc.core.JdbcTemplate;
 
@@ -32,7 +33,7 @@ public final class MySQLIncrementTask extends 
BaseIncrementTask {
     
     private final JdbcTemplate jdbcTemplate;
     
-    private final KeyGenerateAlgorithm keyGenerateAlgorithm;
+    private final KeyGenerateAlgorithm primaryKeyGenerateAlgorithm;
     
     private final Boolean incrementOrderItemTogether;
     
@@ -61,14 +62,16 @@ public final class MySQLIncrementTask extends 
BaseIncrementTask {
     private Object insertOrder() {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
-        Object[] orderInsertDate = new 
Object[]{keyGenerateAlgorithm.generateKey(), random.nextInt(0, 6), 
random.nextInt(0, 6), random.nextInt(1, 99), status};
+        Object[] orderInsertDate = new 
Object[]{primaryKeyGenerateAlgorithm.generateKey(), 
ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), 
+                random.nextInt(1, 99), status};
         jdbcTemplate.update("INSERT INTO t_order 
(id,order_id,user_id,t_unsigned_int,status) VALUES (?, ?, ?, ?, ?)", 
orderInsertDate);
         return orderInsertDate[0];
     }
     
     private Object insertOrderItem() {
-        String status = ThreadLocalRandom.current().nextInt() % 2 == 0 ? null 
: "NOT-NULL";
-        Object[] orderInsertItemDate = new 
Object[]{keyGenerateAlgorithm.generateKey(), 
ThreadLocalRandom.current().nextInt(0, 6), 
ThreadLocalRandom.current().nextInt(0, 6), status};
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
+        Object[] orderInsertItemDate = new 
Object[]{primaryKeyGenerateAlgorithm.generateKey(), 
ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
         jdbcTemplate.update("INSERT INTO 
t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)", 
orderInsertItemDate);
         return orderInsertItemDate[0];
     }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index fdd2de3aaae..ff22ce7f4cd 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -21,6 +21,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import 
org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseIncrementTask;
+import 
org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.springframework.jdbc.core.JdbcTemplate;
 
@@ -62,15 +63,17 @@ public final class PostgreSQLIncrementTask extends 
BaseIncrementTask {
     }
     
     private Object insertOrder() {
-        String status = ThreadLocalRandom.current().nextInt() % 2 == 0 ? null 
: "NOT-NULL";
-        Object[] orderInsertDate = new 
Object[]{keyGenerateAlgorithm.generateKey(), 
ThreadLocalRandom.current().nextInt(0, 6), 
ThreadLocalRandom.current().nextInt(0, 6), status};
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
+        Object[] orderInsertDate = new 
Object[]{keyGenerateAlgorithm.generateKey(), 
ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
         jdbcTemplate.update(prefixSchema("INSERT INTO ${schema}t_order 
(id,order_id,user_id,status) VALUES (?, ?, ?, ?)", schema), orderInsertDate);
         return orderInsertDate[0];
     }
     
     private Object insertOrderItem() {
-        String status = ThreadLocalRandom.current().nextInt() % 2 == 0 ? null 
: "NOT-NULL";
-        Object[] orderInsertItemDate = new 
Object[]{keyGenerateAlgorithm.generateKey(), 
ThreadLocalRandom.current().nextInt(0, 6), 
ThreadLocalRandom.current().nextInt(0, 6), status};
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
+        Object[] orderInsertItemDate = new 
Object[]{keyGenerateAlgorithm.generateKey(), 
ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
         jdbcTemplate.update(prefixSchema("INSERT INTO 
${schema}t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)", 
schema), orderInsertItemDate);
         return orderInsertItemDate[0];
     }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
index eade8293569..bc11854053d 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import 
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 
 import java.math.BigDecimal;
@@ -35,6 +36,17 @@ import java.util.concurrent.ThreadLocalRandom;
  */
 public final class ScalingCaseHelper {
     
+    private static final SnowflakeKeyGenerateAlgorithm 
SNOWFLAKE_KEY_GENERATE_ALGORITHM = new SnowflakeKeyGenerateAlgorithm();
+    
+    /**
+     * Generate snowflake key.
+     *
+     * @return snowflake key
+     */
+    public static long generateSnowflakeKey() {
+        return SNOWFLAKE_KEY_GENERATE_ALGORITHM.generateKey();
+    }
+    
     /**
      * Generate MySQL insert data, contains full fields.
      *
@@ -50,7 +62,7 @@ public final class ScalingCaseHelper {
         List<Object[]> orderData = new ArrayList<>(insertRows);
         List<Object[]> orderItemData = new ArrayList<>(insertRows);
         for (int i = 0; i < insertRows; i++) {
-            int orderId = generateInt(0, 6);
+            long orderId = generateSnowflakeKey();
             int userId = generateInt(0, 6);
             LocalDateTime now = LocalDateTime.now();
             int randomInt = generateInt(-100, 100);
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
index 88ca91e7f6d..3e247f6afe2 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
@@ -18,7 +18,7 @@
     <create-table-order>
         CREATE TABLE `t_order` (
         `id` bigint NOT NULL COMMENT 'pk id',
-        `order_id` int NOT NULL,
+        `order_id` bigint NOT NULL,
         `user_id` int NOT NULL,
         `status` varchar ( 255 ) NULL,
         `t_mediumint` mediumint NULL,
@@ -48,14 +48,14 @@
         `t_set` set ('1', '2', '3') NULL,
         `t_json` json NULL COMMENT 'json test',
         PRIMARY KEY ( `id` ),
-        INDEX ( `order_id` )
+        UNIQUE KEY `unique_key_order_id` (`order_id`)
         ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
     </create-table-order>
     
     <create-table-order-item>
         CREATE TABLE t_order_item (
         item_id bigint NOT NULL,
-        order_id int NOT NULL,
+        order_id bigint NOT NULL,
         user_id int NOT NULL,
         status varchar(50) DEFAULT NULL,
         PRIMARY KEY (item_id)
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
index 8b33b504228..9dd2018da57 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
@@ -18,7 +18,7 @@
     <create-table-order>
         CREATE TABLE test.t_order (
         id int8 NOT NULL,
-        order_id int4 NOT NULL,
+        order_id int8 NOT NULL,
         user_id int4 NOT NULL,
         status varchar ( 50 ) NULL,
         t_int2 int2 NULL,
@@ -47,8 +47,8 @@
     <create-table-order-item>
         CREATE TABLE test.t_order_item (
         item_id int8 NOT NULL,
-        order_id int4 NOT NULL,
-        user_id int8 NOT NULL,
+        order_id int8 NOT NULL,
+        user_id int4 NOT NULL,
         status varchar(50),
         PRIMARY KEY (item_id)
         )

Reply via email to