This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new 73f420e38 remove offline server ip (#2251)
73f420e38 is described below
commit 73f420e38e622af0f0e88dc3be10d1da1dc7a341
Author: songxiaosheng <[email protected]>
AuthorDate: Mon Sep 4 08:14:08 2023 +0800
remove offline server ip (#2251)
* remove offline server ip
* :memo: doc remove offline server
* :memo: doc remove offline server
---
.../lite/internal/server/ServerService.java | 63 +++++++++++++++++-----
.../lite/internal/setup/SetUpFacade.java | 1 +
2 files changed, 52 insertions(+), 12 deletions(-)
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java
index b2bb8a223..011e4c17c 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.server;
import com.google.common.base.Strings;
+import org.apache.commons.lang.StringUtils;
import
org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import
org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
@@ -25,27 +26,30 @@ import
org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
/**
* Server service.
*/
public final class ServerService {
-
+
private final String jobName;
-
+
private final JobNodeStorage jobNodeStorage;
-
+
private final ServerNode serverNode;
-
+
public ServerService(final CoordinatorRegistryCenter regCenter, final
String jobName) {
this.jobName = jobName;
jobNodeStorage = new JobNodeStorage(regCenter, jobName);
serverNode = new ServerNode(jobName);
}
-
+
/**
* Persist online status of job server.
- *
+ *
* @param enabled enable server or not
*/
public void persistOnline(final boolean enabled) {
@@ -53,10 +57,10 @@ public final class ServerService {
jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getServerIp()),
enabled ? ServerStatus.ENABLED.name() : ServerStatus.DISABLED.name());
}
}
-
+
/**
* Judge has available servers or not.
- *
+ *
* @return has available servers or not
*/
public boolean hasAvailableServers() {
@@ -68,17 +72,17 @@ public final class ServerService {
}
return false;
}
-
+
/**
* Judge is available server or not.
- *
+ *
* @param ip job server IP address
* @return is available server or not
*/
public boolean isAvailableServer(final String ip) {
return isEnableServer(ip) && hasOnlineInstances(ip);
}
-
+
private boolean hasOnlineInstances(final String ip) {
for (String each :
jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) {
if (each.startsWith(ip)) {
@@ -87,7 +91,7 @@ public final class ServerService {
}
return false;
}
-
+
/**
* Judge is server enabled or not.
*
@@ -102,4 +106,39 @@ public final class ServerService {
}
return ServerStatus.ENABLED.name().equals(serverStatus);
}
+
+ /**
+ * Remove unuse serverIp.
+ * @return num of serverIp be remove
+ */
+ public int removeOfflineServers() {
+ AtomicInteger affectNums = new AtomicInteger();
+ List<String> instances =
jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT);
+ if (instances == null || instances.isEmpty()) {
+ return affectNums.get();
+ }
+ Set<String> instanceIps = instances.stream()
+ .map(instance -> instance.split("@-@")[0])
+ .collect(Collectors.toSet());
+ if (instanceIps == null || instanceIps.isEmpty()) {
+ return affectNums.get();
+ }
+ List<String> serverIps =
jobNodeStorage.getJobNodeChildrenKeys(ServerNode.ROOT);
+ if (serverIps == null || serverIps.isEmpty()) {
+ return affectNums.get();
+ }
+
+ serverIps.forEach(serverIp -> {
+ if (instanceIps.contains(serverIp)) {
+ return;
+ }
+ String status =
jobNodeStorage.getJobNodeData(serverNode.getServerNode(serverIp));
+ if (StringUtils.isNotBlank(status)) {
+ return;
+ }
+
jobNodeStorage.removeJobNodeIfExisted(serverNode.getServerNode(serverIp));
+ affectNums.getAndIncrement();
+ });
+ return affectNums.get();
+ }
}
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
index f0d3fa83b..da154ec1e 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
@@ -75,6 +75,7 @@ public final class SetUpFacade {
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
+ serverService.removeOfflineServers();
}
/**