This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6a2e287c4c6 Scaling IT add thread sleep, make sure metadata refreshed
(#18091)
6a2e287c4c6 is described below
commit 6a2e287c4c6128a42292f8238564761f669070d6
Author: azexcy <[email protected]>
AuthorDate: Tue May 31 20:03:45 2022 +0800
Scaling IT add thread sleep, make sure metadata refreshed (#18091)
---
.../data/pipeline/core/util/ThreadUtil.java | 17 +++++++++++-
.../data/pipeline/cases/base/BaseITCase.java | 30 +++++++++++++++++-----
2 files changed, 39 insertions(+), 8 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ThreadUtil.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ThreadUtil.java
index 7eadc0fff9c..d61e588804c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ThreadUtil.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ThreadUtil.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.util;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import java.util.concurrent.TimeUnit;
+
/**
* Thread util.
*/
@@ -29,7 +31,7 @@ public final class ThreadUtil {
/**
* Sleep ignored InterruptedException.
*
- * @param millis sleep time.
+ * @param millis sleep time
*/
public static void sleep(final long millis) {
try {
@@ -37,4 +39,17 @@ public final class ThreadUtil {
} catch (final InterruptedException ignored) {
}
}
+
+ /**
+ * Sleep for the given duration, ignoring any InterruptedException.
+ *
+ * @param timeout duration to sleep, expressed in the given time unit
+ * @param timeUnit time unit of the timeout
+ */
+ public static void sleep(final int timeout, final TimeUnit timeUnit) {
+ try {
+ timeUnit.sleep(timeout);
+ } catch (final InterruptedException ignored) {
+ }
+ }
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 7ff3c000b96..817f52eb79c 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -25,6 +25,7 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
@@ -41,6 +42,7 @@ import
org.apache.shardingsphere.integration.data.pipeline.framework.watcher.Sca
import
org.apache.shardingsphere.integration.data.pipeline.util.DatabaseTypeUtil;
import org.apache.shardingsphere.test.integration.env.DataSourceEnvironment;
import org.junit.Rule;
+import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
@@ -107,7 +109,7 @@ public abstract class BaseITCase {
Properties queryProps =
ScalingCaseHelper.getQueryPropertiesByDatabaseType(databaseType);
String defaultDatabaseName =
DatabaseTypeUtil.isPostgreSQL(databaseType) ? "postgres" : "";
try (Connection connection =
DriverManager.getConnection(jdbcUrlAppender.appendQueryProperties(composedContainer.getProxyJdbcUrl(defaultDatabaseName),
queryProps), "root", "root")) {
- connection.createStatement().execute("CREATE DATABASE
sharding_db");
+ executeWithLog(connection, "CREATE DATABASE sharding_db");
}
jdbcTemplate = new JdbcTemplate(getProxyDataSource("sharding_db"));
}
@@ -162,7 +164,7 @@ public abstract class BaseITCase {
.replace("${password}",
ScalingCaseHelper.getPassword(databaseType))
.replace("${ds0}",
JDBC_URL_APPENDER.appendQueryProperties(getActualJdbcUrlTemplate("ds_0"),
queryProps))
.replace("${ds1}",
JDBC_URL_APPENDER.appendQueryProperties(getActualJdbcUrlTemplate("ds_1"),
queryProps));
- connection.createStatement().execute(addSourceResource);
+ executeWithLog(connection, addSourceResource);
}
@SneakyThrows
@@ -188,11 +190,9 @@ public abstract class BaseITCase {
}
}
- protected void initShardingAlgorithm() throws InterruptedException {
+ protected void initShardingAlgorithm() {
executeWithLog(getCommonSQLCommand().getCreateDatabaseShardingAlgorithm());
- TimeUnit.SECONDS.sleep(2);
executeWithLog(getCommonSQLCommand().getCreateOrderShardingAlgorithm());
- TimeUnit.SECONDS.sleep(2);
executeWithLog(getCommonSQLCommand().getCreateOrderItemShardingAlgorithm());
}
@@ -217,14 +217,30 @@ public abstract class BaseITCase {
executeWithLog(String.format("CREATE SCHEMA %s", schemaName));
}
+ protected void executeWithLog(final Connection connection, final String
sql) throws SQLException {
+ log.info("connection execute:{}", sql);
+ connection.createStatement().execute(sql);
+ ThreadUtil.sleep(1, TimeUnit.SECONDS);
+ }
+
protected void executeWithLog(final String sql) {
log.info("jdbcTemplate execute:{}", sql);
jdbcTemplate.execute(sql);
+ ThreadUtil.sleep(2, TimeUnit.SECONDS);
}
protected List<Map<String, Object>> queryForListWithLog(final String sql) {
- log.info("jdbcTemplate queryForMap:{}", sql);
- return jdbcTemplate.queryForList(sql);
+ int retryNumber = 0;
+ while (retryNumber <= 3) {
+ try {
+ return jdbcTemplate.queryForList(sql);
+ } catch (final DataAccessException ex) {
+ log.error("data access error", ex);
+ }
+ ThreadUtil.sleep(2, TimeUnit.SECONDS);
+ retryNumber++;
+ }
+ throw new RuntimeException("can't get result from proxy");
}
protected void startIncrementTask(final BaseIncrementTask
baseIncrementTask) {