This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c91e146951a Simplify migration IT method and fix occasional CI error.
(#21171)
c91e146951a is described below
commit c91e146951a88d282d4da482c22ea3c8f8254758
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Sep 26 13:43:48 2022 +0800
Simplify migration IT method and fix occasional CI error. (#21171)
* Simplify migration IT method.
* Decrease initialDelay in the job persist service
---
.../persist/PipelineJobProgressPersistService.java | 2 +-
.../data/pipeline/cases/base/BaseITCase.java | 19 ++++++++++------
.../cases/migration/AbstractMigrationITCase.java | 26 ----------------------
.../migration/general/MySQLMigrationGeneralIT.java | 2 +-
.../general/PostgreSQLMigrationGeneralIT.java | 5 +++--
.../primarykey/TextPrimaryKeyMigrationIT.java | 2 +-
6 files changed, 18 insertions(+), 38 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 2fc2045a136..c57e946ccd0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -47,7 +47,7 @@ public final class PipelineJobProgressPersistService {
private static final long DELAY_SECONDS = 1;
static {
- JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new
PersistJobContextRunnable(), 5, DELAY_SECONDS, TimeUnit.SECONDS);
+ JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new
PersistJobContextRunnable(), 0, DELAY_SECONDS, TimeUnit.SECONDS);
}
/**
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 2a0f31d0f45..dae115c71c3 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -62,7 +62,6 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -271,19 +270,25 @@ public abstract class BaseITCase {
getIncreaseTaskThread().start();
}
- protected List<Map<String, Object>> waitJobFinished(final String distSQL)
throws InterruptedException {
+ protected List<Map<String, Object>> waitIncrementTaskFinished(final String
distSQL) throws InterruptedException {
if (null != getIncreaseTaskThread()) {
TimeUnit.SECONDS.timedJoin(getIncreaseTaskThread(), 60);
}
- Set<String> actualStatus;
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 15; i++) {
List<Map<String, Object>> listJobStatus =
queryForListWithLog(distSQL);
log.info("show status result: {}", listJobStatus);
- actualStatus = listJobStatus.stream().map(each ->
each.get("status").toString()).collect(Collectors.toSet());
- assertFalse(CollectionUtils.containsAny(actualStatus,
Arrays.asList(JobStatus.PREPARING_FAILURE.name(),
JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
- JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+ Set<String> actualStatus = new HashSet<>();
+ List<Integer> incrementalIdleSecondsList = new ArrayList<>();
for (Map<String, Object> each : listJobStatus) {
assertTrue(StringUtils.isBlank(each.get("error_message").toString()));
+ actualStatus.add(each.get("status").toString());
+
incrementalIdleSecondsList.add(Integer.parseInt(each.get("incremental_idle_seconds").toString()));
+ }
+ assertFalse(CollectionUtils.containsAny(actualStatus,
Arrays.asList(JobStatus.PREPARING_FAILURE.name(),
JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
+ JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+ if (Collections.min(incrementalIdleSecondsList) <= 5) {
+ ThreadUtil.sleep(3, TimeUnit.SECONDS);
+ continue;
}
if (actualStatus.size() == 1 &&
actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
return listJobStatus;
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 97aafb8cd7a..445fe1a5492 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -19,8 +19,6 @@ package
org.apache.shardingsphere.integration.data.pipeline.cases.migration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import
org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseITCase;
import
org.apache.shardingsphere.integration.data.pipeline.command.MigrationDistSQLCommand;
import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
@@ -33,7 +31,6 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.is;
@@ -163,33 +160,10 @@ public abstract class AbstractMigrationITCase extends
BaseITCase {
}
protected void assertCheckMigrationSuccess(final String jobId, final
String algorithmType) {
- for (int i = 0; i < 5; i++) {
- if (checkJobIncrementTaskFinished(jobId)) {
- break;
- }
- ThreadUtil.sleep(3, TimeUnit.SECONDS);
- }
- boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
- log.info("second check job result: {}", secondCheckJobResult);
List<Map<String, Object>> checkJobResults =
queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')",
jobId, algorithmType));
log.info("check job results: {}", checkJobResults);
for (Map<String, Object> entry : checkJobResults) {
assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
}
}
-
- protected boolean checkJobIncrementTaskFinished(final String jobId) {
- List<Map<String, Object>> listJobStatus =
queryForListWithLog(String.format("SHOW MIGRATION STATUS '%s'", jobId));
- log.info("list job status result: {}", listJobStatus);
- for (Map<String, Object> entry : listJobStatus) {
- if
(!JobStatus.EXECUTE_INCREMENTAL_TASK.name().equalsIgnoreCase(entry.get("status").toString()))
{
- return false;
- }
- int incrementalIdleSeconds =
Integer.parseInt(entry.get("incremental_idle_seconds").toString());
- if (incrementalIdleSeconds < 3) {
- return false;
- }
- }
- return true;
- }
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index e817099770b..0c4047c2286 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -112,7 +112,7 @@ public final class MySQLMigrationGeneralIT extends
AbstractMigrationITCase {
}
private void assertMigrationSuccessById(final String jobId) throws
SQLException, InterruptedException {
- List<Map<String, Object>> jobStatus =
waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ List<Map<String, Object>> jobStatus =
waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
for (Map<String, Object> each : jobStatus) {
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) >
0);
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index 582c4842de1..ef5b3421a8e 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -114,11 +114,12 @@ public final class PostgreSQLMigrationGeneralIT extends
AbstractMigrationITCase
startMigrationWithSchema(getSourceTableOrderName(), "t_order");
startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate,
SCHEMA_NAME, getSourceTableOrderName(), 20));
String jobId = getJobIdByTableName(getSourceTableOrderName());
- waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'",
jobId));
stopMigrationByJobId(jobId);
sourceExecuteWithLog(String.format("INSERT INTO %s.%s
(order_id,user_id,status) VALUES (%s, %s, '%s')", SCHEMA_NAME,
getSourceTableOrderName(), KEY_GENERATE_ALGORITHM.generateKey(),
1, "afterStop"));
startMigrationByJobId(jobId);
+ waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'",
jobId));
assertCheckMigrationSuccess(jobId, "DATA_MATCH");
stopMigrationByJobId(jobId);
}
@@ -126,7 +127,7 @@ public final class PostgreSQLMigrationGeneralIT extends
AbstractMigrationITCase
private void checkOrderItemMigration() throws SQLException,
InterruptedException {
startMigrationWithSchema("t_order_item", "t_order_item");
String jobId = getJobIdByTableName("t_order_item");
- waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'",
jobId));
assertCheckMigrationSuccess(jobId, "DATA_MATCH");
stopMigrationByJobId(jobId);
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
index 45258013c6b..bd74d687c65 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
@@ -89,8 +89,8 @@ public class TextPrimaryKeyMigrationIT extends
AbstractMigrationITCase {
createTargetOrderTableRule();
startMigration(getSourceTableOrderName(), getTargetTableOrderName());
String jobId = listJobId().get(0);
- waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
sourceExecuteWithLog(String.format("INSERT INTO %s
(order_id,user_id,status) VALUES (%s, %s, '%s')", getSourceTableOrderName(),
"1000000000", 1, "afterStop"));
+ waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'",
jobId));
// TODO The ordering of primary or unique keys for text types is
different, but can't reproduce now
if (DatabaseTypeUtil.isMySQL(getDatabaseType())) {
assertCheckMigrationSuccess(jobId, "DATA_MATCH");