This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b36b75  Support the case where job instances crash while executing
failover. (#1873)
4b36b75 is described below

commit 4b36b751cc55d55aa020f317c42cac4680eb1386
Author: sucg <[email protected]>
AuthorDate: Mon Apr 26 10:58:06 2021 +0800

    Support the case where job instances crash while executing
failover. (#1873)
    
    Co-authored-by: suchunguan <[email protected]>
---
 .../infra/handler/sharding/JobInstance.java        |  2 +-
 .../internal/failover/FailoverListenerManager.java |  6 ++--
 .../lite/internal/failover/FailoverNode.java       |  8 ++++++
 .../lite/internal/failover/FailoverService.java    | 33 +++++++++++++++++++++-
 .../lite/internal/sharding/ShardingService.java    | 21 ++++++++++++++
 .../failover/FailoverListenerManagerTest.java      |  6 ++--
 .../lite/internal/failover/FailoverNodeTest.java   |  5 ++++
 .../internal/failover/FailoverServiceTest.java     | 25 ++++++++++++++++
 .../internal/sharding/ShardingServiceTest.java     | 20 ++++++++++++-
 9 files changed, 117 insertions(+), 9 deletions(-)

diff --git 
a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java
 
b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java
index 2b0ffc7..48ef8bb 100644
--- 
a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java
+++ 
b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java
@@ -32,7 +32,7 @@ import java.lang.management.ManagementFactory;
 @EqualsAndHashCode(of = "jobInstanceId")
 public final class JobInstance {
     
-    private static final String DELIMITER = "@-@";
+    public static final String DELIMITER = "@-@";
     
     private String jobInstanceId;
     
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
index 2eedab3..6f46161 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
@@ -76,14 +76,14 @@ public final class FailoverListenerManager extends 
AbstractListenerManager {
                 if 
(jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId()))
 {
                     return;
                 }
-                List<Integer> failoverItems = 
failoverService.getFailoverItems(jobInstanceId);
+                List<Integer> failoverItems = 
failoverService.getFailoveringItems(jobInstanceId);
                 if (!failoverItems.isEmpty()) {
                     for (int each : failoverItems) {
-                        failoverService.setCrashedFailoverFlag(each);
+                        failoverService.setCrashedFailoverFlagDirectly(each);
                         failoverService.failoverIfNecessary();
                     }
                 } else {
-                    for (int each : 
shardingService.getShardingItems(jobInstanceId)) {
+                    for (int each : 
shardingService.getCrashedShardingItems(jobInstanceId)) {
                         failoverService.setCrashedFailoverFlag(each);
                         failoverService.failoverIfNecessary();
                     }
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNode.java
 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNode.java
index 734faf7..113d16b 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNode.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNode.java
@@ -37,6 +37,10 @@ public final class FailoverNode {
     static final String LATCH = LEADER_ROOT + "/latch";
     
     private static final String EXECUTION_FAILOVER = ShardingNode.ROOT + 
"/%s/" + FAILOVER;
+
+    private static final String FAILOVERING = "failovering";
+
+    private static final String EXECUTING_FAILOVER = ShardingNode.ROOT + 
"/%s/" + FAILOVERING;
     
     private final JobNodePath jobNodePath;
     
@@ -51,6 +55,10 @@ public final class FailoverNode {
     static String getExecutionFailoverNode(final int item) {
         return String.format(EXECUTION_FAILOVER, item);
     }
+
+    static String getExecutingFailoverNode(final int item) {
+        return String.format(EXECUTING_FAILOVER, item);
+    }
     
     /**
      * Get sharding item by execution failover path.
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
index 5d8a93e..56594af 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
@@ -59,7 +59,16 @@ public final class FailoverService {
             
jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
         }
     }
-    
+
+    /**
+     * Set crashed failover flag directly.
+     *
+     * @param item crashed item
+     */
+    public void setCrashedFailoverFlagDirectly(final int item) {
+        jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
+    }
+
     private boolean isFailoverAssigned(final Integer item) {
         return 
jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(item));
     }
@@ -86,6 +95,7 @@ public final class FailoverService {
     public void updateFailoverComplete(final Collection<Integer> items) {
         for (int each : items) {
             
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getExecutionFailoverNode(each));
+            
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getExecutingFailoverNode(each));
         }
     }
     
@@ -108,6 +118,26 @@ public final class FailoverService {
         Collections.sort(result);
         return result;
     }
+
+    /**
+     * Get failovering items.
+     *
+     * @param jobInstanceId job instance ID
+     * @return failovering items
+     */
+    public List<Integer> getFailoveringItems(final String jobInstanceId) {
+        List<String> items = 
jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
+        List<Integer> result = new ArrayList<>(items.size());
+        for (String each : items) {
+            int item = Integer.parseInt(each);
+            String node = FailoverNode.getExecutingFailoverNode(item);
+            if (jobNodeStorage.isJobNodeExisted(node) && 
jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
+                result.add(item);
+            }
+        }
+        Collections.sort(result);
+        return result;
+    }
     
     /**
      * Get failover items which execute on localhost.
@@ -156,6 +186,7 @@ public final class FailoverService {
             int crashedItem = 
Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
             log.debug("Failover job '{}' begin, crashed item '{}'", jobName, 
crashedItem);
             
jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem),
 JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
+            
jobNodeStorage.fillJobNode(FailoverNode.getExecutingFailoverNode(crashedItem), 
JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
             
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
             // TODO Instead of using triggerJob, use executor for unified 
scheduling
             JobScheduleController jobScheduleController = 
JobRegistry.getInstance().getJobScheduleController(jobName);
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
index e9e8bb1..9ac967e 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
@@ -174,6 +174,27 @@ public final class ShardingService {
         }
         return result;
     }
+
+    /**
+     * Get crashed sharding items.
+     *
+     * @param jobInstanceId crashed job instance ID
+     * @return crashed sharding items
+     */
+    public List<Integer> getCrashedShardingItems(final String jobInstanceId) {
+        String serverIp = jobInstanceId.substring(0, 
jobInstanceId.indexOf(JobInstance.DELIMITER));
+        if (!serverService.isEnableServer(serverIp)) {
+            return Collections.emptyList();
+        }
+        List<Integer> result = new LinkedList<>();
+        int shardingTotalCount = 
configService.load(true).getShardingTotalCount();
+        for (int i = 0; i < shardingTotalCount; i++) {
+            if 
(jobInstanceId.equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i))))
 {
+                result.add(i);
+            }
+        }
+        return result;
+    }
     
     /**
      * Get sharding items from localhost job server.
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
index 4d4e93b..8cce3f9 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
@@ -117,7 +117,7 @@ public final class FailoverListenerManagerTest {
         JobRegistry.getInstance().addJobInstance("test_job", new 
JobInstance("127.0.0.1@-@0"));
         JobRegistry.getInstance().registerJob("test_job", 
jobScheduleController);
         
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job",
 3).cron("0/1 * * * * ?").failover(true).build());
-        
when(shardingService.getShardingItems("127.0.0.1@-@1")).thenReturn(Arrays.asList(0,
 2));
+        
when(shardingService.getCrashedShardingItems("127.0.0.1@-@1")).thenReturn(Arrays.asList(0,
 2));
         failoverListenerManager.new 
JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@1", 
Type.NODE_DELETED, "");
         verify(failoverService).setCrashedFailoverFlag(0);
         verify(failoverService).setCrashedFailoverFlag(2);
@@ -130,9 +130,9 @@ public final class FailoverListenerManagerTest {
         JobRegistry.getInstance().addJobInstance("test_job", new 
JobInstance("127.0.0.1@-@0"));
         JobRegistry.getInstance().registerJob("test_job", 
jobScheduleController);
         
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job",
 3).cron("0/1 * * * * ?").failover(true).build());
-        
when(failoverService.getFailoverItems("127.0.0.1@-@1")).thenReturn(Collections.singletonList(1));
+        
when(failoverService.getFailoveringItems("127.0.0.1@-@1")).thenReturn(Collections.singletonList(1));
         failoverListenerManager.new 
JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@1", 
Type.NODE_DELETED, "");
-        verify(failoverService).setCrashedFailoverFlag(1);
+        verify(failoverService).setCrashedFailoverFlagDirectly(1);
         verify(failoverService).failoverIfNecessary();
         JobRegistry.getInstance().shutdown("test_job");
     }
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNodeTest.java
 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNodeTest.java
index c215c7b..0ef550f 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNodeTest.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNodeTest.java
@@ -46,4 +46,9 @@ public final class FailoverNodeTest {
     public void assertGetItemByExecutionFailoverPath() {
         
assertThat(failoverNode.getItemByExecutionFailoverPath("/test_job/sharding/0/failover"),
 is(0));
     }
+
+    @Test
+    public void assertGetProcessingFailoverNode() {
+        assertThat(FailoverNode.getExecutingFailoverNode(0), 
is("sharding/0/failovering"));
+    }
 }
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
index a5ab300..ce6d512 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
@@ -81,6 +81,12 @@ public final class FailoverServiceTest {
         verify(jobNodeStorage).isJobNodeExisted("sharding/0/failover");
         
verify(jobNodeStorage).createJobNodeIfNeeded("leader/failover/items/0");
     }
+
+    @Test
+    public void assertSetCrashedFailoverFlagDirectly() {
+        failoverService.setCrashedFailoverFlag(0);
+        
verify(jobNodeStorage).createJobNodeIfNeeded("leader/failover/items/0");
+    }
     
     @Test
     public void assertFailoverIfUnnecessaryWhenItemsRootNodeNotExisted() {
@@ -146,11 +152,30 @@ public final class FailoverServiceTest {
         verify(jobNodeStorage).isJobNodeExisted("leader/failover/items");
         verify(jobNodeStorage, 
times(2)).getJobNodeChildrenKeys("leader/failover/items");
         verify(jobNodeStorage).fillEphemeralJobNode("sharding/0/failover", 
"127.0.0.1@-@0");
+        verify(jobNodeStorage).fillJobNode("sharding/0/failovering", 
"127.0.0.1@-@0");
         
verify(jobNodeStorage).removeJobNodeIfExisted("leader/failover/items/0");
         verify(jobScheduleController).triggerJob();
         JobRegistry.getInstance().setJobRunning("test_job", false);
         JobRegistry.getInstance().shutdown("test_job");
     }
+
+    @Test
+    public void assertGetFailoveringItems() {
+        JobRegistry.getInstance().registerJob("test_job", 
jobScheduleController);
+        
when(jobNodeStorage.getJobNodeChildrenKeys("sharding")).thenReturn(Arrays.asList("0",
 "1", "2"));
+        
when(jobNodeStorage.isJobNodeExisted("sharding/0/failovering")).thenReturn(true);
+        
when(jobNodeStorage.isJobNodeExisted("sharding/1/failovering")).thenReturn(true);
+        
when(jobNodeStorage.isJobNodeExisted("sharding/2/failovering")).thenReturn(false);
+        
when(jobNodeStorage.getJobNodeDataDirectly("sharding/0/failovering")).thenReturn("127.0.0.1@-@0");
+        
when(jobNodeStorage.getJobNodeDataDirectly("sharding/1/failovering")).thenReturn("127.0.0.1@-@1");
+        assertThat(failoverService.getFailoveringItems("127.0.0.1@-@1"), 
is(Collections.singletonList(1)));
+        verify(jobNodeStorage).getJobNodeChildrenKeys("sharding");
+        verify(jobNodeStorage).isJobNodeExisted("sharding/0/failovering");
+        verify(jobNodeStorage).isJobNodeExisted("sharding/1/failovering");
+        
verify(jobNodeStorage).getJobNodeDataDirectly("sharding/0/failovering");
+        
verify(jobNodeStorage).getJobNodeDataDirectly("sharding/1/failovering");
+        JobRegistry.getInstance().shutdown("test_job");
+    }
     
     @Test
     public void assertUpdateFailoverComplete() {
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
index 5f5eee6..12a433b 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
@@ -186,7 +186,7 @@ public final class ShardingServiceTest {
     }
     
     @Test
-    public void assertGetShardingItemsWithEnabledServer() {
+    public void assertGetShardingItemsWithAvailableServer() {
         JobRegistry.getInstance().registerRegistryCenter("test_job", 
regCenter);
         JobRegistry.getInstance().registerJob("test_job", 
jobScheduleController);
         when(serverService.isAvailableServer("127.0.0.1")).thenReturn(true);
@@ -274,4 +274,22 @@ public final class ShardingServiceTest {
         verify(transactionOp, times(3)).create();
         verify(transactionOp, times(2)).delete();
     }
+
+    @Test
+    public void assertGetCrashedShardingItemsWithNotEnableServer() {
+        assertThat(shardingService.getCrashedShardingItems("127.0.0.1@-@0"), 
is(Collections.<Integer>emptyList()));
+    }
+
+    @Test
+    public void assertGetCrashedShardingItemsWithEnabledServer() {
+        JobRegistry.getInstance().registerRegistryCenter("test_job", 
regCenter);
+        JobRegistry.getInstance().registerJob("test_job", 
jobScheduleController);
+        when(serverService.isEnableServer("127.0.0.1")).thenReturn(true);
+        
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job",
 3).cron("0/1 * * * * ?").build());
+        
when(jobNodeStorage.getJobNodeData("sharding/0/instance")).thenReturn("127.0.0.1@-@0");
+        
when(jobNodeStorage.getJobNodeData("sharding/1/instance")).thenReturn("127.0.0.1@-@1");
+        
when(jobNodeStorage.getJobNodeData("sharding/2/instance")).thenReturn("127.0.0.1@-@0");
+        assertThat(shardingService.getCrashedShardingItems("127.0.0.1@-@0"), 
is(Arrays.asList(0, 2)));
+        JobRegistry.getInstance().shutdown("test_job");
+    }
 }

Reply via email to