This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ef0c7102c2a Improve migration IT (#20733)
ef0c7102c2a is described below

commit ef0c7102c2aeef35962c3a59ff8b3b60fa0262cd
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Sep 2 15:41:25 2022 +0800

    Improve migration IT (#20733)
    
    * Improve migration IT
    
    * Improve
---
 .../cases/base/AbstractMigrationITCase.java        | 202 ++++++++++++++++++++
 .../pipeline/cases/base/BaseExtraSQLITCase.java    |  60 ------
 .../data/pipeline/cases/base/BaseITCase.java       | 208 ++++-----------------
 .../cases/general/CreateTableSQLGeneratorIT.java   |   4 +-
 .../cases/general/MySQLMigrationGeneralIT.java     |  14 +-
 .../general/PostgreSQLMigrationGeneralIT.java      |  14 +-
 .../primarykey/TextPrimaryKeyMigrationIT.java      |  14 +-
 .../pipeline/env/IntegrationTestEnvironment.java   |   8 +-
 ...calingITEnvTypeEnum.java => ITEnvTypeEnum.java} |   2 +-
 ...Container.java => DockerComposedContainer.java} |   4 +-
 .../pipeline/framework/watcher/ScalingWatcher.java |   8 +-
 11 files changed, 271 insertions(+), 267 deletions(-)

diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/AbstractMigrationITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/AbstractMigrationITCase.java
new file mode 100644
index 00000000000..9aeb09de62f
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/AbstractMigrationITCase.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.integration.data.pipeline.cases.base;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import 
org.apache.shardingsphere.integration.data.pipeline.cases.command.MigrationDistSQLCommand;
+import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
+import 
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
+
+import javax.xml.bind.JAXB;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+@Slf4j
+public abstract class AbstractMigrationITCase extends BaseITCase {
+    
+    private final MigrationDistSQLCommand migrationDistSQLCommand;
+    
+    public AbstractMigrationITCase(final ScalingParameterized parameterized) {
+        super(parameterized);
+        migrationDistSQLCommand = 
JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource("env/common/command.xml")),
 MigrationDistSQLCommand.class);
+    }
+    
+    protected void addMigrationSourceResource() throws SQLException {
+        if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+            try {
+                proxyExecuteWithLog("DROP MIGRATION SOURCE RESOURCE ds_0", 2);
+            } catch (final SQLException ex) {
+                log.warn("Drop sharding_db failed, maybe it's not exist. error 
msg={}", ex.getMessage());
+            }
+        }
+        String addSourceResource = 
migrationDistSQLCommand.getAddMigrationSourceResourceTemplate().replace("${user}",
 getUsername())
+                .replace("${password}", getPassword())
+                .replace("${ds0}", getActualJdbcUrlTemplate(DS_0, true));
+        addResource(addSourceResource);
+    }
+    
+    protected void addMigrationTargetResource() throws SQLException {
+        String addTargetResource = 
migrationDistSQLCommand.getAddMigrationTargetResourceTemplate().replace("${user}",
 getUsername())
+                .replace("${password}", getPassword())
+                .replace("${ds2}", getActualJdbcUrlTemplate(DS_2, true))
+                .replace("${ds3}", getActualJdbcUrlTemplate(DS_3, true))
+                .replace("${ds4}", getActualJdbcUrlTemplate(DS_4, true));
+        addResource(addTargetResource);
+        List<Map<String, Object>> resources = queryForListWithLog("SHOW 
DATABASE RESOURCES from sharding_db");
+        assertThat(resources.size(), is(3));
+    }
+    
+    protected void createTargetOrderTableRule() throws SQLException {
+        
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableRule(), 2);
+    }
+    
+    protected void createTargetOrderTableEncryptRule() throws SQLException {
+        
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableEncryptRule(),
 2);
+    }
+    
+    protected void createTargetOrderItemTableRule() throws SQLException {
+        
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderItemTableRule(),
 2);
+    }
+    
+    protected void startMigrationOrderCopy(final boolean withSchema) throws 
SQLException {
+        if (withSchema) {
+            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderCopySingleTableWithSchema(),
 1);
+        } else {
+            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderCopySingleTable(), 
1);
+        }
+    }
+    
+    protected void startMigrationOrder() throws SQLException {
+        
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderSingleTable(), 1);
+    }
+    
+    protected void startMigrationOrderItem(final boolean withSchema) throws 
SQLException {
+        if (withSchema) {
+            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTableWithSchema(),
 1);
+        } else {
+            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTable(), 
1);
+        }
+    }
+    
+    protected void addMigrationProcessConfig() throws SQLException {
+        try {
+            
proxyExecuteWithLog(migrationDistSQLCommand.getAddMigrationProcessConfig(), 0);
+        } catch (final SQLException ex) {
+            if ("58000".equals(ex.getSQLState()) || 
"42000".equals(ex.getSQLState())) {
+                log.warn(ex.getMessage());
+                return;
+            }
+            throw ex;
+        }
+    }
+    
+    protected void stopMigrationByJobId(final String jobId) throws 
SQLException {
+        proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 1);
+    }
+    
+    protected void startMigrationByJobId(final String jobId) throws 
SQLException {
+        proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 1);
+    }
+    
+    protected void commitMigrationByJobId(final String jobId) throws 
SQLException {
+        proxyExecuteWithLog(String.format("COMMIT MIGRATION '%s'", jobId), 1);
+    }
+    
+    protected List<String> listJobId() {
+        List<Map<String, Object>> jobList = queryForListWithLog("SHOW 
MIGRATION LIST");
+        return jobList.stream().map(a -> 
a.get("id").toString()).collect(Collectors.toList());
+    }
+    
+    protected String getJobIdByTableName(final String tableName) {
+        List<Map<String, Object>> jobList = queryForListWithLog("SHOW 
MIGRATION LIST");
+        return jobList.stream().filter(a -> 
a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new 
RuntimeException("not find " + tableName + " table")).get("id").toString();
+    }
+    
+    @SneakyThrows(InterruptedException.class)
+    protected void waitMigrationFinished(final String jobId) {
+        if (null != getIncreaseTaskThread()) {
+            TimeUnit.SECONDS.timedJoin(getIncreaseTaskThread(), 60);
+        }
+        log.info("jobId: {}", jobId);
+        Set<String> actualStatus;
+        for (int i = 0; i < 10; i++) {
+            List<Map<String, Object>> showJobStatusResult = 
showJobStatus(jobId);
+            log.info("show migration status result: {}", showJobStatusResult);
+            actualStatus = showJobStatusResult.stream().map(each -> 
each.get("status").toString()).collect(Collectors.toSet());
+            assertFalse(CollectionUtils.containsAny(actualStatus, 
Arrays.asList(JobStatus.PREPARING_FAILURE.name(), 
JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
+                    JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+            if (actualStatus.size() == 1 && 
actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
+                break;
+            } else if (actualStatus.size() >= 1 && 
actualStatus.containsAll(new HashSet<>(Arrays.asList("", 
JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {
+                log.warn("one of the shardingItem was not started correctly");
+            }
+            ThreadUtil.sleep(3, TimeUnit.SECONDS);
+        }
+    }
+    
+    protected List<Map<String, Object>> showJobStatus(final String jobId) {
+        return queryForListWithLog(String.format("SHOW MIGRATION STATUS '%s'", 
jobId));
+    }
+    
+    protected void assertCheckMigrationSuccess(final String jobId) {
+        for (int i = 0; i < 10; i++) {
+            if (checkJobIncrementTaskFinished(jobId)) {
+                break;
+            }
+            ThreadUtil.sleep(3, TimeUnit.SECONDS);
+        }
+        boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
+        log.info("second check job result: {}", secondCheckJobResult);
+        List<Map<String, Object>> checkJobResults = 
queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE 
(NAME='DATA_MATCH')", jobId));
+        log.info("check job results: {}", checkJobResults);
+        for (Map<String, Object> entry : checkJobResults) {
+            
assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
+        }
+    }
+    
+    protected boolean checkJobIncrementTaskFinished(final String jobId) {
+        List<Map<String, Object>> listJobStatus = showJobStatus(jobId);
+        log.info("list job status result: {}", listJobStatus);
+        for (Map<String, Object> entry : listJobStatus) {
+            if 
(!JobStatus.EXECUTE_INCREMENTAL_TASK.name().equalsIgnoreCase(entry.get("status").toString()))
 {
+                return false;
+            }
+            int incrementalIdleSeconds = 
Integer.parseInt(entry.get("incremental_idle_seconds").toString());
+            if (incrementalIdleSeconds < 3) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
deleted file mode 100644
index 0f260ebe5a2..00000000000
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.integration.data.pipeline.cases.base;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.integration.data.pipeline.cases.command.ExtraSQLCommand;
-import 
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
-import 
org.apache.shardingsphere.test.integration.env.container.atomic.util.DatabaseTypeUtil;
-
-import javax.xml.bind.JAXB;
-import java.sql.SQLException;
-import java.util.Objects;
-
-@Slf4j
-public abstract class BaseExtraSQLITCase extends BaseITCase {
-    
-    @Getter
-    private final ExtraSQLCommand extraSQLCommand;
-    
-    public BaseExtraSQLITCase(final ScalingParameterized parameterized) {
-        super(parameterized);
-        extraSQLCommand = 
JAXB.unmarshal(Objects.requireNonNull(BaseExtraSQLITCase.class.getClassLoader().getResource(parameterized.getScenario())),
 ExtraSQLCommand.class);
-    }
-    
-    protected void createSourceOrderTable() throws SQLException {
-        sourceExecuteWithLog(extraSQLCommand.getCreateTableOrder());
-    }
-    
-    protected void createSourceTableIndexList(final String schema) throws 
SQLException {
-        if (DatabaseTypeUtil.isPostgreSQL(getDatabaseType())) {
-            sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS 
idx_user_id ON %s.t_order_copy ( user_id )", schema));
-        } else if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
-            sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON 
%s.t_order_copy ( user_id )", schema));
-        }
-    }
-    
-    protected void createSourceCommentOnList(final String schema) throws 
SQLException {
-        sourceExecuteWithLog(String.format("COMMENT ON COLUMN 
%s.t_order_copy.user_id IS 'user id'", schema));
-    }
-    
-    protected void createSourceOrderItemTable() throws SQLException {
-        sourceExecuteWithLog(extraSQLCommand.getCreateTableOrderItem());
-    }
-}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index d7b2acea72b..634fa89de25 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -21,18 +21,15 @@ import com.zaxxer.hikari.HikariDataSource;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import 
org.apache.shardingsphere.integration.data.pipeline.cases.command.MigrationDistSQLCommand;
+import 
org.apache.shardingsphere.integration.data.pipeline.cases.command.ExtraSQLCommand;
 import 
org.apache.shardingsphere.integration.data.pipeline.env.IntegrationTestEnvironment;
-import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
+import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.BaseComposedContainer;
-import 
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.MigrationComposedContainer;
+import 
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.DockerComposedContainer;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.NativeComposedContainer;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.watcher.ScalingWatcher;
@@ -53,19 +50,13 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 @Slf4j
@@ -76,8 +67,12 @@ public abstract class BaseITCase {
     
     protected static final String SCHEMA_NAME = "test";
     
+    protected static final String PROXY_DATABASE = "sharding_db";
+    
     protected static final String DS_0 = "scaling_it_0";
     
+    protected static final String DS_1 = "scaling_it_1";
+    
     protected static final String DS_2 = "scaling_it_2";
     
     protected static final String DS_3 = "scaling_it_3";
@@ -94,7 +89,7 @@ public abstract class BaseITCase {
     
     private final BaseComposedContainer composedContainer;
     
-    private final MigrationDistSQLCommand migrationDistSQLCommand;
+    private final ExtraSQLCommand extraSQLCommand;
     
     private final DatabaseType databaseType;
     
@@ -111,14 +106,14 @@ public abstract class BaseITCase {
     
     public BaseITCase(final ScalingParameterized parameterized) {
         databaseType = parameterized.getDatabaseType();
-        if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
-            composedContainer = new 
MigrationComposedContainer(parameterized.getDatabaseType(), 
parameterized.getDockerImageName());
+        if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
+            composedContainer = new 
DockerComposedContainer(parameterized.getDatabaseType(), 
parameterized.getDockerImageName());
         } else {
             composedContainer = new 
NativeComposedContainer(parameterized.getDatabaseType());
         }
         composedContainer.start();
-        if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
-            DockerStorageContainer storageContainer = 
((MigrationComposedContainer) composedContainer).getStorageContainer();
+        if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
+            DockerStorageContainer storageContainer = 
((DockerComposedContainer) composedContainer).getStorageContainer();
             username = storageContainer.getUsername();
             password = storageContainer.getUnifiedPassword();
         } else {
@@ -126,10 +121,10 @@ public abstract class BaseITCase {
             password = ENV.getActualDataSourcePassword(databaseType);
         }
         createProxyDatabase(parameterized.getDatabaseType());
-        if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
+        if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
             cleanUpDataSource();
         }
-        migrationDistSQLCommand = 
JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource("env/common/command.xml")),
 MigrationDistSQLCommand.class);
+        extraSQLCommand = 
JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource(parameterized.getScenario())),
 ExtraSQLCommand.class);
         scalingWatcher = new ScalingWatcher(composedContainer);
     }
     
@@ -146,19 +141,19 @@ public abstract class BaseITCase {
         }
         String jdbcUrl = 
composedContainer.getProxyJdbcUrl(defaultDatabaseName);
         try (Connection connection = DriverManager.getConnection(jdbcUrl, 
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
-            if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
+            if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
                 try {
-                    connectionExecuteWithLog(connection, "DROP DATABASE 
sharding_db");
+                    connectionExecuteWithLog(connection, String.format("DROP 
DATABASE %s", PROXY_DATABASE));
                 } catch (final SQLException ex) {
-                    log.warn("Drop sharding_db failed, maybe it's not exist. 
error msg={}", ex.getMessage());
+                    log.warn("Drop proxy database failed, maybe it's not 
exist. error msg={}", ex.getMessage());
                 }
             }
-            connectionExecuteWithLog(connection, "CREATE DATABASE 
sharding_db");
+            connectionExecuteWithLog(connection, String.format("CREATE 
DATABASE %s", PROXY_DATABASE));
         } catch (final SQLException ex) {
             throw new IllegalStateException(ex);
         }
         sourceDataSource = getDataSource(getActualJdbcUrlTemplate(DS_0, 
false), username, password);
-        proxyDataSource = 
getDataSource(composedContainer.getProxyJdbcUrl("sharding_db"), 
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
+        proxyDataSource = 
getDataSource(composedContainer.getProxyJdbcUrl(PROXY_DATABASE), 
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
     }
     
     private DataSource getDataSource(final String jdbcUrl, final String 
username, final String password) {
@@ -172,42 +167,13 @@ public abstract class BaseITCase {
         return result;
     }
     
-    @SneakyThrows(SQLException.class)
-    protected void addSourceResource() {
-        try (Connection connection = 
DriverManager.getConnection(getComposedContainer().getProxyJdbcUrl("sharding_db"),
 ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
-            addSourceResource0(connection);
-        }
+    protected void addResource(final String distSQL) throws SQLException {
+        proxyExecuteWithLog(distSQL, 2);
     }
     
-    @SneakyThrows(SQLException.class)
-    private void addSourceResource0(final Connection connection) {
-        if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
-            try {
-                connectionExecuteWithLog(connection, "DROP MIGRATION SOURCE 
RESOURCE ds_0");
-            } catch (final SQLException ex) {
-                log.warn("Drop sharding_db failed, maybe it's not exist. error 
msg={}", ex.getMessage());
-            }
-        }
-        String addSourceResource = 
migrationDistSQLCommand.getAddMigrationSourceResourceTemplate().replace("${user}",
 username)
-                .replace("${password}", password)
-                .replace("${ds0}", getActualJdbcUrlTemplate(DS_0, true));
-        connectionExecuteWithLog(connection, addSourceResource);
-    }
-    
-    protected void addTargetResource() throws SQLException {
-        String addTargetResource = 
migrationDistSQLCommand.getAddMigrationTargetResourceTemplate().replace("${user}",
 username)
-                .replace("${password}", password)
-                .replace("${ds2}", getActualJdbcUrlTemplate(DS_2, true))
-                .replace("${ds3}", getActualJdbcUrlTemplate(DS_3, true))
-                .replace("${ds4}", getActualJdbcUrlTemplate(DS_4, true));
-        proxyExecuteWithLog(addTargetResource, 2);
-        List<Map<String, Object>> resources = queryForListWithLog("SHOW 
DATABASE RESOURCES from sharding_db");
-        assertThat(resources.size(), is(3));
-    }
-    
-    private String getActualJdbcUrlTemplate(final String databaseName, final 
boolean isInContainer) {
-        if (ScalingITEnvTypeEnum.DOCKER == ENV.getItEnvType()) {
-            DockerStorageContainer storageContainer = 
((MigrationComposedContainer) composedContainer).getStorageContainer();
+    protected String getActualJdbcUrlTemplate(final String databaseName, final 
boolean isInContainer) {
+        if (ITEnvTypeEnum.DOCKER == ENV.getItEnvType()) {
+            DockerStorageContainer storageContainer = 
((DockerComposedContainer) composedContainer).getStorageContainer();
             if (isInContainer) {
                 return DataSourceEnvironment.getURL(getDatabaseType(), 
getDatabaseType().getType().toLowerCase() + ".host", 
storageContainer.getPort(), databaseName);
             } else {
@@ -217,48 +183,24 @@ public abstract class BaseITCase {
         return DataSourceEnvironment.getURL(getDatabaseType(), "127.0.0.1", 
ENV.getActualDataSourceDefaultPort(databaseType), databaseName);
     }
     
-    protected void createTargetOrderTableRule() throws SQLException {
-        
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableRule(), 2);
-    }
-    
-    protected void createTargetOrderTableEncryptRule() throws SQLException {
-        
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableEncryptRule(),
 2);
+    protected void createSourceOrderTable() throws SQLException {
+        sourceExecuteWithLog(extraSQLCommand.getCreateTableOrder());
     }
     
-    protected void createTargetOrderItemTableRule() throws SQLException {
-        
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderItemTableRule(),
 2);
-    }
-    
-    protected void startMigrationOrderCopy(final boolean withSchema) throws 
SQLException {
-        if (withSchema) {
-            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderCopySingleTableWithSchema(),
 1);
-        } else {
-            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderCopySingleTable(), 
1);
+    protected void createSourceTableIndexList(final String schema) throws 
SQLException {
+        if (DatabaseTypeUtil.isPostgreSQL(getDatabaseType())) {
+            sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS 
idx_user_id ON %s.t_order_copy ( user_id )", schema));
+        } else if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
+            sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON 
%s.t_order_copy ( user_id )", schema));
         }
     }
     
-    protected void startMigrationOrder() throws SQLException {
-        
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderSingleTable(), 1);
+    protected void createSourceCommentOnList(final String schema) throws 
SQLException {
+        sourceExecuteWithLog(String.format("COMMENT ON COLUMN 
%s.t_order_copy.user_id IS 'user id'", schema));
     }
     
-    protected void startMigrationOrderItem(final boolean withSchema) throws 
SQLException {
-        if (withSchema) {
-            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTableWithSchema(),
 1);
-        } else {
-            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTable(), 
1);
-        }
-    }
-    
-    protected void addMigrationProcessConfig() throws SQLException {
-        try {
-            
proxyExecuteWithLog(migrationDistSQLCommand.getAddMigrationProcessConfig(), 0);
-        } catch (final SQLException ex) {
-            if ("58000".equals(ex.getSQLState()) || 
"42000".equals(ex.getSQLState())) {
-                log.warn(ex.getMessage());
-                return;
-            }
-            throw ex;
-        }
+    protected void createSourceOrderItemTable() throws SQLException {
+        sourceExecuteWithLog(extraSQLCommand.getCreateTableOrderItem());
     }
     
     protected void createSourceSchema(final String schemaName) throws 
SQLException {
@@ -336,90 +278,10 @@ public abstract class BaseITCase {
         getIncreaseTaskThread().start();
     }
     
-    protected void stopMigrationByJobId(final String jobId) throws 
SQLException {
-        proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 1);
-    }
-    
-    // TODO reopen later
-    protected void startMigrationByJobId(final String jobId) throws 
SQLException {
-        proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 1);
-    }
-    
-    protected void commitMigrationByJobId(final String jobId) throws 
SQLException {
-        proxyExecuteWithLog(String.format("COMMIT MIGRATION '%s'", jobId), 1);
-    }
-    
-    protected List<String> listJobId() {
-        List<Map<String, Object>> jobList = queryForListWithLog("SHOW 
MIGRATION LIST");
-        return jobList.stream().map(a -> 
a.get("id").toString()).collect(Collectors.toList());
-    }
-    
-    protected String getJobIdByTableName(final String tableName) {
-        List<Map<String, Object>> jobList = queryForListWithLog("SHOW 
MIGRATION LIST");
-        return jobList.stream().filter(a -> 
a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new 
RuntimeException("not find " + tableName + " table")).get("id").toString();
-    }
-    
-    @SneakyThrows(InterruptedException.class)
-    protected void waitMigrationFinished(final String jobId) {
-        if (null != increaseTaskThread) {
-            TimeUnit.SECONDS.timedJoin(increaseTaskThread, 60);
-        }
-        log.info("jobId: {}", jobId);
-        Set<String> actualStatus;
-        for (int i = 0; i < 10; i++) {
-            List<Map<String, Object>> showScalingStatusResult = 
showScalingStatus(jobId);
-            log.info("show migration status result: {}", 
showScalingStatusResult);
-            actualStatus = showScalingStatusResult.stream().map(each -> 
each.get("status").toString()).collect(Collectors.toSet());
-            assertFalse(CollectionUtils.containsAny(actualStatus, 
Arrays.asList(JobStatus.PREPARING_FAILURE.name(), 
JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
-                    JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
-            if (actualStatus.size() == 1 && 
actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
-                break;
-            } else if (actualStatus.size() >= 1 && 
actualStatus.containsAll(new HashSet<>(Arrays.asList("", 
JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {
-                log.warn("one of the shardingItem was not started correctly");
-            }
-            ThreadUtil.sleep(3, TimeUnit.SECONDS);
-        }
-    }
-    
     protected void assertGreaterThanOrderTableInitRows(final int 
tableInitRows, final String schema) throws SQLException {
         proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
         String countSQL = StringUtils.isBlank(schema) ? "SELECT COUNT(*) as 
count FROM t_order" : String.format("SELECT COUNT(*) as count FROM %s.t_order", 
schema);
         Map<String, Object> actual = queryForListWithLog(countSQL).get(0);
         assertTrue("actual count " + actual.get("count"), 
Integer.parseInt(actual.get("count").toString()) > tableInitRows);
     }
-    
-    protected List<Map<String, Object>> showScalingStatus(final String jobId) {
-        return queryForListWithLog(String.format("SHOW MIGRATION STATUS '%s'", 
jobId));
-    }
-    
-    protected void assertCheckMigrationSuccess(final String jobId) {
-        for (int i = 0; i < 10; i++) {
-            if (checkJobIncrementTaskFinished(jobId)) {
-                break;
-            }
-            ThreadUtil.sleep(3, TimeUnit.SECONDS);
-        }
-        boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
-        log.info("second check job result: {}", secondCheckJobResult);
-        List<Map<String, Object>> checkScalingResults = 
queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE 
(NAME='DATA_MATCH')", jobId));
-        log.info("checkScalingResults: {}", checkScalingResults);
-        for (Map<String, Object> entry : checkScalingResults) {
-            
assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
-        }
-    }
-    
-    private boolean checkJobIncrementTaskFinished(final String jobId) {
-        List<Map<String, Object>> listScalingStatus = showScalingStatus(jobId);
-        log.info("listScalingStatus result: {}", listScalingStatus);
-        for (Map<String, Object> entry : listScalingStatus) {
-            if 
(JobStatus.EXECUTE_INCREMENTAL_TASK.name().equalsIgnoreCase(entry.get("status").toString()))
 {
-                return false;
-            }
-            int incrementalIdleSeconds = 
Integer.parseInt(entry.get("incremental_idle_seconds").toString());
-            if (incrementalIdleSeconds < 3) {
-                return false;
-            }
-        }
-        return true;
-    }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/CreateTableSQLGeneratorIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/CreateTableSQLGeneratorIT.java
index 6d0c0dddb0b..98cbe05e557 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/CreateTableSQLGeneratorIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/CreateTableSQLGeneratorIT.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.integration.data.pipeline.cases.entity.CreateTa
 import 
org.apache.shardingsphere.integration.data.pipeline.cases.entity.CreateTableSQLGeneratorAssertionsRootEntity;
 import 
org.apache.shardingsphere.integration.data.pipeline.cases.entity.CreateTableSQLGeneratorOutputEntity;
 import 
org.apache.shardingsphere.integration.data.pipeline.env.IntegrationTestEnvironment;
-import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
+import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
 import 
org.apache.shardingsphere.test.integration.env.container.atomic.storage.DockerStorageContainer;
 import 
org.apache.shardingsphere.test.integration.env.container.atomic.storage.StorageContainerFactory;
@@ -87,7 +87,7 @@ public final class CreateTableSQLGeneratorIT {
     @Parameters(name = "{0}")
     public static Collection<ScalingParameterized> getParameters() {
         Collection<ScalingParameterized> result = new LinkedList<>();
-        if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NONE) {
+        if (ENV.getItEnvType() == ITEnvTypeEnum.NONE) {
             return result;
         }
         for (String each : ENV.getPostgresVersions()) {
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
index 9e78f363a47..4912720ffdf 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
@@ -21,9 +21,9 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import 
org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseExtraSQLITCase;
+import 
org.apache.shardingsphere.integration.data.pipeline.cases.base.AbstractMigrationITCase;
 import 
org.apache.shardingsphere.integration.data.pipeline.cases.task.MySQLIncrementTask;
-import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
+import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
 import 
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
@@ -46,7 +46,7 @@ import static org.junit.Assert.assertThat;
  */
 @Slf4j
 @RunWith(Parameterized.class)
-public final class MySQLMigrationGeneralIT extends BaseExtraSQLITCase {
+public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
     
     private final ScalingParameterized parameterized;
     
@@ -59,7 +59,7 @@ public final class MySQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
     @Parameters(name = "{0}")
     public static Collection<ScalingParameterized> getParameters() {
         Collection<ScalingParameterized> result = new LinkedList<>();
-        if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NONE) {
+        if (ENV.getItEnvType() == ITEnvTypeEnum.NONE) {
             return result;
         }
         MySQLDatabaseType databaseType = new MySQLDatabaseType();
@@ -75,8 +75,8 @@ public final class MySQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
         addMigrationProcessConfig();
         createSourceOrderTable();
         createSourceOrderItemTable();
-        addSourceResource();
-        addTargetResource();
+        addMigrationSourceResource();
+        addMigrationTargetResource();
         createTargetOrderTableRule();
         createTargetOrderTableEncryptRule();
         createTargetOrderItemTableRule();
@@ -94,7 +94,7 @@ public final class MySQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
         String orderItemJobId = getJobIdByTableName("t_order_item");
         assertMigrationSuccessById(orderJobId);
         assertMigrationSuccessById(orderItemJobId);
-        if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
+        if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
             for (String each : listJobId()) {
                 commitMigrationByJobId(each);
             }
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
index 38f9e6295fc..eb2a6c49c2b 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
@@ -22,9 +22,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import 
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
-import 
org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseExtraSQLITCase;
+import 
org.apache.shardingsphere.integration.data.pipeline.cases.base.AbstractMigrationITCase;
 import 
org.apache.shardingsphere.integration.data.pipeline.cases.task.PostgreSQLIncrementTask;
-import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
+import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
 import 
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
@@ -47,7 +47,7 @@ import static org.junit.Assert.assertThat;
  */
 @Slf4j
 @RunWith(Parameterized.class)
-public final class PostgreSQLMigrationGeneralIT extends BaseExtraSQLITCase {
+public final class PostgreSQLMigrationGeneralIT extends 
AbstractMigrationITCase {
     
     private static final SnowflakeKeyGenerateAlgorithm KEY_GENERATE_ALGORITHM 
= new SnowflakeKeyGenerateAlgorithm();
     
@@ -62,7 +62,7 @@ public final class PostgreSQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
     @Parameters(name = "{0}")
     public static Collection<ScalingParameterized> getParameters() {
         Collection<ScalingParameterized> result = new LinkedList<>();
-        if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NONE) {
+        if (ENV.getItEnvType() == ITEnvTypeEnum.NONE) {
             return result;
         }
         for (String dockerImageName : ENV.listDatabaseDockerImageNames(new 
PostgreSQLDatabaseType())) {
@@ -83,8 +83,8 @@ public final class PostgreSQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
         createSourceOrderItemTable();
         createSourceTableIndexList(SCHEMA_NAME);
         createSourceCommentOnList(SCHEMA_NAME);
-        addSourceResource();
-        addTargetResource();
+        addMigrationSourceResource();
+        addMigrationTargetResource();
         createTargetOrderTableRule();
         createTargetOrderItemTableRule();
         Pair<List<Object[]>, List<Object[]>> dataPair = 
ScalingCaseHelper.generateFullInsertData(KEY_GENERATE_ALGORITHM, 
parameterized.getDatabaseType(), TABLE_INIT_ROW_COUNT);
@@ -93,7 +93,7 @@ public final class PostgreSQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
         
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), 
dataPair.getRight());
         checkOrderMigration(jdbcTemplate);
         checkOrderItemMigration();
-        if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
+        if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
             for (String each : listJobId()) {
                 commitMigrationByJobId(each);
             }
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
index 8f747150a91..c800bcd3cff 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
@@ -21,8 +21,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import 
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
-import 
org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseExtraSQLITCase;
-import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
+import 
org.apache.shardingsphere.integration.data.pipeline.cases.base.AbstractMigrationITCase;
+import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
 import 
org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.junit.Test;
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertThat;
 
 @RunWith(Parameterized.class)
 @Slf4j
-public class TextPrimaryKeyMigrationIT extends BaseExtraSQLITCase {
+public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
     
     public TextPrimaryKeyMigrationIT(final ScalingParameterized parameterized) 
{
         super(parameterized);
@@ -53,7 +53,7 @@ public class TextPrimaryKeyMigrationIT extends 
BaseExtraSQLITCase {
     @Parameters(name = "{0}")
     public static Collection<ScalingParameterized> getParameters() {
         Collection<ScalingParameterized> result = new LinkedList<>();
-        if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NONE) {
+        if (ENV.getItEnvType() == ITEnvTypeEnum.NONE) {
             return result;
         }
         for (String version : ENV.listDatabaseDockerImageNames(new 
MySQLDatabaseType())) {
@@ -73,15 +73,15 @@ public class TextPrimaryKeyMigrationIT extends 
BaseExtraSQLITCase {
         createSourceOrderTable();
         batchInsertOrder();
         addMigrationProcessConfig();
-        addSourceResource();
-        addTargetResource();
+        addMigrationSourceResource();
+        addMigrationTargetResource();
         createTargetOrderTableRule();
         startMigrationOrder();
         String jobId = listJobId().get(0);
         waitMigrationFinished(jobId);
         stopMigrationByJobId(jobId);
         assertCheckMigrationSuccess(jobId);
-        if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
+        if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
             commitMigrationByJobId(jobId);
             List<String> lastJobIds = listJobId();
             assertThat(lastJobIds.size(), is(0));
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
index 869aff8b163..508df8f0b03 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
@@ -22,7 +22,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
+import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -40,7 +40,7 @@ public final class IntegrationTestEnvironment {
     
     private final Properties props;
     
-    private final ScalingITEnvTypeEnum itEnvType;
+    private final ITEnvTypeEnum itEnvType;
     
     private final List<String> mysqlVersions;
     
@@ -50,7 +50,7 @@ public final class IntegrationTestEnvironment {
     
     private IntegrationTestEnvironment() {
         props = loadProperties();
-        itEnvType = ScalingITEnvTypeEnum.valueOf(StringUtils.defaultIfBlank(props.getProperty("scaling.it.env.type").toUpperCase(), ScalingITEnvTypeEnum.NONE.name()));
+        itEnvType = ITEnvTypeEnum.valueOf(StringUtils.defaultIfBlank(props.getProperty("scaling.it.env.type").toUpperCase(), ITEnvTypeEnum.NONE.name()));
         mysqlVersions = 
Arrays.stream(props.getOrDefault("scaling.it.docker.mysql.version", 
"").toString().split(",")).filter(StringUtils::isNotBlank).collect(Collectors.toList());
         postgresVersions = 
Arrays.stream(props.getOrDefault("scaling.it.docker.postgresql.version", 
"").toString().split(",")).filter(StringUtils::isNotBlank).collect(Collectors.toList());
         openGaussVersions = 
Arrays.stream(props.getOrDefault("scaling.it.docker.opengauss.version", 
"").toString().split(",")).filter(StringUtils::isNotBlank).collect(Collectors.toList());
@@ -152,7 +152,7 @@ public final class IntegrationTestEnvironment {
      */
     public List<String> listDatabaseDockerImageNames(final DatabaseType 
databaseType) {
         // Native mode needn't use docker image, just return a list which 
contain one item
-        if (getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
+        if (getItEnvType() == ITEnvTypeEnum.NATIVE) {
             return 
databaseType.getType().equalsIgnoreCase(getNativeDatabaseType()) ? 
Collections.singletonList("") : Collections.emptyList();
         }
         switch (databaseType.getType()) {
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/enums/ScalingITEnvTypeEnum.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/enums/ITEnvTypeEnum.java
similarity index 96%
rename from 
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/enums/ScalingITEnvTypeEnum.java
rename to 
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/enums/ITEnvTypeEnum.java
index ab2b6da83c8..42ac83cabfa 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/enums/ScalingITEnvTypeEnum.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/enums/ITEnvTypeEnum.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.integration.data.pipeline.env.enums;
 
-public enum ScalingITEnvTypeEnum {
+public enum ITEnvTypeEnum {
     
     NONE, DOCKER, NATIVE
 }
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/MigrationComposedContainer.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
similarity index 95%
rename from 
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/MigrationComposedContainer.java
rename to 
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
index 95c505b9484..79cc4b7bc39 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/MigrationComposedContainer.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
@@ -33,7 +33,7 @@ import 
org.apache.shardingsphere.test.integration.env.runtime.DataSourceEnvironm
 /**
  * Composed container, include governance container and database container.
  */
-public final class MigrationComposedContainer extends BaseComposedContainer {
+public final class DockerComposedContainer extends BaseComposedContainer {
     
     private final DatabaseType databaseType;
     
@@ -45,7 +45,7 @@ public final class MigrationComposedContainer extends 
BaseComposedContainer {
     @Getter
     private final GovernanceContainer governanceContainer;
     
-    public MigrationComposedContainer(final DatabaseType databaseType, final 
String dockerImageName) {
+    public DockerComposedContainer(final DatabaseType databaseType, final 
String dockerImageName) {
         this.databaseType = databaseType;
         governanceContainer = getContainers().registerContainer(new 
ZookeeperContainer());
         storageContainer = 
getContainers().registerContainer((DockerStorageContainer) 
StorageContainerFactory.newInstance(databaseType, dockerImageName,
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
index a87773aad14..cdfb42db14e 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
@@ -21,7 +21,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.BaseComposedContainer;
-import 
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.MigrationComposedContainer;
+import 
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.DockerComposedContainer;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.NativeComposedContainer;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
@@ -50,11 +50,11 @@ public class ScalingWatcher extends TestWatcher {
     }
     
     private void outputZookeeperData() {
-        MigrationComposedContainer migrationComposedContainer = (MigrationComposedContainer) composedContainer;
-        DatabaseType databaseType = migrationComposedContainer.getStorageContainer().getDatabaseType();
+        DockerComposedContainer dockerComposedContainer = (DockerComposedContainer) composedContainer;
+        DatabaseType databaseType = dockerComposedContainer.getStorageContainer().getDatabaseType();
         String namespace = "it_db_" + databaseType.getType().toLowerCase();
         ClusterPersistRepositoryConfiguration config = new ClusterPersistRepositoryConfiguration("ZooKeeper", namespace,
-                migrationComposedContainer.getGovernanceContainer().getServerLists(), new Properties());
+                dockerComposedContainer.getGovernanceContainer().getServerLists(), new Properties());
         ClusterPersistRepository zookeeperRepository = new 
CuratorZookeeperRepository();
         zookeeperRepository.init(config);
         List<String> childrenKeys = zookeeperRepository.getChildrenKeys("/");

Reply via email to