This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cfcbfecee6e Pipeline E2E add clean up method for native mode (#23838)
cfcbfecee6e is described below

commit cfcbfecee6ead96062c2b20dd2d8eec940fc3a67
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Jan 31 12:39:55 2023 +0800

    Pipeline E2E add clean up method for native mode (#23838)
    
    * Pipeline E2E add clean up job method for native mode
    
    * Improve
    
    * Improve clean
    
    * Method extraction
    
    * Change method to private
---
 .../pipeline/cases/base/PipelineBaseE2EIT.java     | 81 ++++++++++++++--------
 .../cases/migration/AbstractMigrationE2EIT.java    | 20 ++----
 .../general/MySQLMigrationGeneralE2EIT.java        |  2 +
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  2 +
 .../primarykey/NoUniqueKeyMigrationE2EIT.java      |  2 +
 .../primarykey/TextPrimaryKeyMigrationE2EIT.java   |  2 +
 6 files changed, 67 insertions(+), 42 deletions(-)
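
In short, the clean-up logic that previously ran unconditionally in the PipelineBaseE2EIT constructor is moved into a new initEnvironment(DatabaseType, JobType) method that each test invokes explicitly. In native mode it commits or rolls back leftover pipeline jobs, drops and recreates the proxy database, and cleans the actual data sources before the scenario starts. A minimal sketch of the resulting call pattern in a migration test follows; only the initEnvironment call is taken from this commit, the surrounding test body is illustrative:

    @Test
    public void assertMigrationSuccess() throws SQLException, InterruptedException {
        // Native mode only: commit/rollback leftover MIGRATION jobs, recreate the proxy
        // database, and clean the configured actual data sources before the scenario runs.
        initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
        // ... remainder of the scenario (create source tables, add resources, start migration) is unchanged ...
    }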

diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
index 232f3c9d033..ecef8aeb26f 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
@@ -23,6 +23,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.test.e2e.data.pipeline.command.ExtraSQLCommand;
@@ -122,42 +123,74 @@ public abstract class PipelineBaseE2EIT {
             username = ENV.getActualDataSourceUsername(databaseType);
             password = ENV.getActualDataSourcePassword(databaseType);
         }
-        createProxyDatabase(testParam.getDatabaseType());
-        if (PipelineEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
-            cleanUpDataSource();
-        }
         extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineBaseE2EIT.class.getClassLoader().getResource(testParam.getScenario())), ExtraSQLCommand.class);
         pipelineWatcher = new PipelineWatcher(containerComposer);
     }
     
-    private void cleanUpDataSource() {
-        for (String each : Arrays.asList(DS_0, DS_1, DS_2, DS_3, DS_4)) {
-            containerComposer.cleanUpDatabase(each);
-        }
-    }
-    
-    protected void createProxyDatabase(final DatabaseType databaseType) {
+    protected void initEnvironment(final DatabaseType databaseType, final JobType jobType) throws SQLException {
         String defaultDatabaseName = "";
         if (DatabaseTypeUtil.isPostgreSQL(databaseType) || DatabaseTypeUtil.isOpenGauss(databaseType)) {
             defaultDatabaseName = "postgres";
         }
         String jdbcUrl = containerComposer.getProxyJdbcUrl(defaultDatabaseName);
         try (Connection connection = DriverManager.getConnection(jdbcUrl, ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
-            if (PipelineEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
-                try {
-                    connectionExecuteWithLog(connection, String.format("DROP DATABASE %s", PROXY_DATABASE));
-                } catch (final SQLException ex) {
-                    log.warn("Drop proxy database failed, maybe it's not exist. error msg={}", ex.getMessage());
-                }
-            }
-            connectionExecuteWithLog(connection, String.format("CREATE DATABASE %s", PROXY_DATABASE));
-        } catch (final SQLException ex) {
-            throw new IllegalStateException(ex);
+            cleanUpPipelineJobs(connection, jobType);
+            cleanUpProxyDatabase(connection);
+            createProxyDatabase(connection);
         }
+        cleanUpDataSource();
         sourceDataSource = StorageContainerUtil.generateDataSource(appendExtraParam(getActualJdbcUrlTemplate(DS_0, false)), username, password);
         proxyDataSource = StorageContainerUtil.generateDataSource(containerComposer.getProxyJdbcUrl(PROXY_DATABASE), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
     }
     
+    private void cleanUpProxyDatabase(final Connection connection) {
+        if (PipelineEnvTypeEnum.NATIVE != ENV.getItEnvType()) {
+            return;
+        }
+        try {
+            connection.createStatement().execute(String.format("DROP DATABASE %s", PROXY_DATABASE));
+        } catch (final SQLException ex) {
+            log.warn("Drop proxy database failed, maybe it's not exist. error msg={}", ex.getMessage());
+        }
+    }
+    
+    private void cleanUpPipelineJobs(final Connection connection, final JobType jobType) throws SQLException {
+        if (PipelineEnvTypeEnum.NATIVE != ENV.getItEnvType()) {
+            return;
+        }
+        String jobTypeName = jobType.getTypeName();
+        List<Map<String, Object>> jobList;
+        try (ResultSet resultSet = connection.createStatement().executeQuery(String.format("SHOW %s LIST", jobTypeName))) {
+            jobList = transformResultSetToList(resultSet);
+        }
+        if (jobList.isEmpty()) {
+            return;
+        }
+        for (Map<String, Object> each : jobList) {
+            String jobId = each.get("id").toString();
+            Map<String, Object> jobInfo = queryForListWithLog(String.format("SHOW %s STATUS '%s'", jobTypeName, jobId)).get(0);
+            String status = jobInfo.get("status").toString();
+            if (JobStatus.FINISHED.name().equals(status)) {
+                connection.createStatement().execute(String.format("COMMIT %s '%s'", jobTypeName, jobId));
+            } else {
+                connection.createStatement().execute(String.format("ROLLBACK %s '%s'", jobTypeName, jobId));
+            }
+        }
+    }
+    
+    private void cleanUpDataSource() {
+        if (PipelineEnvTypeEnum.NATIVE != ENV.getItEnvType()) {
+            return;
+        }
+        for (String each : Arrays.asList(DS_0, DS_1, DS_2, DS_3, DS_4)) {
+            containerComposer.cleanUpDatabase(each);
+        }
+    }
+    
+    private void createProxyDatabase(final Connection connection) throws SQLException {
+        connection.createStatement().execute(String.format("CREATE DATABASE %s", PROXY_DATABASE));
+    }
+    
     protected void addResource(final String distSQL) throws SQLException {
         proxyExecuteWithLog(distSQL, 2);
     }
@@ -233,12 +266,6 @@ public abstract class PipelineBaseE2EIT {
         }
     }
     
-    protected void connectionExecuteWithLog(final Connection connection, final String sql) throws SQLException {
-        log.info("connection execute:{}", sql);
-        connection.createStatement().execute(sql);
-        ThreadUtil.sleep(2, TimeUnit.SECONDS);
-    }
-    
     protected List<Map<String, Object>> queryForListWithLog(final String sql) {
         int retryNumber = 0;
         while (retryNumber <= 3) {
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 79f9efc748f..4c0cf2740c1 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -50,20 +50,6 @@ public abstract class AbstractMigrationE2EIT extends PipelineBaseE2EIT {
     public AbstractMigrationE2EIT(final PipelineTestParameter testParam) {
         super(testParam);
         migrationDistSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineBaseE2EIT.class.getClassLoader().getResource("env/common/migration-command.xml")), MigrationDistSQLCommand.class);
-        if (PipelineEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
-            try {
-                cleanUpPipelineJobs();
-            } catch (final SQLException ex) {
-                throw new RuntimeException(ex);
-            }
-        }
-    }
-    
-    private void cleanUpPipelineJobs() throws SQLException {
-        List<String> jobIds = listJobId();
-        for (String each : jobIds) {
-            proxyExecuteWithLog(String.format("ROLLBACK MIGRATION '%s'", each), 0);
-        }
     }
     
     protected void addMigrationSourceResource() throws SQLException {
@@ -169,11 +155,15 @@ public abstract class AbstractMigrationE2EIT extends PipelineBaseE2EIT {
         List<Map<String, Object>> resultList = Collections.emptyList();
         for (int i = 0; i < 10; i++) {
             resultList = queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
+            if (resultList.isEmpty()) {
+                ThreadUtil.sleep(3, TimeUnit.SECONDS);
+            }
             List<String> checkEndTimeList = resultList.stream().map(map -> map.get("check_end_time").toString()).filter(each -> !Strings.isNullOrEmpty(each)).collect(Collectors.toList());
             if (checkEndTimeList.size() == resultList.size()) {
                 break;
+            } else {
+                ThreadUtil.sleep(3, TimeUnit.SECONDS);
             }
-            ThreadUtil.sleep(3, TimeUnit.SECONDS);
         }
         log.info("check job results: {}", resultList);
         for (Map<String, Object> each : resultList) {
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 356ebbcbe28..bc01c64ba87 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.general
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
@@ -78,6 +79,7 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     @Test
     public void assertMigrationSuccess() throws SQLException, InterruptedException {
         log.info("assertMigrationSuccess testParam:{}", testParam);
+        initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
         addMigrationProcessConfig();
         createSourceOrderTable();
         createSourceOrderItemTable();
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 0d035e7bc4d..4a1c65f0e48 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.general
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
@@ -77,6 +78,7 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
     @Test
     public void assertMigrationSuccess() throws SQLException, InterruptedException {
         log.info("assertMigrationSuccess testParam:{}", testParam);
+        initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
         addMigrationProcessConfig();
         createSourceSchema(PipelineBaseE2EIT.SCHEMA_NAME);
         createSourceOrderTable();
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java
index 7cdb8547b20..c3f5489d48d 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
@@ -72,6 +73,7 @@ public final class NoUniqueKeyMigrationE2EIT extends AbstractMigrationE2EIT {
     @Test
     public void assertTextPrimaryMigrationSuccess() throws SQLException, InterruptedException {
         log.info("assertTextPrimaryMigrationSuccess testParam:{}", testParam);
+        initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
         createSourceOrderTable();
         try (Connection connection = getSourceDataSource().getConnection()) {
             AutoIncrementKeyGenerateAlgorithm generateAlgorithm = new AutoIncrementKeyGenerateAlgorithm();
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index 85f97c7ef3d..0cbac5e7961 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
@@ -83,6 +84,7 @@ public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
     @Test
     public void assertTextPrimaryMigrationSuccess() throws SQLException, InterruptedException {
         log.info("assertTextPrimaryMigrationSuccess testParam:{}", testParam);
+        initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
         createSourceOrderTable();
         try (Connection connection = getSourceDataSource().getConnection()) {
             UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
