This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0e238587479 Refactor
MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm (#23862)
0e238587479 is described below
commit 0e238587479463324975a5442be4e5b71840ee90
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Jan 31 15:44:27 2023 +0800
Refactor MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm (#23862)
* Remove useless ProxyContext.getRules()
* Refactor MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm
---
...licationDatabaseDiscoveryProviderAlgorithm.java | 28 ++++++++++++----------
1 file changed, 15 insertions(+), 13 deletions(-)
diff --git
a/features/db-discovery/provider/mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm.java
b/features/db-discovery/provider/mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm.java
index d561182baf3..e0392edc455 100644
---
a/features/db-discovery/provider/mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm.java
+++
b/features/db-discovery/provider/mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.dbdiscovery.mysql.type;
-import com.google.common.base.Strings;
import lombok.Getter;
import
org.apache.shardingsphere.dbdiscovery.mysql.exception.replica.DuplicatePrimaryDataSourceException;
import
org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm;
@@ -37,11 +36,11 @@ import java.util.LinkedList;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
/**
* Normal replication database discovery provider algorithm for MySQL.
*/
-@Getter
public final class MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm
implements DatabaseDiscoveryProviderAlgorithm {
private static final String SHOW_SLAVE_STATUS = "SHOW SLAVE STATUS";
@@ -50,32 +49,36 @@ public final class
MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm impl
private static final String SHOW_VARIABLES_READ_ONLY = "SHOW VARIABLES
LIKE 'read_only'";
+ @Getter
private Properties props;
+ private long delayMillisecondsThreshold;
+
@Override
public void init(final Properties props) {
this.props = props;
+ delayMillisecondsThreshold =
Long.parseLong(props.getProperty("delay-milliseconds-threshold", "0"));
}
@Override
public void checkEnvironment(final String databaseName, final
Collection<DataSource> dataSources) {
ExecutorService executorService =
ExecutorEngine.createExecutorEngineWithCPUAndResources(dataSources.size()).getExecutorServiceManager().getExecutorService();
- Collection<CompletableFuture<Boolean>> completableFutures = new
LinkedList<>();
- for (DataSource dataSource : dataSources) {
- completableFutures.add(supplyAsyncCheckEnvironment(dataSource,
executorService));
- }
+ checkPrimaryDataSource(databaseName, dataSources.stream().map(each ->
asyncCheckEnvironment(executorService, each)).collect(Collectors.toList()));
+ }
+
+ private void checkPrimaryDataSource(final String databaseName, final
Collection<CompletableFuture<Boolean>> completableFutures) {
CompletableFuture.allOf(completableFutures.toArray(new
CompletableFuture[0]));
Iterator<CompletableFuture<Boolean>> primaryInstancesFuture =
completableFutures.stream().iterator();
- int primaryCount = 0;
+ int primaryInstanceCount = 0;
while (primaryInstancesFuture.hasNext()) {
if (primaryInstancesFuture.next().join()) {
- primaryCount++;
+ primaryInstanceCount++;
}
}
- ShardingSpherePreconditions.checkState(1 == primaryCount, () -> new
DuplicatePrimaryDataSourceException(databaseName));
+ ShardingSpherePreconditions.checkState(1 == primaryInstanceCount, ()
-> new DuplicatePrimaryDataSourceException(databaseName));
}
- private CompletableFuture<Boolean> supplyAsyncCheckEnvironment(final
DataSource dataSource, final ExecutorService executorService) {
+ private CompletableFuture<Boolean> asyncCheckEnvironment(final
ExecutorService executorService, final DataSource dataSource) {
return CompletableFuture.supplyAsync(() -> {
try {
return isPrimaryInstance(dataSource);
@@ -122,12 +125,11 @@ public final class
MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm impl
try (
Connection connection = replicaDataSource.getConnection();
Statement statement = connection.createStatement()) {
- String delayMillisecondsThreshold =
getProps().getProperty("delay-milliseconds-threshold");
- if (Strings.isNullOrEmpty(delayMillisecondsThreshold)) {
+ if (0L == delayMillisecondsThreshold) {
return new ReplicaDataSourceStatus(true, 0L);
}
long replicationDelayMilliseconds =
queryReplicationDelayMilliseconds(statement);
- boolean isDelay = replicationDelayMilliseconds >=
Long.parseLong(delayMillisecondsThreshold);
+ boolean isDelay = replicationDelayMilliseconds >=
delayMillisecondsThreshold;
return new ReplicaDataSourceStatus(!isDelay,
replicationDelayMilliseconds);
}
}