This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7793139  Update scaling integration test (#10560)
7793139 is described below

commit 77931396dbd47284ada71bee25b995c94c49e6e0
Author: avalon5666 <[email protected]>
AuthorDate: Mon May 31 14:30:55 2021 +0800

    Update scaling integration test (#10560)
    
    * Update scaling integration test
    
    * Update pom
---
 shardingsphere-test/pom.xml                        |   1 +
 .../pom.xml                                        |   1 +
 .../integration/scaling/test/mysql/ScalingIT.java  |  43 +++++++-
 .../test/mysql/env/IntegrationTestEnvironment.java |  11 +--
 .../test/mysql/fixture/FixtureWriteThread.java     | 109 +++++++++++++++++++++
 .../scaling/test/mysql/util/ExecuteUtil.java       |  50 ++++++++++
 .../scaling/test/mysql/util/Executor.java          |  31 ++++++
 .../scaling/test/mysql/util/ScalingUtil.java       |  65 ++++++++----
 8 files changed, 280 insertions(+), 31 deletions(-)

diff --git a/shardingsphere-test/pom.xml b/shardingsphere-test/pom.xml
index 4679ab1..0d86360 100644
--- a/shardingsphere-test/pom.xml
+++ b/shardingsphere-test/pom.xml
@@ -33,6 +33,7 @@
         <module>shardingsphere-test-common</module>
         <module>shardingsphere-integration-test</module>
         <module>shardingsphere-integration-agent-test</module>
+        <module>shardingsphere-integration-scaling-test</module>
     </modules>
     
     <properties>
diff --git 
a/shardingsphere-test/shardingsphere-integration-scaling-test/pom.xml 
b/shardingsphere-test/shardingsphere-integration-scaling-test/pom.xml
index 9fe3ab5..4ec5a71 100644
--- a/shardingsphere-test/shardingsphere-integration-scaling-test/pom.xml
+++ b/shardingsphere-test/shardingsphere-integration-scaling-test/pom.xml
@@ -30,6 +30,7 @@
     <name>${project.artifactId}</name>
 
     <modules>
+        <module>shardingsphere-integration-scaling-test-mysql</module>
     </modules>
 
     <properties>
diff --git 
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/ScalingIT.java
 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/ScalingIT.java
index 0f85046..3429a67 100644
--- 
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/ScalingIT.java
+++ 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/ScalingIT.java
@@ -17,12 +17,14 @@
 
 package org.apache.shardingsphere.integration.scaling.test.mysql;
 
+import groovy.lang.Tuple2;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.integration.scaling.test.mysql.env.IntegrationTestEnvironment;
+import 
org.apache.shardingsphere.integration.scaling.test.mysql.fixture.FixtureWriteThread;
+import 
org.apache.shardingsphere.integration.scaling.test.mysql.util.ExecuteUtil;
 import 
org.apache.shardingsphere.integration.scaling.test.mysql.util.ScalingUtil;
 import 
org.apache.shardingsphere.integration.scaling.test.mysql.util.TargetDataSourceUtil;
-import org.apache.shardingsphere.scaling.web.entity.ResponseContent;
 import org.junit.Test;
 
 import static org.junit.Assert.assertTrue;
@@ -30,14 +32,47 @@ import static org.junit.Assert.assertTrue;
 @Slf4j
 public final class ScalingIT {
     
+    private static final long TIMEOUT_MS = 2 * 60 * 1000;
+    
+    private static final long WAIT_MS_BEFORE_START_JOB = 10 * 1000;
+    
+    private static final long WAIT_MS_BEFORE_CHECK_JOB = 15 * 1000;
+    
+    private final FixtureWriteThread fixtureWriteThread = new 
FixtureWriteThread(TIMEOUT_MS, 1000);
+    
     @SneakyThrows
     @Test
     public void assertScaling() {
         if (IntegrationTestEnvironment.getInstance().isEnvironmentPrepared()) {
             IntegrationTestEnvironment.getInstance().waitForEnvironmentReady();
-            String body = TargetDataSourceUtil.createDockerConfigurations();
-            ResponseContent<String> startResponse = 
ScalingUtil.getInstance().startJob(body);
-            assertTrue(startResponse.isSuccess());
+            fixtureWriteThread.start();
+            Thread.sleep(WAIT_MS_BEFORE_START_JOB);
+            String jobId = assertStartJob();
+            waitInventoryFinish(jobId);
+            fixtureWriteThread.stop();
+            Thread.sleep(WAIT_MS_BEFORE_CHECK_JOB);
+            assertJobCheck(jobId);
         }
     }
+    
+    @SneakyThrows
+    private String assertStartJob() {
+        String configurations = 
TargetDataSourceUtil.createDockerConfigurations();
+        Tuple2<Boolean, String> response = 
ScalingUtil.getInstance().startJob(configurations);
+        assertTrue(response.getFirst());
+        return response.getSecond();
+    }
+    
+    private void waitInventoryFinish(final String jobId) {
+        new ExecuteUtil(() -> {
+            return 
"EXECUTE_INCREMENTAL_TASK".equals(ScalingUtil.getInstance().getJobStatus(jobId));
+        }, (int) (TIMEOUT_MS - WAIT_MS_BEFORE_START_JOB) / (10 * 1000), 10 * 
1000).execute();
+    }
+    
+    @SneakyThrows
+    private void assertJobCheck(final String jobId) {
+        Tuple2<Boolean, Boolean> checkResult = 
ScalingUtil.getInstance().getJobCheckResult(jobId);
+        assertTrue(checkResult.getFirst());
+        assertTrue(checkResult.getSecond());
+    }
 }
diff --git 
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/IntegrationTestEnvironment.java
 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/IntegrationTestEnvironment.java
index 8cd46ec..224e8a5 100644
--- 
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/IntegrationTestEnvironment.java
+++ 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/IntegrationTestEnvironment.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.integration.scaling.test.mysql.util.ScalingUtil;
+import 
org.apache.shardingsphere.integration.scaling.test.mysql.util.ExecuteUtil;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -57,14 +58,8 @@ public final class IntegrationTestEnvironment {
      */
     public void waitForEnvironmentReady() {
         log.info("wait begin scaling environment");
-        int retryCount = 0;
-        while (!isScalingReady(engineEnvProps) && retryCount < 
Integer.parseInt(engineEnvProps.getProperty("scaling.retry", "30"))) {
-            try {
-                
Thread.sleep(Long.parseLong(engineEnvProps.getProperty("scaling.waitMs", 
"1000")));
-            } catch (final InterruptedException ignore) {
-            }
-            retryCount++;
-        }
+        new ExecuteUtil(() -> isScalingReady(engineEnvProps), 
Integer.parseInt(engineEnvProps.getProperty("scaling.retry", "30")),
+                Long.parseLong(engineEnvProps.getProperty("scaling.waitMs", 
"1000"))).execute();
     }
     
     private boolean isScalingReady(final Properties engineEnvProps) {
diff --git 
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/fixture/FixtureWriteThread.java
 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/fixture/FixtureWriteThread.java
new file mode 100644
index 0000000..946fc1d
--- /dev/null
+++ 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/fixture/FixtureWriteThread.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.integration.scaling.test.mysql.fixture;
+
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import 
org.apache.shardingsphere.integration.scaling.test.mysql.util.SourceShardingSphereUtil;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+@RequiredArgsConstructor
+public final class FixtureWriteThread implements Runnable {
+    
+    private static final String INSERT_SQL = "INSERT INTO t1(c1, c2) VALUES(?, 
?)";
+    
+    private static final String UPDATE_SQL = "UPDATE t1 SET c2 = ? WHERE c1 = 
?";
+    
+    private static final String DELETE_SQL = "DELETE FROM t1 WHERE c1 = ?";
+    
+    private final long writeThreadTimeout;
+    
+    private final long writeSpawn;
+    
+    private boolean running;
+    
+    private Thread thread;
+    
+    /**
+     * Start.
+     */
+    public void start() {
+        running = true;
+        thread = new Thread(this);
+        thread.start();
+    }
+    
+    /**
+     * Stop.
+     *
+     * @throws InterruptedException interrupted exception
+     */
+    public void stop() throws InterruptedException {
+        running = false;
+        thread.interrupt();
+        thread.join();
+    }
+    
+    @SneakyThrows
+    @Override
+    public void run() {
+        long startTime = System.currentTimeMillis();
+        int idGenerator = 0;
+        DataSource dataSource = 
SourceShardingSphereUtil.createHostDataSource();
+        while (running && !checkTimeout(startTime, writeThreadTimeout)) {
+            try (Connection connection = dataSource.getConnection()) {
+                insert(connection, ++idGenerator);
+                update(connection, idGenerator);
+                insert(connection, ++idGenerator);
+                delete(connection, idGenerator);
+            }
+            try {
+                Thread.sleep(writeSpawn);
+            } catch (InterruptedException ignored) {
+            }
+        }
+    }
+    
+    private boolean checkTimeout(final long startTime, final long timeout) {
+        return timeout < System.currentTimeMillis() - startTime;
+    }
+    
+    private void insert(final Connection connection, final int id) throws 
SQLException {
+        PreparedStatement ps = connection.prepareStatement(INSERT_SQL);
+        ps.setInt(1, id);
+        ps.setString(2, Integer.toString(id));
+        ps.execute();
+    }
+    
+    private void update(final Connection connection, final int id) throws 
SQLException {
+        PreparedStatement ps = connection.prepareStatement(UPDATE_SQL);
+        ps.setString(1, Integer.toString(id + 1));
+        ps.setInt(2, id);
+        ps.execute();
+    }
+    
+    private void delete(final Connection connection, final int id) throws 
SQLException {
+        PreparedStatement ps = connection.prepareStatement(DELETE_SQL);
+        ps.setInt(1, id);
+        ps.execute();
+    }
+}
diff --git 
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ExecuteUtil.java
 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ExecuteUtil.java
new file mode 100644
index 0000000..a223f66
--- /dev/null
+++ 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ExecuteUtil.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.integration.scaling.test.mysql.util;
+
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Execute util.
+ */
+@RequiredArgsConstructor
+public final class ExecuteUtil {
+    
+    private final Executor executor;
+    
+    private final int retryCount;
+    
+    private final long waitMs;
+    
+    /**
+     * Execute.
+     *
+     * @return execute result
+     */
+    public boolean execute() {
+        int count = 0;
+        while (!executor.execute() && retryCount > count) {
+            try {
+                Thread.sleep(waitMs);
+            } catch (final InterruptedException ignored) {
+            }
+            count++;
+        }
+        return retryCount > count;
+    }
+}
diff --git 
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/Executor.java
 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/Executor.java
new file mode 100644
index 0000000..9c718c9
--- /dev/null
+++ 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/Executor.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.integration.scaling.test.mysql.util;
+
+/**
+ * Executor.
+ */
+public interface Executor {
+    
+    /**
+     * Execute.
+     *
+     * @return execute result
+     */
+    boolean execute();
+}
diff --git 
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ScalingUtil.java
 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ScalingUtil.java
index bf4bd49..1c34494 100644
--- 
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ScalingUtil.java
+++ 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ScalingUtil.java
@@ -17,18 +17,18 @@
 
 package org.apache.shardingsphere.integration.scaling.test.mysql.util;
 
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import groovy.lang.Tuple2;
 import okhttp3.MediaType;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.RequestBody;
 import okhttp3.Response;
 import 
org.apache.shardingsphere.integration.scaling.test.mysql.env.IntegrationTestEnvironment;
-import org.apache.shardingsphere.scaling.web.entity.ResponseContent;
 
 import java.io.IOException;
-import java.lang.reflect.Type;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertNotNull;
@@ -40,7 +40,7 @@ public final class ScalingUtil {
     
     private static final ScalingUtil OK_HTTP_UTILS = new ScalingUtil();
     
-    private static final Gson GSON = new Gson();
+    private static final JsonParser JSON_PARSER = new JsonParser();
     
     private final OkHttpClient client;
     
@@ -64,45 +64,72 @@ public final class ScalingUtil {
         return OK_HTTP_UTILS;
     }
     
-    private <T> T get(final String url, final Type type) throws IOException {
+    private JsonElement get(final String url) throws IOException {
         Request request = new Request.Builder().url(url).build();
         Response response = client.newCall(request).execute();
         assertNotNull(response.body());
         String result = response.body().string();
-        return GSON.fromJson(result, type);
+        return JSON_PARSER.parse(result);
     }
     
-    private <T> T post(final String url, final String body, final Type type) 
throws IOException {
+    private JsonElement post(final String url, final String body) throws 
IOException {
         RequestBody requestBody = 
RequestBody.create(MediaType.parse("application/json"), body);
         Request request = new 
Request.Builder().url(url).post(requestBody).build();
         Response response = client.newCall(request).execute();
         assertNotNull(response.body());
         String result = response.body().string();
-        return GSON.fromJson(result, type);
+        return JSON_PARSER.parse(result);
     }
     
     /**
      * Start job.
      *
      * @param configuration configuration
-     * @return response
+     * @return result
      * @throws IOException io exception
      */
-    public ResponseContent<String> startJob(final String configuration) throws 
IOException {
-        return getInstance().post(scalingUrl + "/scaling/job/start", 
configuration, new TypeToken<ResponseContent<String>>() {
-        
-        }.getType());
+    public Tuple2<Boolean, String> startJob(final String configuration) throws 
IOException {
+        JsonObject response = getInstance().post(scalingUrl + 
"/scaling/job/start", configuration).getAsJsonObject();
+        return new Tuple2<>(response.get("success").getAsBoolean(), 
response.get("model").getAsString());
+    }
+    
+    /**
+     * Get job status.
+     *
+     * @param jobId job id
+     * @return job status
+     */
+    public String getJobStatus(final String jobId) {
+        try {
+            JsonElement response = getInstance().get(scalingUrl + 
"/scaling/job/progress/" + jobId);
+            return 
response.getAsJsonObject().getAsJsonObject("model").getAsJsonObject("0").get("status").getAsString();
+            //CHECKSTYLE:OFF
+        } catch (Exception ignored) {
+            //CHECKSTYLE:ON
+        }
+        return null;
+    }
+    
+    /**
+     * Check job.
+     *
+     * @param jobId job id
+     * @return check result
+     * @throws IOException io exception
+     */
+    public Tuple2<Boolean, Boolean> getJobCheckResult(final String jobId) 
throws IOException {
+        JsonElement response = getInstance().get(scalingUrl + 
"/scaling/job/check/" + jobId);
+        JsonObject result = 
response.getAsJsonObject().getAsJsonObject("model").getAsJsonObject("t1");
+        return new Tuple2<>(result.get("countValid").getAsBoolean(), 
result.get("dataValid").getAsBoolean());
     }
     
     /**
      * Get job list.
      *
-     * @return job list
+     * @return result
      * @throws IOException io exception
      */
-    public ResponseContent<String> getJobList() throws IOException {
-        return getInstance().get(scalingUrl + "/scaling/job/list", new 
TypeToken<ResponseContent<Object[]>>() {
-        
-        }.getType());
+    public JsonElement getJobList() throws IOException {
+        return getInstance().get(scalingUrl + "/scaling/job/list");
     }
 }

Reply via email to