This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a2e287c4c6 Scaling IT add thread sleep, make sure metadata refreshed 
(#18091)
6a2e287c4c6 is described below

commit 6a2e287c4c6128a42292f8238564761f669070d6
Author: azexcy <[email protected]>
AuthorDate: Tue May 31 20:03:45 2022 +0800

    Scaling IT add thread sleep, make sure metadata refreshed (#18091)
---
 .../data/pipeline/core/util/ThreadUtil.java        | 17 +++++++++++-
 .../data/pipeline/cases/base/BaseITCase.java       | 30 +++++++++++++++++-----
 2 files changed, 39 insertions(+), 8 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ThreadUtil.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ThreadUtil.java
index 7eadc0fff9c..d61e588804c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ThreadUtil.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ThreadUtil.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Thread util.
  */
@@ -29,7 +31,7 @@ public final class ThreadUtil {
     /**
      * Sleep ignored InterruptedException.
      *
-     * @param millis sleep time.
+     * @param millis sleep time
      */
     public static void sleep(final long millis) {
         try {
@@ -37,4 +39,17 @@ public final class ThreadUtil {
         } catch (final InterruptedException ignored) {
         }
     }
+    
+    /**
+     * Sleep ignored InterruptedException.
+     *
+     * @param timeout timeout
+     * @param timeUnit time unit
+     */
+    public static void sleep(final int timeout, final TimeUnit timeUnit) {
+        try {
+            timeUnit.sleep(timeout);
+        } catch (final InterruptedException ignored) {
+        }
+    }
 }
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 7ff3c000b96..817f52eb79c 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -25,6 +25,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
@@ -41,6 +42,7 @@ import 
org.apache.shardingsphere.integration.data.pipeline.framework.watcher.Sca
 import 
org.apache.shardingsphere.integration.data.pipeline.util.DatabaseTypeUtil;
 import org.apache.shardingsphere.test.integration.env.DataSourceEnvironment;
 import org.junit.Rule;
+import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
 
 import javax.sql.DataSource;
@@ -107,7 +109,7 @@ public abstract class BaseITCase {
         Properties queryProps = 
ScalingCaseHelper.getQueryPropertiesByDatabaseType(databaseType);
         String defaultDatabaseName = 
DatabaseTypeUtil.isPostgreSQL(databaseType) ? "postgres" : "";
         try (Connection connection = 
DriverManager.getConnection(jdbcUrlAppender.appendQueryProperties(composedContainer.getProxyJdbcUrl(defaultDatabaseName),
 queryProps), "root", "root")) {
-            connection.createStatement().execute("CREATE DATABASE 
sharding_db");
+            executeWithLog(connection, "CREATE DATABASE sharding_db");
         }
         jdbcTemplate = new JdbcTemplate(getProxyDataSource("sharding_db"));
     }
@@ -162,7 +164,7 @@ public abstract class BaseITCase {
                 .replace("${password}", 
ScalingCaseHelper.getPassword(databaseType))
                 .replace("${ds0}", 
JDBC_URL_APPENDER.appendQueryProperties(getActualJdbcUrlTemplate("ds_0"), 
queryProps))
                 .replace("${ds1}", 
JDBC_URL_APPENDER.appendQueryProperties(getActualJdbcUrlTemplate("ds_1"), 
queryProps));
-        connection.createStatement().execute(addSourceResource);
+        executeWithLog(connection, addSourceResource);
     }
     
     @SneakyThrows
@@ -188,11 +190,9 @@ public abstract class BaseITCase {
         }
     }
     
-    protected void initShardingAlgorithm() throws InterruptedException {
+    protected void initShardingAlgorithm() {
         
executeWithLog(getCommonSQLCommand().getCreateDatabaseShardingAlgorithm());
-        TimeUnit.SECONDS.sleep(2);
         
executeWithLog(getCommonSQLCommand().getCreateOrderShardingAlgorithm());
-        TimeUnit.SECONDS.sleep(2);
         
executeWithLog(getCommonSQLCommand().getCreateOrderItemShardingAlgorithm());
     }
     
@@ -217,14 +217,30 @@ public abstract class BaseITCase {
         executeWithLog(String.format("CREATE SCHEMA %s", schemaName));
     }
     
+    protected void executeWithLog(final Connection connection, final String 
sql) throws SQLException {
+        log.info("connection execute:{}", sql);
+        connection.createStatement().execute(sql);
+        ThreadUtil.sleep(1, TimeUnit.SECONDS);
+    }
+    
     protected void executeWithLog(final String sql) {
         log.info("jdbcTemplate execute:{}", sql);
         jdbcTemplate.execute(sql);
+        ThreadUtil.sleep(2, TimeUnit.SECONDS);
     }
     
     protected List<Map<String, Object>> queryForListWithLog(final String sql) {
-        log.info("jdbcTemplate queryForMap:{}", sql);
-        return jdbcTemplate.queryForList(sql);
+        int retryNumber = 0;
+        while (retryNumber <= 3) {
+            try {
+                return jdbcTemplate.queryForList(sql);
+            } catch (final DataAccessException ex) {
+                log.error("data access error", ex);
+            }
+            ThreadUtil.sleep(2, TimeUnit.SECONDS);
+            retryNumber++;
+        }
+        throw new RuntimeException("can't get result from proxy");
     }
     
     protected void startIncrementTask(final BaseIncrementTask 
baseIncrementTask) {

Reply via email to