This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7793139 Update scaling integration test (#10560)
7793139 is described below
commit 77931396dbd47284ada71bee25b995c94c49e6e0
Author: avalon5666 <[email protected]>
AuthorDate: Mon May 31 14:30:55 2021 +0800
Update scaling integration test (#10560)
* Update scaling integration test
* Update pom
---
shardingsphere-test/pom.xml | 1 +
.../pom.xml | 1 +
.../integration/scaling/test/mysql/ScalingIT.java | 43 +++++++-
.../test/mysql/env/IntegrationTestEnvironment.java | 11 +--
.../test/mysql/fixture/FixtureWriteThread.java | 109 +++++++++++++++++++++
.../scaling/test/mysql/util/ExecuteUtil.java | 50 ++++++++++
.../scaling/test/mysql/util/Executor.java | 31 ++++++
.../scaling/test/mysql/util/ScalingUtil.java | 65 ++++++++----
8 files changed, 280 insertions(+), 31 deletions(-)
diff --git a/shardingsphere-test/pom.xml b/shardingsphere-test/pom.xml
index 4679ab1..0d86360 100644
--- a/shardingsphere-test/pom.xml
+++ b/shardingsphere-test/pom.xml
@@ -33,6 +33,7 @@
<module>shardingsphere-test-common</module>
<module>shardingsphere-integration-test</module>
<module>shardingsphere-integration-agent-test</module>
+ <module>shardingsphere-integration-scaling-test</module>
</modules>
<properties>
diff --git a/shardingsphere-test/shardingsphere-integration-scaling-test/pom.xml b/shardingsphere-test/shardingsphere-integration-scaling-test/pom.xml
index 9fe3ab5..4ec5a71 100644
--- a/shardingsphere-test/shardingsphere-integration-scaling-test/pom.xml
+++ b/shardingsphere-test/shardingsphere-integration-scaling-test/pom.xml
@@ -30,6 +30,7 @@
<name>${project.artifactId}</name>
<modules>
+ <module>shardingsphere-integration-scaling-test-mysql</module>
</modules>
<properties>
diff --git a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/ScalingIT.java b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/ScalingIT.java
index 0f85046..3429a67 100644
--- a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/ScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/ScalingIT.java
@@ -17,12 +17,14 @@
package org.apache.shardingsphere.integration.scaling.test.mysql;
+import groovy.lang.Tuple2;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.integration.scaling.test.mysql.env.IntegrationTestEnvironment;
+import
org.apache.shardingsphere.integration.scaling.test.mysql.fixture.FixtureWriteThread;
+import
org.apache.shardingsphere.integration.scaling.test.mysql.util.ExecuteUtil;
import
org.apache.shardingsphere.integration.scaling.test.mysql.util.ScalingUtil;
import
org.apache.shardingsphere.integration.scaling.test.mysql.util.TargetDataSourceUtil;
-import org.apache.shardingsphere.scaling.web.entity.ResponseContent;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
@@ -30,14 +32,47 @@ import static org.junit.Assert.assertTrue;
@Slf4j
public final class ScalingIT {
+ private static final long TIMEOUT_MS = 2 * 60 * 1000;
+
+ private static final long WAIT_MS_BEFORE_START_JOB = 10 * 1000;
+
+ private static final long WAIT_MS_BEFORE_CHECK_JOB = 15 * 1000;
+
+ private final FixtureWriteThread fixtureWriteThread = new
FixtureWriteThread(TIMEOUT_MS, 1000);
+
@SneakyThrows
@Test
public void assertScaling() {
if (IntegrationTestEnvironment.getInstance().isEnvironmentPrepared()) {
IntegrationTestEnvironment.getInstance().waitForEnvironmentReady();
- String body = TargetDataSourceUtil.createDockerConfigurations();
- ResponseContent<String> startResponse =
ScalingUtil.getInstance().startJob(body);
- assertTrue(startResponse.isSuccess());
+ fixtureWriteThread.start();
+ Thread.sleep(WAIT_MS_BEFORE_START_JOB);
+ String jobId = assertStartJob();
+ waitInventoryFinish(jobId);
+ fixtureWriteThread.stop();
+ Thread.sleep(WAIT_MS_BEFORE_CHECK_JOB);
+ assertJobCheck(jobId);
}
}
+
+ @SneakyThrows
+ private String assertStartJob() {
+ String configurations =
TargetDataSourceUtil.createDockerConfigurations();
+ Tuple2<Boolean, String> response =
ScalingUtil.getInstance().startJob(configurations);
+ assertTrue(response.getFirst());
+ return response.getSecond();
+ }
+
+ private void waitInventoryFinish(final String jobId) {
+ new ExecuteUtil(() -> {
+ return
"EXECUTE_INCREMENTAL_TASK".equals(ScalingUtil.getInstance().getJobStatus(jobId));
+ }, (int) (TIMEOUT_MS - WAIT_MS_BEFORE_START_JOB) / (10 * 1000), 10 *
1000).execute();
+ }
+
+ @SneakyThrows
+ private void assertJobCheck(final String jobId) {
+ Tuple2<Boolean, Boolean> checkResult =
ScalingUtil.getInstance().getJobCheckResult(jobId);
+ assertTrue(checkResult.getFirst());
+ assertTrue(checkResult.getSecond());
+ }
}
diff --git a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/IntegrationTestEnvironment.java b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/IntegrationTestEnvironment.java
index 8cd46ec..224e8a5 100644
--- a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/IntegrationTestEnvironment.java
+++ b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/IntegrationTestEnvironment.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.integration.scaling.test.mysql.util.ScalingUtil;
+import
org.apache.shardingsphere.integration.scaling.test.mysql.util.ExecuteUtil;
import java.io.IOException;
import java.io.InputStream;
@@ -57,14 +58,8 @@ public final class IntegrationTestEnvironment {
*/
public void waitForEnvironmentReady() {
log.info("wait begin scaling environment");
- int retryCount = 0;
- while (!isScalingReady(engineEnvProps) && retryCount <
Integer.parseInt(engineEnvProps.getProperty("scaling.retry", "30"))) {
- try {
-
Thread.sleep(Long.parseLong(engineEnvProps.getProperty("scaling.waitMs",
"1000")));
- } catch (final InterruptedException ignore) {
- }
- retryCount++;
- }
+ new ExecuteUtil(() -> isScalingReady(engineEnvProps),
Integer.parseInt(engineEnvProps.getProperty("scaling.retry", "30")),
+ Long.parseLong(engineEnvProps.getProperty("scaling.waitMs",
"1000"))).execute();
}
private boolean isScalingReady(final Properties engineEnvProps) {
diff --git a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/fixture/FixtureWriteThread.java b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/fixture/FixtureWriteThread.java
new file mode 100644
index 0000000..946fc1d
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/fixture/FixtureWriteThread.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.integration.scaling.test.mysql.fixture;
+
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import
org.apache.shardingsphere.integration.scaling.test.mysql.util.SourceShardingSphereUtil;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+@RequiredArgsConstructor
+public final class FixtureWriteThread implements Runnable {
+
+ private static final String INSERT_SQL = "INSERT INTO t1(c1, c2) VALUES(?,
?)";
+
+ private static final String UPDATE_SQL = "UPDATE t1 SET c2 = ? WHERE c1 =
?";
+
+ private static final String DELETE_SQL = "DELETE FROM t1 WHERE c1 = ?";
+
+ private final long writeThreadTimeout;
+
+ private final long writeSpawn;
+
+ private boolean running;
+
+ private Thread thread;
+
+ /**
+ * Start.
+ */
+ public void start() {
+ running = true;
+ thread = new Thread(this);
+ thread.start();
+ }
+
+ /**
+ * Stop.
+ *
+ * @throws InterruptedException interrupted exception
+ */
+ public void stop() throws InterruptedException {
+ running = false;
+ thread.interrupt();
+ thread.join();
+ }
+
+ @SneakyThrows
+ @Override
+ public void run() {
+ long startTime = System.currentTimeMillis();
+ int idGenerator = 0;
+ DataSource dataSource =
SourceShardingSphereUtil.createHostDataSource();
+ while (running && !checkTimeout(startTime, writeThreadTimeout)) {
+ try (Connection connection = dataSource.getConnection()) {
+ insert(connection, ++idGenerator);
+ update(connection, idGenerator);
+ insert(connection, ++idGenerator);
+ delete(connection, idGenerator);
+ }
+ try {
+ Thread.sleep(writeSpawn);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+
+ private boolean checkTimeout(final long startTime, final long timeout) {
+ return timeout < System.currentTimeMillis() - startTime;
+ }
+
+ private void insert(final Connection connection, final int id) throws
SQLException {
+ PreparedStatement ps = connection.prepareStatement(INSERT_SQL);
+ ps.setInt(1, id);
+ ps.setString(2, Integer.toString(id));
+ ps.execute();
+ }
+
+ private void update(final Connection connection, final int id) throws
SQLException {
+ PreparedStatement ps = connection.prepareStatement(UPDATE_SQL);
+ ps.setString(1, Integer.toString(id + 1));
+ ps.setInt(2, id);
+ ps.execute();
+ }
+
+ private void delete(final Connection connection, final int id) throws
SQLException {
+ PreparedStatement ps = connection.prepareStatement(DELETE_SQL);
+ ps.setInt(1, id);
+ ps.execute();
+ }
+}
diff --git a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ExecuteUtil.java b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ExecuteUtil.java
new file mode 100644
index 0000000..a223f66
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ExecuteUtil.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.integration.scaling.test.mysql.util;
+
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Execute util.
+ */
+@RequiredArgsConstructor
+public final class ExecuteUtil {
+
+ private final Executor executor;
+
+ private final int retryCount;
+
+ private final long waitMs;
+
+ /**
+ * Execute.
+ *
+ * @return execute result
+ */
+ public boolean execute() {
+ int count = 0;
+ while (!executor.execute() && retryCount > count) {
+ try {
+ Thread.sleep(waitMs);
+ } catch (final InterruptedException ignored) {
+ }
+ count++;
+ }
+ return retryCount > count;
+ }
+}
diff --git a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/Executor.java b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/Executor.java
new file mode 100644
index 0000000..9c718c9
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/Executor.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.integration.scaling.test.mysql.util;
+
+/**
+ * Executor.
+ */
+public interface Executor {
+
+ /**
+ * Execute.
+ *
+ * @return execute result
+ */
+ boolean execute();
+}
diff --git a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ScalingUtil.java b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ScalingUtil.java
index bf4bd49..1c34494 100644
--- a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ScalingUtil.java
+++ b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ScalingUtil.java
@@ -17,18 +17,18 @@
package org.apache.shardingsphere.integration.scaling.test.mysql.util;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import groovy.lang.Tuple2;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import
org.apache.shardingsphere.integration.scaling.test.mysql.env.IntegrationTestEnvironment;
-import org.apache.shardingsphere.scaling.web.entity.ResponseContent;
import java.io.IOException;
-import java.lang.reflect.Type;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertNotNull;
@@ -40,7 +40,7 @@ public final class ScalingUtil {
private static final ScalingUtil OK_HTTP_UTILS = new ScalingUtil();
- private static final Gson GSON = new Gson();
+ private static final JsonParser JSON_PARSER = new JsonParser();
private final OkHttpClient client;
@@ -64,45 +64,72 @@ public final class ScalingUtil {
return OK_HTTP_UTILS;
}
- private <T> T get(final String url, final Type type) throws IOException {
+ private JsonElement get(final String url) throws IOException {
Request request = new Request.Builder().url(url).build();
Response response = client.newCall(request).execute();
assertNotNull(response.body());
String result = response.body().string();
- return GSON.fromJson(result, type);
+ return JSON_PARSER.parse(result);
}
- private <T> T post(final String url, final String body, final Type type)
throws IOException {
+ private JsonElement post(final String url, final String body) throws
IOException {
RequestBody requestBody =
RequestBody.create(MediaType.parse("application/json"), body);
Request request = new
Request.Builder().url(url).post(requestBody).build();
Response response = client.newCall(request).execute();
assertNotNull(response.body());
String result = response.body().string();
- return GSON.fromJson(result, type);
+ return JSON_PARSER.parse(result);
}
/**
* Start job.
*
* @param configuration configuration
- * @return response
+ * @return result
* @throws IOException io exception
*/
- public ResponseContent<String> startJob(final String configuration) throws
IOException {
- return getInstance().post(scalingUrl + "/scaling/job/start",
configuration, new TypeToken<ResponseContent<String>>() {
-
- }.getType());
+ public Tuple2<Boolean, String> startJob(final String configuration) throws
IOException {
+ JsonObject response = getInstance().post(scalingUrl +
"/scaling/job/start", configuration).getAsJsonObject();
+ return new Tuple2<>(response.get("success").getAsBoolean(),
response.get("model").getAsString());
+ }
+
+ /**
+ * Get job status.
+ *
+ * @param jobId job id
+ * @return job status
+ */
+ public String getJobStatus(final String jobId) {
+ try {
+ JsonElement response = getInstance().get(scalingUrl +
"/scaling/job/progress/" + jobId);
+ return
response.getAsJsonObject().getAsJsonObject("model").getAsJsonObject("0").get("status").getAsString();
+ //CHECKSTYLE:OFF
+ } catch (Exception ignored) {
+ //CHECKSTYLE:ON
+ }
+ return null;
+ }
+
+ /**
+ * Check job.
+ *
+ * @param jobId job id
+ * @return check result
+ * @throws IOException io exception
+ */
+ public Tuple2<Boolean, Boolean> getJobCheckResult(final String jobId)
throws IOException {
+ JsonElement response = getInstance().get(scalingUrl +
"/scaling/job/check/" + jobId);
+ JsonObject result =
response.getAsJsonObject().getAsJsonObject("model").getAsJsonObject("t1");
+ return new Tuple2<>(result.get("countValid").getAsBoolean(),
result.get("dataValid").getAsBoolean());
}
/**
* Get job list.
*
- * @return job list
+ * @return result
* @throws IOException io exception
*/
- public ResponseContent<String> getJobList() throws IOException {
- return getInstance().get(scalingUrl + "/scaling/job/list", new
TypeToken<ResponseContent<Object[]>>() {
-
- }.getType());
+ public JsonElement getJobList() throws IOException {
+ return getInstance().get(scalingUrl + "/scaling/job/list");
}
}