This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e238587479 Refactor 
MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm (#23862)
0e238587479 is described below

commit 0e238587479463324975a5442be4e5b71840ee90
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Jan 31 15:44:27 2023 +0800

    Refactor MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm (#23862)
    
    * Remove useless ProxyContext.getRules()
    
    * Refactor MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm
---
 ...licationDatabaseDiscoveryProviderAlgorithm.java | 28 ++++++++++++----------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git 
a/features/db-discovery/provider/mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm.java
 
b/features/db-discovery/provider/mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm.java
index d561182baf3..e0392edc455 100644
--- 
a/features/db-discovery/provider/mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm.java
+++ 
b/features/db-discovery/provider/mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.dbdiscovery.mysql.type;
 
-import com.google.common.base.Strings;
 import lombok.Getter;
 import 
org.apache.shardingsphere.dbdiscovery.mysql.exception.replica.DuplicatePrimaryDataSourceException;
 import 
org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm;
@@ -37,11 +36,11 @@ import java.util.LinkedList;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 
 /**
  * Normal replication database discovery provider algorithm for MySQL.
  */
-@Getter
 public final class MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm 
implements DatabaseDiscoveryProviderAlgorithm {
     
     private static final String SHOW_SLAVE_STATUS = "SHOW SLAVE STATUS";
@@ -50,32 +49,36 @@ public final class 
MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm impl
     
     private static final String SHOW_VARIABLES_READ_ONLY = "SHOW VARIABLES 
LIKE 'read_only'";
     
+    @Getter
     private Properties props;
     
+    private long delayMillisecondsThreshold;
+    
     @Override
     public void init(final Properties props) {
         this.props = props;
+        delayMillisecondsThreshold = 
Long.parseLong(props.getProperty("delay-milliseconds-threshold", "0"));
     }
     
     @Override
     public void checkEnvironment(final String databaseName, final 
Collection<DataSource> dataSources) {
         ExecutorService executorService = 
ExecutorEngine.createExecutorEngineWithCPUAndResources(dataSources.size()).getExecutorServiceManager().getExecutorService();
-        Collection<CompletableFuture<Boolean>> completableFutures = new 
LinkedList<>();
-        for (DataSource dataSource : dataSources) {
-            completableFutures.add(supplyAsyncCheckEnvironment(dataSource, 
executorService));
-        }
+        checkPrimaryDataSource(databaseName, dataSources.stream().map(each -> 
asyncCheckEnvironment(executorService, each)).collect(Collectors.toList()));
+    }
+    
+    private void checkPrimaryDataSource(final String databaseName, final 
Collection<CompletableFuture<Boolean>> completableFutures) {
         CompletableFuture.allOf(completableFutures.toArray(new 
CompletableFuture[0]));
         Iterator<CompletableFuture<Boolean>> primaryInstancesFuture = 
completableFutures.stream().iterator();
-        int primaryCount = 0;
+        int primaryInstanceCount = 0;
         while (primaryInstancesFuture.hasNext()) {
             if (primaryInstancesFuture.next().join()) {
-                primaryCount++;
+                primaryInstanceCount++;
             }
         }
-        ShardingSpherePreconditions.checkState(1 == primaryCount, () -> new 
DuplicatePrimaryDataSourceException(databaseName));
+        ShardingSpherePreconditions.checkState(1 == primaryInstanceCount, () 
-> new DuplicatePrimaryDataSourceException(databaseName));
     }
     
-    private CompletableFuture<Boolean> supplyAsyncCheckEnvironment(final 
DataSource dataSource, final ExecutorService executorService) {
+    private CompletableFuture<Boolean> asyncCheckEnvironment(final 
ExecutorService executorService, final DataSource dataSource) {
         return CompletableFuture.supplyAsync(() -> {
             try {
                 return isPrimaryInstance(dataSource);
@@ -122,12 +125,11 @@ public final class 
MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm impl
         try (
                 Connection connection = replicaDataSource.getConnection();
                 Statement statement = connection.createStatement()) {
-            String delayMillisecondsThreshold = 
getProps().getProperty("delay-milliseconds-threshold");
-            if (Strings.isNullOrEmpty(delayMillisecondsThreshold)) {
+            if (0L == delayMillisecondsThreshold) {
                 return new ReplicaDataSourceStatus(true, 0L);
             }
             long replicationDelayMilliseconds = 
queryReplicationDelayMilliseconds(statement);
-            boolean isDelay = replicationDelayMilliseconds >= 
Long.parseLong(delayMillisecondsThreshold);
+            boolean isDelay = replicationDelayMilliseconds >= 
delayMillisecondsThreshold;
             return new ReplicaDataSourceStatus(!isDelay, 
replicationDelayMilliseconds);
         }
     }

Reply via email to