This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new 73f420e38 remove offline server ip (#2251)
73f420e38 is described below

commit 73f420e38e622af0f0e88dc3be10d1da1dc7a341
Author: songxiaosheng <[email protected]>
AuthorDate: Mon Sep 4 08:14:08 2023 +0800

    remove offline server ip (#2251)
    
    * remove offline server ip
    
    * :memo: doc remove offline server
    
    * :memo: doc remove offline server
---
 .../lite/internal/server/ServerService.java        | 63 +++++++++++++++++-----
 .../lite/internal/setup/SetUpFacade.java           |  1 +
 2 files changed, 52 insertions(+), 12 deletions(-)

diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java
 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java
index b2bb8a223..011e4c17c 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.elasticjob.lite.internal.server;
 
 import com.google.common.base.Strings;
+import org.apache.commons.lang.StringUtils;
 import 
org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
 import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
@@ -25,27 +26,30 @@ import 
org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
 
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 /**
  * Server service.
  */
 public final class ServerService {
-    
+
     private final String jobName;
-    
+
     private final JobNodeStorage jobNodeStorage;
-    
+
     private final ServerNode serverNode;
-    
+
     public ServerService(final CoordinatorRegistryCenter regCenter, final 
String jobName) {
         this.jobName = jobName;
         jobNodeStorage = new JobNodeStorage(regCenter, jobName);
         serverNode = new ServerNode(jobName);
     }
-    
+
     /**
      * Persist online status of job server.
-     * 
+     *
      * @param enabled enable server or not
      */
     public void persistOnline(final boolean enabled) {
@@ -53,10 +57,10 @@ public final class ServerService {
             
jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getServerIp()),
 enabled ? ServerStatus.ENABLED.name() : ServerStatus.DISABLED.name());
         }
     }
-    
+
     /**
      * Judge has available servers or not.
-     * 
+     *
      * @return has available servers or not
      */
     public boolean hasAvailableServers() {
@@ -68,17 +72,17 @@ public final class ServerService {
         }
         return false;
     }
-    
+
     /**
      * Judge is available server or not.
-     * 
+     *
      * @param ip job server IP address
      * @return is available server or not
      */
     public boolean isAvailableServer(final String ip) {
         return isEnableServer(ip) && hasOnlineInstances(ip);
     }
-    
+
     private boolean hasOnlineInstances(final String ip) {
         for (String each : 
jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) {
             if (each.startsWith(ip)) {
@@ -87,7 +91,7 @@ public final class ServerService {
         }
         return false;
     }
-    
+
     /**
      * Judge is server enabled or not.
      *
@@ -102,4 +106,39 @@ public final class ServerService {
         }
         return ServerStatus.ENABLED.name().equals(serverStatus);
     }
+
+    /**
+     * Remove server IP nodes that have no online instance and no stored status.
+     * @return number of server IP nodes removed
+     */
+    public int removeOfflineServers() {
+        AtomicInteger affectNums = new AtomicInteger();
+        List<String> instances = 
jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT);
+        if (instances == null || instances.isEmpty()) {
+            return affectNums.get();
+        }
+        Set<String> instanceIps = instances.stream()
+                .map(instance -> instance.split("@-@")[0])
+                .collect(Collectors.toSet());
+        if (instanceIps == null || instanceIps.isEmpty()) {
+            return affectNums.get();
+        }
+        List<String> serverIps = 
jobNodeStorage.getJobNodeChildrenKeys(ServerNode.ROOT);
+        if (serverIps == null || serverIps.isEmpty()) {
+            return affectNums.get();
+        }
+
+        serverIps.forEach(serverIp -> {
+            if (instanceIps.contains(serverIp)) {
+                return;
+            }
+            String status = 
jobNodeStorage.getJobNodeData(serverNode.getServerNode(serverIp));
+            if (StringUtils.isNotBlank(status)) {
+                return;
+            }
+            
jobNodeStorage.removeJobNodeIfExisted(serverNode.getServerNode(serverIp));
+            affectNums.getAndIncrement();
+        });
+        return affectNums.get();
+    }
 }
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
index f0d3fa83b..da154ec1e 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
@@ -75,6 +75,7 @@ public final class SetUpFacade {
         if (!reconcileService.isRunning()) {
             reconcileService.startAsync();
         }
+        serverService.removeOfflineServers();
     }
     
     /**

Reply via email to