This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ef0c7102c2a Improve migration IT (#20733)
ef0c7102c2a is described below
commit ef0c7102c2aeef35962c3a59ff8b3b60fa0262cd
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Sep 2 15:41:25 2022 +0800
Improve migration IT (#20733)
* Improve migration IT
* Improve
---
.../cases/base/AbstractMigrationITCase.java | 202 ++++++++++++++++++++
.../pipeline/cases/base/BaseExtraSQLITCase.java | 60 ------
.../data/pipeline/cases/base/BaseITCase.java | 208 ++++-----------------
.../cases/general/CreateTableSQLGeneratorIT.java | 4 +-
.../cases/general/MySQLMigrationGeneralIT.java | 14 +-
.../general/PostgreSQLMigrationGeneralIT.java | 14 +-
.../primarykey/TextPrimaryKeyMigrationIT.java | 14 +-
.../pipeline/env/IntegrationTestEnvironment.java | 8 +-
...calingITEnvTypeEnum.java => ITEnvTypeEnum.java} | 2 +-
...Container.java => DockerComposedContainer.java} | 4 +-
.../pipeline/framework/watcher/ScalingWatcher.java | 8 +-
11 files changed, 271 insertions(+), 267 deletions(-)
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/AbstractMigrationITCase.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/AbstractMigrationITCase.java
new file mode 100644
index 00000000000..9aeb09de62f
--- /dev/null
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/AbstractMigrationITCase.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.integration.data.pipeline.cases.base;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import
org.apache.shardingsphere.integration.data.pipeline.cases.command.MigrationDistSQLCommand;
+import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
+import
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
+
+import javax.xml.bind.JAXB;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+@Slf4j
+public abstract class AbstractMigrationITCase extends BaseITCase {
+
+ private final MigrationDistSQLCommand migrationDistSQLCommand;
+
+ public AbstractMigrationITCase(final ScalingParameterized parameterized) {
+ super(parameterized);
+ migrationDistSQLCommand =
JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource("env/common/command.xml")),
MigrationDistSQLCommand.class);
+ }
+
+ protected void addMigrationSourceResource() throws SQLException {
+ if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+ try {
+ proxyExecuteWithLog("DROP MIGRATION SOURCE RESOURCE ds_0", 2);
+ } catch (final SQLException ex) {
+ log.warn("Drop sharding_db failed, maybe it's not exist. error
msg={}", ex.getMessage());
+ }
+ }
+ String addSourceResource =
migrationDistSQLCommand.getAddMigrationSourceResourceTemplate().replace("${user}",
getUsername())
+ .replace("${password}", getPassword())
+ .replace("${ds0}", getActualJdbcUrlTemplate(DS_0, true));
+ addResource(addSourceResource);
+ }
+
+ protected void addMigrationTargetResource() throws SQLException {
+ String addTargetResource =
migrationDistSQLCommand.getAddMigrationTargetResourceTemplate().replace("${user}",
getUsername())
+ .replace("${password}", getPassword())
+ .replace("${ds2}", getActualJdbcUrlTemplate(DS_2, true))
+ .replace("${ds3}", getActualJdbcUrlTemplate(DS_3, true))
+ .replace("${ds4}", getActualJdbcUrlTemplate(DS_4, true));
+ addResource(addTargetResource);
+ List<Map<String, Object>> resources = queryForListWithLog("SHOW
DATABASE RESOURCES from sharding_db");
+ assertThat(resources.size(), is(3));
+ }
+
+ protected void createTargetOrderTableRule() throws SQLException {
+
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableRule(), 2);
+ }
+
+ protected void createTargetOrderTableEncryptRule() throws SQLException {
+
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableEncryptRule(),
2);
+ }
+
+ protected void createTargetOrderItemTableRule() throws SQLException {
+
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderItemTableRule(),
2);
+ }
+
+ protected void startMigrationOrderCopy(final boolean withSchema) throws
SQLException {
+ if (withSchema) {
+
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderCopySingleTableWithSchema(),
1);
+ } else {
+
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderCopySingleTable(),
1);
+ }
+ }
+
+ protected void startMigrationOrder() throws SQLException {
+
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderSingleTable(), 1);
+ }
+
+ protected void startMigrationOrderItem(final boolean withSchema) throws
SQLException {
+ if (withSchema) {
+
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTableWithSchema(),
1);
+ } else {
+
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTable(),
1);
+ }
+ }
+
+ protected void addMigrationProcessConfig() throws SQLException {
+ try {
+
proxyExecuteWithLog(migrationDistSQLCommand.getAddMigrationProcessConfig(), 0);
+ } catch (final SQLException ex) {
+ if ("58000".equals(ex.getSQLState()) ||
"42000".equals(ex.getSQLState())) {
+ log.warn(ex.getMessage());
+ return;
+ }
+ throw ex;
+ }
+ }
+
+ protected void stopMigrationByJobId(final String jobId) throws
SQLException {
+ proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 1);
+ }
+
+ protected void startMigrationByJobId(final String jobId) throws
SQLException {
+ proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 1);
+ }
+
+ protected void commitMigrationByJobId(final String jobId) throws
SQLException {
+ proxyExecuteWithLog(String.format("COMMIT MIGRATION '%s'", jobId), 1);
+ }
+
+ protected List<String> listJobId() {
+ List<Map<String, Object>> jobList = queryForListWithLog("SHOW
MIGRATION LIST");
+ return jobList.stream().map(a ->
a.get("id").toString()).collect(Collectors.toList());
+ }
+
+ protected String getJobIdByTableName(final String tableName) {
+ List<Map<String, Object>> jobList = queryForListWithLog("SHOW
MIGRATION LIST");
+ return jobList.stream().filter(a ->
a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new
RuntimeException("not find " + tableName + " table")).get("id").toString();
+ }
+
+ @SneakyThrows(InterruptedException.class)
+ protected void waitMigrationFinished(final String jobId) {
+ if (null != getIncreaseTaskThread()) {
+ TimeUnit.SECONDS.timedJoin(getIncreaseTaskThread(), 60);
+ }
+ log.info("jobId: {}", jobId);
+ Set<String> actualStatus;
+ for (int i = 0; i < 10; i++) {
+ List<Map<String, Object>> showJobStatusResult =
showJobStatus(jobId);
+ log.info("show migration status result: {}", showJobStatusResult);
+ actualStatus = showJobStatusResult.stream().map(each ->
each.get("status").toString()).collect(Collectors.toSet());
+ assertFalse(CollectionUtils.containsAny(actualStatus,
Arrays.asList(JobStatus.PREPARING_FAILURE.name(),
JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
+ JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+ if (actualStatus.size() == 1 &&
actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
+ break;
+ } else if (actualStatus.size() >= 1 &&
actualStatus.containsAll(new HashSet<>(Arrays.asList("",
JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {
+ log.warn("one of the shardingItem was not started correctly");
+ }
+ ThreadUtil.sleep(3, TimeUnit.SECONDS);
+ }
+ }
+
+ protected List<Map<String, Object>> showJobStatus(final String jobId) {
+ return queryForListWithLog(String.format("SHOW MIGRATION STATUS '%s'",
jobId));
+ }
+
+ protected void assertCheckMigrationSuccess(final String jobId) {
+ for (int i = 0; i < 10; i++) {
+ if (checkJobIncrementTaskFinished(jobId)) {
+ break;
+ }
+ ThreadUtil.sleep(3, TimeUnit.SECONDS);
+ }
+ boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
+ log.info("second check job result: {}", secondCheckJobResult);
+ List<Map<String, Object>> checkJobResults =
queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE
(NAME='DATA_MATCH')", jobId));
+ log.info("check job results: {}", checkJobResults);
+ for (Map<String, Object> entry : checkJobResults) {
+
assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
+ }
+ }
+
+ protected boolean checkJobIncrementTaskFinished(final String jobId) {
+ List<Map<String, Object>> listJobStatus = showJobStatus(jobId);
+ log.info("list job status result: {}", listJobStatus);
+ for (Map<String, Object> entry : listJobStatus) {
+ if
(!JobStatus.EXECUTE_INCREMENTAL_TASK.name().equalsIgnoreCase(entry.get("status").toString()))
{
+ return false;
+ }
+ int incrementalIdleSeconds =
Integer.parseInt(entry.get("incremental_idle_seconds").toString());
+ if (incrementalIdleSeconds < 3) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
deleted file mode 100644
index 0f260ebe5a2..00000000000
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.integration.data.pipeline.cases.base;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.integration.data.pipeline.cases.command.ExtraSQLCommand;
-import
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
-import
org.apache.shardingsphere.test.integration.env.container.atomic.util.DatabaseTypeUtil;
-
-import javax.xml.bind.JAXB;
-import java.sql.SQLException;
-import java.util.Objects;
-
-@Slf4j
-public abstract class BaseExtraSQLITCase extends BaseITCase {
-
- @Getter
- private final ExtraSQLCommand extraSQLCommand;
-
- public BaseExtraSQLITCase(final ScalingParameterized parameterized) {
- super(parameterized);
- extraSQLCommand =
JAXB.unmarshal(Objects.requireNonNull(BaseExtraSQLITCase.class.getClassLoader().getResource(parameterized.getScenario())),
ExtraSQLCommand.class);
- }
-
- protected void createSourceOrderTable() throws SQLException {
- sourceExecuteWithLog(extraSQLCommand.getCreateTableOrder());
- }
-
- protected void createSourceTableIndexList(final String schema) throws
SQLException {
- if (DatabaseTypeUtil.isPostgreSQL(getDatabaseType())) {
- sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS
idx_user_id ON %s.t_order_copy ( user_id )", schema));
- } else if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
- sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON
%s.t_order_copy ( user_id )", schema));
- }
- }
-
- protected void createSourceCommentOnList(final String schema) throws
SQLException {
- sourceExecuteWithLog(String.format("COMMENT ON COLUMN
%s.t_order_copy.user_id IS 'user id'", schema));
- }
-
- protected void createSourceOrderItemTable() throws SQLException {
- sourceExecuteWithLog(extraSQLCommand.getCreateTableOrderItem());
- }
-}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index d7b2acea72b..634fa89de25 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -21,18 +21,15 @@ import com.zaxxer.hikari.HikariDataSource;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
-import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import
org.apache.shardingsphere.integration.data.pipeline.cases.command.MigrationDistSQLCommand;
+import
org.apache.shardingsphere.integration.data.pipeline.cases.command.ExtraSQLCommand;
import
org.apache.shardingsphere.integration.data.pipeline.env.IntegrationTestEnvironment;
-import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
+import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
import
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.BaseComposedContainer;
-import
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.MigrationComposedContainer;
+import
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.DockerComposedContainer;
import
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.NativeComposedContainer;
import
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
import
org.apache.shardingsphere.integration.data.pipeline.framework.watcher.ScalingWatcher;
@@ -53,19 +50,13 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@Slf4j
@@ -76,8 +67,12 @@ public abstract class BaseITCase {
protected static final String SCHEMA_NAME = "test";
+ protected static final String PROXY_DATABASE = "sharding_db";
+
protected static final String DS_0 = "scaling_it_0";
+ protected static final String DS_1 = "scaling_it_1";
+
protected static final String DS_2 = "scaling_it_2";
protected static final String DS_3 = "scaling_it_3";
@@ -94,7 +89,7 @@ public abstract class BaseITCase {
private final BaseComposedContainer composedContainer;
- private final MigrationDistSQLCommand migrationDistSQLCommand;
+ private final ExtraSQLCommand extraSQLCommand;
private final DatabaseType databaseType;
@@ -111,14 +106,14 @@ public abstract class BaseITCase {
public BaseITCase(final ScalingParameterized parameterized) {
databaseType = parameterized.getDatabaseType();
- if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
- composedContainer = new
MigrationComposedContainer(parameterized.getDatabaseType(),
parameterized.getDockerImageName());
+ if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
+ composedContainer = new
DockerComposedContainer(parameterized.getDatabaseType(),
parameterized.getDockerImageName());
} else {
composedContainer = new
NativeComposedContainer(parameterized.getDatabaseType());
}
composedContainer.start();
- if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
- DockerStorageContainer storageContainer =
((MigrationComposedContainer) composedContainer).getStorageContainer();
+ if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
+ DockerStorageContainer storageContainer =
((DockerComposedContainer) composedContainer).getStorageContainer();
username = storageContainer.getUsername();
password = storageContainer.getUnifiedPassword();
} else {
@@ -126,10 +121,10 @@ public abstract class BaseITCase {
password = ENV.getActualDataSourcePassword(databaseType);
}
createProxyDatabase(parameterized.getDatabaseType());
- if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
+ if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
cleanUpDataSource();
}
- migrationDistSQLCommand =
JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource("env/common/command.xml")),
MigrationDistSQLCommand.class);
+ extraSQLCommand =
JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource(parameterized.getScenario())),
ExtraSQLCommand.class);
scalingWatcher = new ScalingWatcher(composedContainer);
}
@@ -146,19 +141,19 @@ public abstract class BaseITCase {
}
String jdbcUrl =
composedContainer.getProxyJdbcUrl(defaultDatabaseName);
try (Connection connection = DriverManager.getConnection(jdbcUrl,
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
- if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
+ if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
try {
- connectionExecuteWithLog(connection, "DROP DATABASE
sharding_db");
+ connectionExecuteWithLog(connection, String.format("DROP
DATABASE %s", PROXY_DATABASE));
} catch (final SQLException ex) {
- log.warn("Drop sharding_db failed, maybe it's not exist.
error msg={}", ex.getMessage());
+ log.warn("Drop proxy database failed, maybe it's not
exist. error msg={}", ex.getMessage());
}
}
- connectionExecuteWithLog(connection, "CREATE DATABASE
sharding_db");
+ connectionExecuteWithLog(connection, String.format("CREATE
DATABASE %s", PROXY_DATABASE));
} catch (final SQLException ex) {
throw new IllegalStateException(ex);
}
sourceDataSource = getDataSource(getActualJdbcUrlTemplate(DS_0,
false), username, password);
- proxyDataSource =
getDataSource(composedContainer.getProxyJdbcUrl("sharding_db"),
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
+ proxyDataSource =
getDataSource(composedContainer.getProxyJdbcUrl(PROXY_DATABASE),
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
}
private DataSource getDataSource(final String jdbcUrl, final String
username, final String password) {
@@ -172,42 +167,13 @@ public abstract class BaseITCase {
return result;
}
- @SneakyThrows(SQLException.class)
- protected void addSourceResource() {
- try (Connection connection =
DriverManager.getConnection(getComposedContainer().getProxyJdbcUrl("sharding_db"),
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
- addSourceResource0(connection);
- }
+ protected void addResource(final String distSQL) throws SQLException {
+ proxyExecuteWithLog(distSQL, 2);
}
- @SneakyThrows(SQLException.class)
- private void addSourceResource0(final Connection connection) {
- if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
- try {
- connectionExecuteWithLog(connection, "DROP MIGRATION SOURCE
RESOURCE ds_0");
- } catch (final SQLException ex) {
- log.warn("Drop sharding_db failed, maybe it's not exist. error
msg={}", ex.getMessage());
- }
- }
- String addSourceResource =
migrationDistSQLCommand.getAddMigrationSourceResourceTemplate().replace("${user}",
username)
- .replace("${password}", password)
- .replace("${ds0}", getActualJdbcUrlTemplate(DS_0, true));
- connectionExecuteWithLog(connection, addSourceResource);
- }
-
- protected void addTargetResource() throws SQLException {
- String addTargetResource =
migrationDistSQLCommand.getAddMigrationTargetResourceTemplate().replace("${user}",
username)
- .replace("${password}", password)
- .replace("${ds2}", getActualJdbcUrlTemplate(DS_2, true))
- .replace("${ds3}", getActualJdbcUrlTemplate(DS_3, true))
- .replace("${ds4}", getActualJdbcUrlTemplate(DS_4, true));
- proxyExecuteWithLog(addTargetResource, 2);
- List<Map<String, Object>> resources = queryForListWithLog("SHOW
DATABASE RESOURCES from sharding_db");
- assertThat(resources.size(), is(3));
- }
-
- private String getActualJdbcUrlTemplate(final String databaseName, final
boolean isInContainer) {
- if (ScalingITEnvTypeEnum.DOCKER == ENV.getItEnvType()) {
- DockerStorageContainer storageContainer =
((MigrationComposedContainer) composedContainer).getStorageContainer();
+ protected String getActualJdbcUrlTemplate(final String databaseName, final
boolean isInContainer) {
+ if (ITEnvTypeEnum.DOCKER == ENV.getItEnvType()) {
+ DockerStorageContainer storageContainer =
((DockerComposedContainer) composedContainer).getStorageContainer();
if (isInContainer) {
return DataSourceEnvironment.getURL(getDatabaseType(),
getDatabaseType().getType().toLowerCase() + ".host",
storageContainer.getPort(), databaseName);
} else {
@@ -217,48 +183,24 @@ public abstract class BaseITCase {
return DataSourceEnvironment.getURL(getDatabaseType(), "127.0.0.1",
ENV.getActualDataSourceDefaultPort(databaseType), databaseName);
}
- protected void createTargetOrderTableRule() throws SQLException {
-
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableRule(), 2);
- }
-
- protected void createTargetOrderTableEncryptRule() throws SQLException {
-
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableEncryptRule(),
2);
+ protected void createSourceOrderTable() throws SQLException {
+ sourceExecuteWithLog(extraSQLCommand.getCreateTableOrder());
}
- protected void createTargetOrderItemTableRule() throws SQLException {
-
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderItemTableRule(),
2);
- }
-
- protected void startMigrationOrderCopy(final boolean withSchema) throws
SQLException {
- if (withSchema) {
-
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderCopySingleTableWithSchema(),
1);
- } else {
-
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderCopySingleTable(),
1);
+ protected void createSourceTableIndexList(final String schema) throws
SQLException {
+ if (DatabaseTypeUtil.isPostgreSQL(getDatabaseType())) {
+ sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS
idx_user_id ON %s.t_order_copy ( user_id )", schema));
+ } else if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
+ sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON
%s.t_order_copy ( user_id )", schema));
}
}
- protected void startMigrationOrder() throws SQLException {
-
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderSingleTable(), 1);
+ protected void createSourceCommentOnList(final String schema) throws
SQLException {
+ sourceExecuteWithLog(String.format("COMMENT ON COLUMN
%s.t_order_copy.user_id IS 'user id'", schema));
}
- protected void startMigrationOrderItem(final boolean withSchema) throws
SQLException {
- if (withSchema) {
-
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTableWithSchema(),
1);
- } else {
-
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTable(),
1);
- }
- }
-
- protected void addMigrationProcessConfig() throws SQLException {
- try {
-
proxyExecuteWithLog(migrationDistSQLCommand.getAddMigrationProcessConfig(), 0);
- } catch (final SQLException ex) {
- if ("58000".equals(ex.getSQLState()) ||
"42000".equals(ex.getSQLState())) {
- log.warn(ex.getMessage());
- return;
- }
- throw ex;
- }
+ protected void createSourceOrderItemTable() throws SQLException {
+ sourceExecuteWithLog(extraSQLCommand.getCreateTableOrderItem());
}
protected void createSourceSchema(final String schemaName) throws
SQLException {
@@ -336,90 +278,10 @@ public abstract class BaseITCase {
getIncreaseTaskThread().start();
}
- protected void stopMigrationByJobId(final String jobId) throws
SQLException {
- proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 1);
- }
-
- // TODO reopen later
- protected void startMigrationByJobId(final String jobId) throws
SQLException {
- proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 1);
- }
-
- protected void commitMigrationByJobId(final String jobId) throws
SQLException {
- proxyExecuteWithLog(String.format("COMMIT MIGRATION '%s'", jobId), 1);
- }
-
- protected List<String> listJobId() {
- List<Map<String, Object>> jobList = queryForListWithLog("SHOW
MIGRATION LIST");
- return jobList.stream().map(a ->
a.get("id").toString()).collect(Collectors.toList());
- }
-
- protected String getJobIdByTableName(final String tableName) {
- List<Map<String, Object>> jobList = queryForListWithLog("SHOW
MIGRATION LIST");
- return jobList.stream().filter(a ->
a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new
RuntimeException("not find " + tableName + " table")).get("id").toString();
- }
-
- @SneakyThrows(InterruptedException.class)
- protected void waitMigrationFinished(final String jobId) {
- if (null != increaseTaskThread) {
- TimeUnit.SECONDS.timedJoin(increaseTaskThread, 60);
- }
- log.info("jobId: {}", jobId);
- Set<String> actualStatus;
- for (int i = 0; i < 10; i++) {
- List<Map<String, Object>> showScalingStatusResult =
showScalingStatus(jobId);
- log.info("show migration status result: {}",
showScalingStatusResult);
- actualStatus = showScalingStatusResult.stream().map(each ->
each.get("status").toString()).collect(Collectors.toSet());
- assertFalse(CollectionUtils.containsAny(actualStatus,
Arrays.asList(JobStatus.PREPARING_FAILURE.name(),
JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
- JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
- if (actualStatus.size() == 1 &&
actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
- break;
- } else if (actualStatus.size() >= 1 &&
actualStatus.containsAll(new HashSet<>(Arrays.asList("",
JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {
- log.warn("one of the shardingItem was not started correctly");
- }
- ThreadUtil.sleep(3, TimeUnit.SECONDS);
- }
- }
-
protected void assertGreaterThanOrderTableInitRows(final int
tableInitRows, final String schema) throws SQLException {
proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
String countSQL = StringUtils.isBlank(schema) ? "SELECT COUNT(*) as
count FROM t_order" : String.format("SELECT COUNT(*) as count FROM %s.t_order",
schema);
Map<String, Object> actual = queryForListWithLog(countSQL).get(0);
assertTrue("actual count " + actual.get("count"),
Integer.parseInt(actual.get("count").toString()) > tableInitRows);
}
-
- protected List<Map<String, Object>> showScalingStatus(final String jobId) {
- return queryForListWithLog(String.format("SHOW MIGRATION STATUS '%s'",
jobId));
- }
-
- protected void assertCheckMigrationSuccess(final String jobId) {
- for (int i = 0; i < 10; i++) {
- if (checkJobIncrementTaskFinished(jobId)) {
- break;
- }
- ThreadUtil.sleep(3, TimeUnit.SECONDS);
- }
- boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
- log.info("second check job result: {}", secondCheckJobResult);
- List<Map<String, Object>> checkScalingResults =
queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE
(NAME='DATA_MATCH')", jobId));
- log.info("checkScalingResults: {}", checkScalingResults);
- for (Map<String, Object> entry : checkScalingResults) {
-
assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
- }
- }
-
- private boolean checkJobIncrementTaskFinished(final String jobId) {
- List<Map<String, Object>> listScalingStatus = showScalingStatus(jobId);
- log.info("listScalingStatus result: {}", listScalingStatus);
- for (Map<String, Object> entry : listScalingStatus) {
- if
(JobStatus.EXECUTE_INCREMENTAL_TASK.name().equalsIgnoreCase(entry.get("status").toString()))
{
- return false;
- }
- int incrementalIdleSeconds =
Integer.parseInt(entry.get("incremental_idle_seconds").toString());
- if (incrementalIdleSeconds < 3) {
- return false;
- }
- }
- return true;
- }
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/CreateTableSQLGeneratorIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/CreateTableSQLGeneratorIT.java
index 6d0c0dddb0b..98cbe05e557 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/CreateTableSQLGeneratorIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/CreateTableSQLGeneratorIT.java
@@ -25,7 +25,7 @@ import
org.apache.shardingsphere.integration.data.pipeline.cases.entity.CreateTa
import
org.apache.shardingsphere.integration.data.pipeline.cases.entity.CreateTableSQLGeneratorAssertionsRootEntity;
import
org.apache.shardingsphere.integration.data.pipeline.cases.entity.CreateTableSQLGeneratorOutputEntity;
import
org.apache.shardingsphere.integration.data.pipeline.env.IntegrationTestEnvironment;
-import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
+import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
import
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
import
org.apache.shardingsphere.test.integration.env.container.atomic.storage.DockerStorageContainer;
import
org.apache.shardingsphere.test.integration.env.container.atomic.storage.StorageContainerFactory;
@@ -87,7 +87,7 @@ public final class CreateTableSQLGeneratorIT {
@Parameters(name = "{0}")
public static Collection<ScalingParameterized> getParameters() {
Collection<ScalingParameterized> result = new LinkedList<>();
- if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NONE) {
+ if (ENV.getItEnvType() == ITEnvTypeEnum.NONE) {
return result;
}
for (String each : ENV.getPostgresVersions()) {
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
index 9e78f363a47..4912720ffdf 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
@@ -21,9 +21,9 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import
org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseExtraSQLITCase;
+import
org.apache.shardingsphere.integration.data.pipeline.cases.base.AbstractMigrationITCase;
import
org.apache.shardingsphere.integration.data.pipeline.cases.task.MySQLIncrementTask;
-import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
+import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
import
org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
import
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
import
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
@@ -46,7 +46,7 @@ import static org.junit.Assert.assertThat;
*/
@Slf4j
@RunWith(Parameterized.class)
-public final class MySQLMigrationGeneralIT extends BaseExtraSQLITCase {
+public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
private final ScalingParameterized parameterized;
@@ -59,7 +59,7 @@ public final class MySQLMigrationGeneralIT extends
BaseExtraSQLITCase {
@Parameters(name = "{0}")
public static Collection<ScalingParameterized> getParameters() {
Collection<ScalingParameterized> result = new LinkedList<>();
- if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NONE) {
+ if (ENV.getItEnvType() == ITEnvTypeEnum.NONE) {
return result;
}
MySQLDatabaseType databaseType = new MySQLDatabaseType();
@@ -75,8 +75,8 @@ public final class MySQLMigrationGeneralIT extends
BaseExtraSQLITCase {
addMigrationProcessConfig();
createSourceOrderTable();
createSourceOrderItemTable();
- addSourceResource();
- addTargetResource();
+ addMigrationSourceResource();
+ addMigrationTargetResource();
createTargetOrderTableRule();
createTargetOrderTableEncryptRule();
createTargetOrderItemTableRule();
@@ -94,7 +94,7 @@ public final class MySQLMigrationGeneralIT extends
BaseExtraSQLITCase {
String orderItemJobId = getJobIdByTableName("t_order_item");
assertMigrationSuccessById(orderJobId);
assertMigrationSuccessById(orderItemJobId);
- if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
+ if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
for (String each : listJobId()) {
commitMigrationByJobId(each);
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
index 38f9e6295fc..eb2a6c49c2b 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
@@ -22,9 +22,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
-import
org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseExtraSQLITCase;
+import
org.apache.shardingsphere.integration.data.pipeline.cases.base.AbstractMigrationITCase;
import
org.apache.shardingsphere.integration.data.pipeline.cases.task.PostgreSQLIncrementTask;
-import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
+import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
import
org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
import
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
import
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
@@ -47,7 +47,7 @@ import static org.junit.Assert.assertThat;
*/
@Slf4j
@RunWith(Parameterized.class)
-public final class PostgreSQLMigrationGeneralIT extends BaseExtraSQLITCase {
+public final class PostgreSQLMigrationGeneralIT extends
AbstractMigrationITCase {
private static final SnowflakeKeyGenerateAlgorithm KEY_GENERATE_ALGORITHM
= new SnowflakeKeyGenerateAlgorithm();
@@ -62,7 +62,7 @@ public final class PostgreSQLMigrationGeneralIT extends
BaseExtraSQLITCase {
@Parameters(name = "{0}")
public static Collection<ScalingParameterized> getParameters() {
Collection<ScalingParameterized> result = new LinkedList<>();
- if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NONE) {
+ if (ENV.getItEnvType() == ITEnvTypeEnum.NONE) {
return result;
}
for (String dockerImageName : ENV.listDatabaseDockerImageNames(new
PostgreSQLDatabaseType())) {
@@ -83,8 +83,8 @@ public final class PostgreSQLMigrationGeneralIT extends
BaseExtraSQLITCase {
createSourceOrderItemTable();
createSourceTableIndexList(SCHEMA_NAME);
createSourceCommentOnList(SCHEMA_NAME);
- addSourceResource();
- addTargetResource();
+ addMigrationSourceResource();
+ addMigrationTargetResource();
createTargetOrderTableRule();
createTargetOrderItemTableRule();
Pair<List<Object[]>, List<Object[]>> dataPair =
ScalingCaseHelper.generateFullInsertData(KEY_GENERATE_ALGORITHM,
parameterized.getDatabaseType(), TABLE_INIT_ROW_COUNT);
@@ -93,7 +93,7 @@ public final class PostgreSQLMigrationGeneralIT extends
BaseExtraSQLITCase {
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(),
dataPair.getRight());
checkOrderMigration(jdbcTemplate);
checkOrderItemMigration();
- if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
+ if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
for (String each : listJobId()) {
commitMigrationByJobId(each);
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
index 8f747150a91..c800bcd3cff 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
@@ -21,8 +21,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
-import
org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseExtraSQLITCase;
-import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
+import
org.apache.shardingsphere.integration.data.pipeline.cases.base.AbstractMigrationITCase;
+import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
import
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
import
org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
import org.junit.Test;
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertThat;
@RunWith(Parameterized.class)
@Slf4j
-public class TextPrimaryKeyMigrationIT extends BaseExtraSQLITCase {
+public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
public TextPrimaryKeyMigrationIT(final ScalingParameterized parameterized)
{
super(parameterized);
@@ -53,7 +53,7 @@ public class TextPrimaryKeyMigrationIT extends
BaseExtraSQLITCase {
@Parameters(name = "{0}")
public static Collection<ScalingParameterized> getParameters() {
Collection<ScalingParameterized> result = new LinkedList<>();
- if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NONE) {
+ if (ENV.getItEnvType() == ITEnvTypeEnum.NONE) {
return result;
}
for (String version : ENV.listDatabaseDockerImageNames(new
MySQLDatabaseType())) {
@@ -73,15 +73,15 @@ public class TextPrimaryKeyMigrationIT extends
BaseExtraSQLITCase {
createSourceOrderTable();
batchInsertOrder();
addMigrationProcessConfig();
- addSourceResource();
- addTargetResource();
+ addMigrationSourceResource();
+ addMigrationTargetResource();
createTargetOrderTableRule();
startMigrationOrder();
String jobId = listJobId().get(0);
waitMigrationFinished(jobId);
stopMigrationByJobId(jobId);
assertCheckMigrationSuccess(jobId);
- if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
+ if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
commitMigrationByJobId(jobId);
List<String> lastJobIds = listJobId();
assertThat(lastJobIds.size(), is(0));
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
index 869aff8b163..508df8f0b03 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
@@ -22,7 +22,7 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
+import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
import java.io.IOException;
import java.io.InputStream;
@@ -40,7 +40,7 @@ public final class IntegrationTestEnvironment {
private final Properties props;
- private final ScalingITEnvTypeEnum itEnvType;
+ private final ITEnvTypeEnum itEnvType;
private final List<String> mysqlVersions;
@@ -50,7 +50,7 @@ public final class IntegrationTestEnvironment {
private IntegrationTestEnvironment() {
props = loadProperties();
- itEnvType =
ScalingITEnvTypeEnum.valueOf(StringUtils.defaultIfBlank(props.getProperty("scaling.it.env.type").toUpperCase(),
ScalingITEnvTypeEnum.NONE.name()));
+ itEnvType =
ITEnvTypeEnum.valueOf(StringUtils.defaultIfBlank(props.getProperty("scaling.it.env.type").toUpperCase(),
ITEnvTypeEnum.NONE.name()));
mysqlVersions =
Arrays.stream(props.getOrDefault("scaling.it.docker.mysql.version",
"").toString().split(",")).filter(StringUtils::isNotBlank).collect(Collectors.toList());
postgresVersions =
Arrays.stream(props.getOrDefault("scaling.it.docker.postgresql.version",
"").toString().split(",")).filter(StringUtils::isNotBlank).collect(Collectors.toList());
openGaussVersions =
Arrays.stream(props.getOrDefault("scaling.it.docker.opengauss.version",
"").toString().split(",")).filter(StringUtils::isNotBlank).collect(Collectors.toList());
@@ -152,7 +152,7 @@ public final class IntegrationTestEnvironment {
*/
public List<String> listDatabaseDockerImageNames(final DatabaseType
databaseType) {
// Native mode needn't use docker image, just return a list which
contain one item
- if (getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
+ if (getItEnvType() == ITEnvTypeEnum.NATIVE) {
return
databaseType.getType().equalsIgnoreCase(getNativeDatabaseType()) ?
Collections.singletonList("") : Collections.emptyList();
}
switch (databaseType.getType()) {
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/enums/ScalingITEnvTypeEnum.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/enums/ITEnvTypeEnum.java
similarity index 96%
rename from
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/enums/ScalingITEnvTypeEnum.java
rename to
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/enums/ITEnvTypeEnum.java
index ab2b6da83c8..42ac83cabfa 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/enums/ScalingITEnvTypeEnum.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/enums/ITEnvTypeEnum.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.integration.data.pipeline.env.enums;
-public enum ScalingITEnvTypeEnum {
+public enum ITEnvTypeEnum {
NONE, DOCKER, NATIVE
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/MigrationComposedContainer.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
similarity index 95%
rename from
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/MigrationComposedContainer.java
rename to
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
index 95c505b9484..79cc4b7bc39 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/MigrationComposedContainer.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
@@ -33,7 +33,7 @@ import
org.apache.shardingsphere.test.integration.env.runtime.DataSourceEnvironm
/**
* Composed container, include governance container and database container.
*/
-public final class MigrationComposedContainer extends BaseComposedContainer {
+public final class DockerComposedContainer extends BaseComposedContainer {
private final DatabaseType databaseType;
@@ -45,7 +45,7 @@ public final class MigrationComposedContainer extends
BaseComposedContainer {
@Getter
private final GovernanceContainer governanceContainer;
- public MigrationComposedContainer(final DatabaseType databaseType, final
String dockerImageName) {
+ public DockerComposedContainer(final DatabaseType databaseType, final
String dockerImageName) {
this.databaseType = databaseType;
governanceContainer = getContainers().registerContainer(new
ZookeeperContainer());
storageContainer =
getContainers().registerContainer((DockerStorageContainer)
StorageContainerFactory.newInstance(databaseType, dockerImageName,
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
index a87773aad14..cdfb42db14e 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
@@ -21,7 +21,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.BaseComposedContainer;
-import
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.MigrationComposedContainer;
+import
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.DockerComposedContainer;
import
org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.NativeComposedContainer;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
@@ -50,11 +50,11 @@ public class ScalingWatcher extends TestWatcher {
}
private void outputZookeeperData() {
- MigrationComposedContainer migrationComposedContainer =
(MigrationComposedContainer) composedContainer;
- DatabaseType databaseType =
migrationComposedContainer.getStorageContainer().getDatabaseType();
+ DockerComposedContainer dockerComposedContainer =
(DockerComposedContainer) composedContainer;
+ DatabaseType databaseType =
dockerComposedContainer.getStorageContainer().getDatabaseType();
String namespace = "it_db_" + databaseType.getType().toLowerCase();
ClusterPersistRepositoryConfiguration config = new
ClusterPersistRepositoryConfiguration("ZooKeeper", namespace,
-
migrationComposedContainer.getGovernanceContainer().getServerLists(), new
Properties());
+
dockerComposedContainer.getGovernanceContainer().getServerLists(), new
Properties());
ClusterPersistRepository zookeeperRepository = new
CuratorZookeeperRepository();
zookeeperRepository.init(config);
List<String> childrenKeys = zookeeperRepository.getChildrenKeys("/");