This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d265315c2fc Improve migration IT, add clean up pipeline at native mode
(#21105)
d265315c2fc is described below
commit d265315c2fc35309de886d1b4f358d19530c090c
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Sep 21 13:01:30 2022 +0800
Improve migration IT, add clean up pipeline at native mode (#21105)
* Fix postgres task failed and add init data time log.
* Fix postgres task failed and add init data time log.
* Add cleanUp pipeline at native mode
* Fix bug
* Fix codestyle
* rollback PostgresSQL checker
---
.../data/pipeline/cases/base/BaseITCase.java | 6 ++---
.../cases/migration/AbstractMigrationITCase.java | 22 ++++++++++++++----
.../general/PostgreSQLMigrationGeneralIT.java | 5 ++++-
.../primarykey/TextPrimaryKeyMigrationIT.java | 4 +++-
.../cases/task/PostgreSQLIncrementTask.java | 26 ++++++++++------------
.../pipeline/env/IntegrationTestEnvironment.java | 2 +-
.../env/scenario/primary_key/unique_key/mysql.xml | 2 +-
7 files changed, 42 insertions(+), 25 deletions(-)
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index bc6af3f4258..2a0f31d0f45 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -129,7 +129,7 @@ public abstract class BaseITCase {
password = ENV.getActualDataSourcePassword(databaseType);
}
createProxyDatabase(parameterized.getDatabaseType());
- if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+ if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
cleanUpDataSource();
}
extraSQLCommand =
JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource(parameterized.getScenario())),
ExtraSQLCommand.class);
@@ -137,7 +137,7 @@ public abstract class BaseITCase {
}
private void cleanUpDataSource() {
- for (String each : Arrays.asList(DS_0, DS_2, DS_3, DS_4)) {
+ for (String each : Arrays.asList(DS_0, DS_1, DS_2, DS_3, DS_4)) {
containerComposer.cleanUpDatabase(each);
}
}
@@ -149,7 +149,7 @@ public abstract class BaseITCase {
}
String jdbcUrl =
containerComposer.getProxyJdbcUrl(defaultDatabaseName);
try (Connection connection = DriverManager.getConnection(jdbcUrl,
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
- if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+ if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
try {
connectionExecuteWithLog(connection, String.format("DROP
DATABASE %s", PROXY_DATABASE));
} catch (final SQLException ex) {
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 157131a2b2f..97aafb8cd7a 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -49,10 +49,24 @@ public abstract class AbstractMigrationITCase extends
BaseITCase {
public AbstractMigrationITCase(final ScalingParameterized parameterized) {
super(parameterized);
migrationDistSQLCommand =
JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource("env/common/migration-command.xml")),
MigrationDistSQLCommand.class);
+ if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
+ try {
+ cleanUpPipelineJobs();
+ } catch (final SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ private void cleanUpPipelineJobs() throws SQLException {
+ List<String> jobIds = listJobId();
+ for (String each : jobIds) {
+ proxyExecuteWithLog(String.format("ROLLBACK MIGRATION '%s'",
each), 0);
+ }
}
protected void addMigrationSourceResource() throws SQLException {
- if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+ if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
try {
proxyExecuteWithLog("DROP MIGRATION SOURCE RESOURCE ds_0", 2);
} catch (final SQLException ex) {
@@ -108,15 +122,15 @@ public abstract class AbstractMigrationITCase extends
BaseITCase {
}
protected void startMigration(final String sourceTableName, final String
targetTableName) throws SQLException {
-
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTable(sourceTableName,
targetTableName), 1);
+
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTable(sourceTableName,
targetTableName), 5);
}
protected void startMigrationWithSchema(final String sourceTableName,
final String targetTableName) throws SQLException {
-
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTableWithSchema(sourceTableName,
targetTableName), 1);
+
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTableWithSchema(sourceTableName,
targetTableName), 5);
}
protected void addMigrationProcessConfig() throws SQLException {
- if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+ if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
try {
proxyExecuteWithLog("DROP MIGRATION PROCESS CONFIGURATION
'/'", 0);
} catch (final SQLException ex) {
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index 8c89cdf0628..582c4842de1 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -35,6 +35,7 @@ import org.junit.runners.Parameterized.Parameters;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.SQLException;
+import java.time.LocalDateTime;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -93,8 +94,10 @@ public final class PostgreSQLMigrationGeneralIT extends
AbstractMigrationITCase
createTargetOrderItemTableRule();
Pair<List<Object[]>, List<Object[]>> dataPair =
ScalingCaseHelper.generateFullInsertData(KEY_GENERATE_ALGORITHM,
parameterized.getDatabaseType(), TABLE_INIT_ROW_COUNT);
JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
+ log.info("init data begin: {}", LocalDateTime.now());
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()),
dataPair.getLeft());
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(),
dataPair.getRight());
+ log.info("init data end: {}", LocalDateTime.now());
checkOrderMigration(jdbcTemplate);
checkOrderItemMigration();
if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
@@ -109,7 +112,7 @@ public final class PostgreSQLMigrationGeneralIT extends
AbstractMigrationITCase
private void checkOrderMigration(final JdbcTemplate jdbcTemplate) throws
SQLException, InterruptedException {
startMigrationWithSchema(getSourceTableOrderName(), "t_order");
- startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate,
SCHEMA_NAME, getSourceTableOrderName(), false, 20));
+ startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate,
SCHEMA_NAME, getSourceTableOrderName(), 20));
String jobId = getJobIdByTableName(getSourceTableOrderName());
waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
stopMigrationByJobId(jobId);
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
index f06b4624061..45258013c6b 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
@@ -34,6 +34,7 @@ import org.junit.runners.Parameterized.Parameters;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.time.LocalDateTime;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -105,6 +106,7 @@ public class TextPrimaryKeyMigrationIT extends
AbstractMigrationITCase {
}
private void batchInsertOrder() throws SQLException {
+ log.info("init data begin: {}", LocalDateTime.now());
UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
try (Connection connection = getSourceDataSource().getConnection()) {
PreparedStatement preparedStatement =
connection.prepareStatement(String.format("INSERT INTO %s
(order_id,user_id,status) VALUES (?,?,?)", getSourceTableOrderName()));
@@ -116,6 +118,6 @@ public class TextPrimaryKeyMigrationIT extends
AbstractMigrationITCase {
}
preparedStatement.executeBatch();
}
- log.info("init data succeed");
+ log.info("init data end: {}", LocalDateTime.now());
}
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index a30ea237fef..191890a223b 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -42,8 +42,6 @@ public final class PostgreSQLIncrementTask extends
BaseIncrementTask {
private final String orderTableName;
- private final boolean incrementOrderItemTogether;
-
private final int executeCountLimit;
static {
@@ -58,15 +56,13 @@ public final class PostgreSQLIncrementTask extends
BaseIncrementTask {
while (executeCount < executeCountLimit &&
!Thread.currentThread().isInterrupted()) {
Object orderId = insertOrder();
if (0 == executeCount % 2) {
- jdbcTemplate.update(String.format(prefixSchema("DELETE FROM
${schema}%s WHERE order_id = ?", schema), orderTableName), orderId);
+ jdbcTemplate.update(String.format("DELETE FROM %s WHERE
order_id = ?", getTableNameWithSchema(orderTableName)), orderId);
} else {
updateOrderByPrimaryKey(orderId);
}
- if (incrementOrderItemTogether) {
- Object orderItemPrimaryKey = insertOrderItem();
- jdbcTemplate.update(prefixSchema("UPDATE ${schema}t_order_item
SET status = ? WHERE item_id = ?", schema), "updated" +
Instant.now().getEpochSecond(), orderItemPrimaryKey);
- jdbcTemplate.update(prefixSchema("DELETE FROM
${schema}t_order_item WHERE item_id = ?", schema), orderItemPrimaryKey);
- }
+ Object orderItemPrimaryKey = insertOrderItem();
+ jdbcTemplate.update(String.format("UPDATE %s SET status = ? WHERE
item_id = ?", getTableNameWithSchema("t_order_item")), "updated" +
Instant.now().getEpochSecond(), orderItemPrimaryKey);
+ jdbcTemplate.update(String.format("DELETE FROM %s WHERE item_id =
?", getTableNameWithSchema("t_order_item")), orderItemPrimaryKey);
executeCount++;
}
log.info("PostgreSQL increment task runnable execute successfully.");
@@ -76,7 +72,9 @@ public final class PostgreSQLIncrementTask extends
BaseIncrementTask {
ThreadLocalRandom random = ThreadLocalRandom.current();
String status = 0 == random.nextInt() % 2 ? null : "NOT-NULL";
Object[] orderInsertDate = new
Object[]{KEY_GENERATE_ALGORITHM.generateKey(), random.nextInt(0, 6), status};
- jdbcTemplate.update(String.format(prefixSchema("INSERT INTO
${schema}%s (order_id,user_id,status) VALUES (?, ?, ?)", schema),
orderTableName), orderInsertDate);
+ String insertSQL = String.format("INSERT INTO %s
(order_id,user_id,status) VALUES (?, ?, ?)",
getTableNameWithSchema(orderTableName));
+ log.info("insert order sql:{}", insertSQL);
+ jdbcTemplate.update(insertSQL, orderInsertDate);
return orderInsertDate[0];
}
@@ -84,20 +82,20 @@ public final class PostgreSQLIncrementTask extends
BaseIncrementTask {
ThreadLocalRandom random = ThreadLocalRandom.current();
String status = 0 == random.nextInt() % 2 ? null : "NOT-NULL";
Object[] orderInsertItemDate = new
Object[]{KEY_GENERATE_ALGORITHM.generateKey(),
ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
- jdbcTemplate.update(prefixSchema("INSERT INTO
${schema}t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)",
schema), orderInsertItemDate);
+ jdbcTemplate.update(String.format("INSERT INTO
%s(item_id,order_id,user_id,status) VALUES(?,?,?,?)",
getTableNameWithSchema("t_order_item")), orderInsertItemDate);
return orderInsertItemDate[0];
}
private void updateOrderByPrimaryKey(final Object primaryKey) {
Object[] updateData = {"updated" + Instant.now().getEpochSecond(),
primaryKey};
- jdbcTemplate.update(String.format(prefixSchema("UPDATE ${schema}%s SET
status = ? WHERE order_id = ?", schema), updateData));
+ jdbcTemplate.update(String.format("UPDATE %s SET status = ? WHERE
order_id = ?", getTableNameWithSchema(orderTableName)), updateData);
}
- private String prefixSchema(final String sql, final String schema) {
+ private String getTableNameWithSchema(final String tableName) {
if (StringUtils.isNotBlank(schema)) {
- return sql.replace("${schema}", schema + ".");
+ return String.join(".", schema, tableName);
} else {
- return sql.replace("${schema}", "");
+ return tableName;
}
}
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
index 134eb290975..f3f6c0d902e 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
@@ -153,7 +153,7 @@ public final class IntegrationTestEnvironment {
*/
public List<String> listStorageContainerImages(final DatabaseType
databaseType) {
// Native mode needn't use docker image, just return a list which
contain one item
- if (getItEnvType() == ITEnvTypeEnum.NATIVE) {
+ if (ITEnvTypeEnum.NATIVE == getItEnvType()) {
return
databaseType.getType().equalsIgnoreCase(getNativeDatabaseType()) ?
Collections.singletonList("") : Collections.emptyList();
}
switch (databaseType.getType()) {
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
index 5f8a288d79a..baeb0d95b75 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
@@ -16,7 +16,7 @@
-->
<command>
<create-table-order>
- CREATE TABLE `t_order` (
+ CREATE TABLE `%s` (
`order_id` varchar(255) NOT NULL,
`user_id` INT NOT NULL,
`status` varchar(255) NULL,