This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cfcbfecee6e Pipeline E2E add clean up method for native mode (#23838)
cfcbfecee6e is described below
commit cfcbfecee6ead96062c2b20dd2d8eec940fc3a67
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Jan 31 12:39:55 2023 +0800
Pipeline E2E add clean up method for native mode (#23838)
* Pipeline E2E add clean up job method for native mode
* Improve
* Improve clean
* Method extraction
* Change method to private
---
.../pipeline/cases/base/PipelineBaseE2EIT.java | 81 ++++++++++++++--------
.../cases/migration/AbstractMigrationE2EIT.java | 20 ++----
.../general/MySQLMigrationGeneralE2EIT.java | 2 +
.../general/PostgreSQLMigrationGeneralE2EIT.java | 2 +
.../primarykey/NoUniqueKeyMigrationE2EIT.java | 2 +
.../primarykey/TextPrimaryKeyMigrationE2EIT.java | 2 +
6 files changed, 67 insertions(+), 42 deletions(-)
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
index 232f3c9d033..ecef8aeb26f 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
@@ -23,6 +23,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import
org.apache.shardingsphere.test.e2e.data.pipeline.command.ExtraSQLCommand;
@@ -122,42 +123,74 @@ public abstract class PipelineBaseE2EIT {
username = ENV.getActualDataSourceUsername(databaseType);
password = ENV.getActualDataSourcePassword(databaseType);
}
- createProxyDatabase(testParam.getDatabaseType());
- if (PipelineEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
- cleanUpDataSource();
- }
extraSQLCommand =
JAXB.unmarshal(Objects.requireNonNull(PipelineBaseE2EIT.class.getClassLoader().getResource(testParam.getScenario())),
ExtraSQLCommand.class);
pipelineWatcher = new PipelineWatcher(containerComposer);
}
- private void cleanUpDataSource() {
- for (String each : Arrays.asList(DS_0, DS_1, DS_2, DS_3, DS_4)) {
- containerComposer.cleanUpDatabase(each);
- }
- }
-
- protected void createProxyDatabase(final DatabaseType databaseType) {
+ protected void initEnvironment(final DatabaseType databaseType, final
JobType jobType) throws SQLException {
String defaultDatabaseName = "";
if (DatabaseTypeUtil.isPostgreSQL(databaseType) ||
DatabaseTypeUtil.isOpenGauss(databaseType)) {
defaultDatabaseName = "postgres";
}
String jdbcUrl =
containerComposer.getProxyJdbcUrl(defaultDatabaseName);
try (Connection connection = DriverManager.getConnection(jdbcUrl,
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
- if (PipelineEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
- try {
- connectionExecuteWithLog(connection, String.format("DROP
DATABASE %s", PROXY_DATABASE));
- } catch (final SQLException ex) {
- log.warn("Drop proxy database failed, maybe it's not
exist. error msg={}", ex.getMessage());
- }
- }
- connectionExecuteWithLog(connection, String.format("CREATE
DATABASE %s", PROXY_DATABASE));
- } catch (final SQLException ex) {
- throw new IllegalStateException(ex);
+ cleanUpPipelineJobs(connection, jobType);
+ cleanUpProxyDatabase(connection);
+ createProxyDatabase(connection);
}
+ cleanUpDataSource();
sourceDataSource =
StorageContainerUtil.generateDataSource(appendExtraParam(getActualJdbcUrlTemplate(DS_0,
false)), username, password);
proxyDataSource =
StorageContainerUtil.generateDataSource(containerComposer.getProxyJdbcUrl(PROXY_DATABASE),
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
}
+ private void cleanUpProxyDatabase(final Connection connection) {
+ if (PipelineEnvTypeEnum.NATIVE != ENV.getItEnvType()) {
+ return;
+ }
+ try {
+ connection.createStatement().execute(String.format("DROP DATABASE
%s", PROXY_DATABASE));
+ } catch (final SQLException ex) {
+ log.warn("Drop proxy database failed, maybe it's not exist. error
msg={}", ex.getMessage());
+ }
+ }
+
+ private void cleanUpPipelineJobs(final Connection connection, final
JobType jobType) throws SQLException {
+ if (PipelineEnvTypeEnum.NATIVE != ENV.getItEnvType()) {
+ return;
+ }
+ String jobTypeName = jobType.getTypeName();
+ List<Map<String, Object>> jobList;
+ try (ResultSet resultSet =
connection.createStatement().executeQuery(String.format("SHOW %s LIST",
jobTypeName))) {
+ jobList = transformResultSetToList(resultSet);
+ }
+ if (jobList.isEmpty()) {
+ return;
+ }
+ for (Map<String, Object> each : jobList) {
+ String jobId = each.get("id").toString();
+ Map<String, Object> jobInfo =
queryForListWithLog(String.format("SHOW %s STATUS '%s'", jobTypeName,
jobId)).get(0);
+ String status = jobInfo.get("status").toString();
+ if (JobStatus.FINISHED.name().equals(status)) {
+ connection.createStatement().execute(String.format("COMMIT %s
'%s'", jobTypeName, jobId));
+ } else {
+ connection.createStatement().execute(String.format("ROLLBACK
%s '%s'", jobTypeName, jobId));
+ }
+ }
+ }
+
+ private void cleanUpDataSource() {
+ if (PipelineEnvTypeEnum.NATIVE != ENV.getItEnvType()) {
+ return;
+ }
+ for (String each : Arrays.asList(DS_0, DS_1, DS_2, DS_3, DS_4)) {
+ containerComposer.cleanUpDatabase(each);
+ }
+ }
+
+ private void createProxyDatabase(final Connection connection) throws
SQLException {
+ connection.createStatement().execute(String.format("CREATE DATABASE
%s", PROXY_DATABASE));
+ }
+
protected void addResource(final String distSQL) throws SQLException {
proxyExecuteWithLog(distSQL, 2);
}
@@ -233,12 +266,6 @@ public abstract class PipelineBaseE2EIT {
}
}
- protected void connectionExecuteWithLog(final Connection connection, final
String sql) throws SQLException {
- log.info("connection execute:{}", sql);
- connection.createStatement().execute(sql);
- ThreadUtil.sleep(2, TimeUnit.SECONDS);
- }
-
protected List<Map<String, Object>> queryForListWithLog(final String sql) {
int retryNumber = 0;
while (retryNumber <= 3) {
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 79f9efc748f..4c0cf2740c1 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -50,20 +50,6 @@ public abstract class AbstractMigrationE2EIT extends
PipelineBaseE2EIT {
public AbstractMigrationE2EIT(final PipelineTestParameter testParam) {
super(testParam);
migrationDistSQLCommand =
JAXB.unmarshal(Objects.requireNonNull(PipelineBaseE2EIT.class.getClassLoader().getResource("env/common/migration-command.xml")),
MigrationDistSQLCommand.class);
- if (PipelineEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
- try {
- cleanUpPipelineJobs();
- } catch (final SQLException ex) {
- throw new RuntimeException(ex);
- }
- }
- }
-
- private void cleanUpPipelineJobs() throws SQLException {
- List<String> jobIds = listJobId();
- for (String each : jobIds) {
- proxyExecuteWithLog(String.format("ROLLBACK MIGRATION '%s'",
each), 0);
- }
}
protected void addMigrationSourceResource() throws SQLException {
@@ -169,11 +155,15 @@ public abstract class AbstractMigrationE2EIT extends
PipelineBaseE2EIT {
List<Map<String, Object>> resultList = Collections.emptyList();
for (int i = 0; i < 10; i++) {
resultList = queryForListWithLog(String.format("SHOW MIGRATION
CHECK STATUS '%s'", jobId));
+ if (resultList.isEmpty()) {
+ ThreadUtil.sleep(3, TimeUnit.SECONDS);
+ }
List<String> checkEndTimeList = resultList.stream().map(map ->
map.get("check_end_time").toString()).filter(each ->
!Strings.isNullOrEmpty(each)).collect(Collectors.toList());
if (checkEndTimeList.size() == resultList.size()) {
break;
+ } else {
+ ThreadUtil.sleep(3, TimeUnit.SECONDS);
}
- ThreadUtil.sleep(3, TimeUnit.SECONDS);
}
log.info("check job results: {}", resultList);
for (Map<String, Object> each : resultList) {
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 356ebbcbe28..bc01c64ba87 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.general
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import
org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
@@ -78,6 +79,7 @@ public final class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
@Test
public void assertMigrationSuccess() throws SQLException,
InterruptedException {
log.info("assertMigrationSuccess testParam:{}", testParam);
+ initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
addMigrationProcessConfig();
createSourceOrderTable();
createSourceOrderItemTable();
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 0d035e7bc4d..4a1c65f0e48 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.general
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
@@ -77,6 +78,7 @@ public final class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EI
@Test
public void assertMigrationSuccess() throws SQLException,
InterruptedException {
log.info("assertMigrationSuccess testParam:{}", testParam);
+ initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
addMigrationProcessConfig();
createSourceSchema(PipelineBaseE2EIT.SCHEMA_NAME);
createSourceOrderTable();
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java
index 7cdb8547b20..c3f5489d48d 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java
@@ -18,6 +18,7 @@
package
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
import
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
@@ -72,6 +73,7 @@ public final class NoUniqueKeyMigrationE2EIT extends
AbstractMigrationE2EIT {
@Test
public void assertTextPrimaryMigrationSuccess() throws SQLException,
InterruptedException {
log.info("assertTextPrimaryMigrationSuccess testParam:{}", testParam);
+ initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
createSourceOrderTable();
try (Connection connection = getSourceDataSource().getConnection()) {
AutoIncrementKeyGenerateAlgorithm generateAlgorithm = new
AutoIncrementKeyGenerateAlgorithm();
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index 85f97c7ef3d..0cbac5e7961 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -18,6 +18,7 @@
package
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
@@ -83,6 +84,7 @@ public class TextPrimaryKeyMigrationE2EIT extends
AbstractMigrationE2EIT {
@Test
public void assertTextPrimaryMigrationSuccess() throws SQLException,
InterruptedException {
log.info("assertTextPrimaryMigrationSuccess testParam:{}", testParam);
+ initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
createSourceOrderTable();
try (Connection connection = getSourceDataSource().getConnection()) {
UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();