This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3052a14f108 Improve table both have primary key and unique key at
inventory task (#20111)
3052a14f108 is described below
commit 3052a14f1085cfe3613371fde98e422b697b2839
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Aug 12 21:01:01 2022 +0800
Improve table both have primary key and unique key at inventory task
(#20111)
* Improve table both have primary key and unique key at scaling.
* Add IT case
* Improve
* Fix problem
* Improve performance
* Improve
---
.../core/ingest/dumper/AbstractInventoryDumper.java | 12 +++++-------
.../data/pipeline/cases/general/MySQLGeneralScalingIT.java | 3 ++-
.../pipeline/cases/general/PostgreSQLGeneralScalingIT.java | 3 ++-
.../data/pipeline/cases/task/MySQLIncrementTask.java | 11 +++++++----
.../data/pipeline/cases/task/PostgreSQLIncrementTask.java | 11 +++++++----
.../data/pipeline/framework/helper/ScalingCaseHelper.java | 14 +++++++++++++-
.../src/test/resources/env/scenario/general/mysql.xml | 6 +++---
.../src/test/resources/env/scenario/general/postgresql.xml | 6 +++---
8 files changed, 42 insertions(+), 24 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 654aa1d7d3d..6ad8c60b23e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -118,7 +118,7 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
try (Connection conn = dataSource.getConnection()) {
int round = 1;
Optional<Object> maxUniqueKeyValue;
- while ((maxUniqueKeyValue = dump0(conn, 1 == round ? firstSQL :
laterSQL, uniqueKeyDataType, startUniqueKeyValue, round++)).isPresent()) {
+ while ((maxUniqueKeyValue = dump0(conn, 1 == round ? firstSQL :
laterSQL, dumperConfig.getUniqueKey(), uniqueKeyDataType, startUniqueKeyValue,
round++)).isPresent()) {
startUniqueKeyValue = maxUniqueKeyValue.get();
if (!isRunning()) {
log.info("inventory dump, running is false, break");
@@ -140,7 +140,8 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
return tableMetaDataLazyInitializer.get();
}
- private Optional<Object> dump0(final Connection conn, final String sql,
final int uniqueKeyDataType, final Object startUniqueKeyValue, final int round)
throws SQLException {
+ private Optional<Object> dump0(final Connection conn, final String sql,
final String uniqueKey, final int uniqueKeyDataType, final Object
startUniqueKeyValue, final int round)
+ throws SQLException {
if (null != rateLimitAlgorithm) {
rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
}
@@ -166,13 +167,10 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
DataRecord record = new DataRecord(newPosition(resultSet),
metaData.getColumnCount());
record.setType(IngestDataChangeType.INSERT);
record.setTableName(logicTableName);
+ maxUniqueKeyValue = readValue(resultSet,
tableMetaData.getColumnMetaData(uniqueKey).getOrdinalPosition());
for (int i = 1; i <= metaData.getColumnCount(); i++) {
boolean isUniqueKey = tableMetaData.isUniqueKey(i - 1);
- Object value = readValue(resultSet, i);
- if (isUniqueKey) {
- maxUniqueKeyValue = value;
- }
- record.addColumn(new Column(metaData.getColumnName(i),
value, true, isUniqueKey));
+ record.addColumn(new Column(metaData.getColumnName(i),
readValue(resultSet, i), true, isUniqueKey));
}
pushRecord(record);
rowCount++;
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
index 8ba56a5aa49..b2dbaf06509 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
@@ -89,7 +89,8 @@ public final class MySQLGeneralScalingIT extends
BaseExtraSQLITCase {
String jobId = getScalingJobId();
waitScalingFinished(jobId);
stopScaling(jobId);
- getJdbcTemplate().update("INSERT INTO t_order
(id,order_id,user_id,status,t_json) VALUES (?, ?, ?, ?, ?)",
keyGenerateAlgorithm.generateKey(), 1, 1, "afterStopScaling", "{}");
+ getJdbcTemplate().update("INSERT INTO t_order
(id,order_id,user_id,status,t_json) VALUES (?, ?, ?, ?, ?)",
keyGenerateAlgorithm.generateKey(), keyGenerateAlgorithm.generateKey(),
+ 1, "afterStopScaling", "{}");
startScaling(jobId);
assertCheckScalingSuccess(jobId);
assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "");
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java
index 34f5d34429a..c4358a615ef 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java
@@ -93,7 +93,8 @@ public final class PostgreSQLGeneralScalingIT extends
BaseExtraSQLITCase {
String jobId = getScalingJobId();
waitScalingFinished(jobId);
stopScaling(jobId);
- executeWithLog(String.format("INSERT INTO test.t_order
(id,order_id,user_id,status) VALUES (%s, %s, %s, '%s')",
keyGenerateAlgorithm.generateKey(), 1, 1, "afterStopScaling"));
+ executeWithLog(String.format("INSERT INTO test.t_order
(id,order_id,user_id,status) VALUES (%s, %s, %s, '%s')",
keyGenerateAlgorithm.generateKey(), System.currentTimeMillis(),
+ 1, "afterStopScaling"));
startScaling(jobId);
assertCheckScalingSuccess(jobId);
assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "test");
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
index 5229a66eb13..b31846a4992 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.integration.data.pipeline.cases.task;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseIncrementTask;
+import
org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -32,7 +33,7 @@ public final class MySQLIncrementTask extends
BaseIncrementTask {
private final JdbcTemplate jdbcTemplate;
- private final KeyGenerateAlgorithm keyGenerateAlgorithm;
+ private final KeyGenerateAlgorithm primaryKeyGenerateAlgorithm;
private final Boolean incrementOrderItemTogether;
@@ -61,14 +62,16 @@ public final class MySQLIncrementTask extends
BaseIncrementTask {
private Object insertOrder() {
ThreadLocalRandom random = ThreadLocalRandom.current();
String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
- Object[] orderInsertDate = new
Object[]{keyGenerateAlgorithm.generateKey(), random.nextInt(0, 6),
random.nextInt(0, 6), random.nextInt(1, 99), status};
+ Object[] orderInsertDate = new
Object[]{primaryKeyGenerateAlgorithm.generateKey(),
ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6),
+ random.nextInt(1, 99), status};
jdbcTemplate.update("INSERT INTO t_order
(id,order_id,user_id,t_unsigned_int,status) VALUES (?, ?, ?, ?, ?)",
orderInsertDate);
return orderInsertDate[0];
}
private Object insertOrderItem() {
- String status = ThreadLocalRandom.current().nextInt() % 2 == 0 ? null
: "NOT-NULL";
- Object[] orderInsertItemDate = new
Object[]{keyGenerateAlgorithm.generateKey(),
ThreadLocalRandom.current().nextInt(0, 6),
ThreadLocalRandom.current().nextInt(0, 6), status};
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
+ Object[] orderInsertItemDate = new
Object[]{primaryKeyGenerateAlgorithm.generateKey(),
ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
jdbcTemplate.update("INSERT INTO
t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)",
orderInsertItemDate);
return orderInsertItemDate[0];
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index fdd2de3aaae..ff22ce7f4cd 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -21,6 +21,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import
org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseIncrementTask;
+import
org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -62,15 +63,17 @@ public final class PostgreSQLIncrementTask extends
BaseIncrementTask {
}
private Object insertOrder() {
- String status = ThreadLocalRandom.current().nextInt() % 2 == 0 ? null
: "NOT-NULL";
- Object[] orderInsertDate = new
Object[]{keyGenerateAlgorithm.generateKey(),
ThreadLocalRandom.current().nextInt(0, 6),
ThreadLocalRandom.current().nextInt(0, 6), status};
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
+ Object[] orderInsertDate = new
Object[]{keyGenerateAlgorithm.generateKey(),
ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
jdbcTemplate.update(prefixSchema("INSERT INTO ${schema}t_order
(id,order_id,user_id,status) VALUES (?, ?, ?, ?)", schema), orderInsertDate);
return orderInsertDate[0];
}
private Object insertOrderItem() {
- String status = ThreadLocalRandom.current().nextInt() % 2 == 0 ? null
: "NOT-NULL";
- Object[] orderInsertItemDate = new
Object[]{keyGenerateAlgorithm.generateKey(),
ThreadLocalRandom.current().nextInt(0, 6),
ThreadLocalRandom.current().nextInt(0, 6), status};
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
+ Object[] orderInsertItemDate = new
Object[]{keyGenerateAlgorithm.generateKey(),
ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
jdbcTemplate.update(prefixSchema("INSERT INTO
${schema}t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)",
schema), orderInsertItemDate);
return orderInsertItemDate[0];
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
index eade8293569..bc11854053d 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import java.math.BigDecimal;
@@ -35,6 +36,17 @@ import java.util.concurrent.ThreadLocalRandom;
*/
public final class ScalingCaseHelper {
+ private static final SnowflakeKeyGenerateAlgorithm
SNOWFLAKE_KEY_GENERATE_ALGORITHM = new SnowflakeKeyGenerateAlgorithm();
+
+ /**
+ * Generate snowflake key.
+ *
+ * @return snowflake key
+ */
+ public static long generateSnowflakeKey() {
+ return SNOWFLAKE_KEY_GENERATE_ALGORITHM.generateKey();
+ }
+
/**
* Generate MySQL insert data, contains full fields.
*
@@ -50,7 +62,7 @@ public final class ScalingCaseHelper {
List<Object[]> orderData = new ArrayList<>(insertRows);
List<Object[]> orderItemData = new ArrayList<>(insertRows);
for (int i = 0; i < insertRows; i++) {
- int orderId = generateInt(0, 6);
+ long orderId = generateSnowflakeKey();
int userId = generateInt(0, 6);
LocalDateTime now = LocalDateTime.now();
int randomInt = generateInt(-100, 100);
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
index 88ca91e7f6d..3e247f6afe2 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
@@ -18,7 +18,7 @@
<create-table-order>
CREATE TABLE `t_order` (
`id` bigint NOT NULL COMMENT 'pk id',
- `order_id` int NOT NULL,
+ `order_id` bigint NOT NULL,
`user_id` int NOT NULL,
`status` varchar ( 255 ) NULL,
`t_mediumint` mediumint NULL,
@@ -48,14 +48,14 @@
`t_set` set ('1', '2', '3') NULL,
`t_json` json NULL COMMENT 'json test',
PRIMARY KEY ( `id` ),
- INDEX ( `order_id` )
+ UNIQUE KEY `unique_key_order_id` (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
</create-table-order>
<create-table-order-item>
CREATE TABLE t_order_item (
item_id bigint NOT NULL,
- order_id int NOT NULL,
+ order_id bigint NOT NULL,
user_id int NOT NULL,
status varchar(50) DEFAULT NULL,
PRIMARY KEY (item_id)
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
index 8b33b504228..9dd2018da57 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
@@ -18,7 +18,7 @@
<create-table-order>
CREATE TABLE test.t_order (
id int8 NOT NULL,
- order_id int4 NOT NULL,
+ order_id int8 NOT NULL,
user_id int4 NOT NULL,
status varchar ( 50 ) NULL,
t_int2 int2 NULL,
@@ -47,8 +47,8 @@
<create-table-order-item>
CREATE TABLE test.t_order_item (
item_id int8 NOT NULL,
- order_id int4 NOT NULL,
- user_id int8 NOT NULL,
+ order_id int8 NOT NULL,
+ user_id int4 NOT NULL,
status varchar(50),
PRIMARY KEY (item_id)
)