This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b54bba84 [ISSUE #2142]add distributed lock to avoid onceListener 
invoke multi times (#2168)
8b54bba84 is described below

commit 8b54bba845c6c6e4a4e30a19c0cd222f704a2c3b
Author: songxiulu <[email protected]>
AuthorDate: Tue Jan 17 18:49:12 2023 +0800

    [ISSUE #2142]add distributed lock to avoid onceListener invoke multi times 
(#2168)
    
    * add distributed lock to avoid onceListener being invoked multiple times
    
    * use execute in leader method
    
    * change Tests
    
    * remove unused log
    
    * add sleep time to avoid the test server being stopped by another thread.
    
    * update test
    
    Co-authored-by: songxiulu <[email protected]>
---
 .../AbstractDistributeOnceElasticJobListener.java  |  6 +-
 .../lite/internal/guarantee/GuaranteeNode.java     |  4 +
 .../lite/internal/guarantee/GuaranteeService.java  | 94 +++++++++++++++++++---
 .../DistributeOnceElasticJobListenerTest.java      |  6 +-
 .../internal/guarantee/GuaranteeServiceTest.java   | 41 ++++++++++
 .../boot/job/fixture/EmbedTestingServer.java       |  6 +-
 6 files changed, 137 insertions(+), 20 deletions(-)

diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
index 4e72e3be6..e5f287fff 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
@@ -61,8 +61,7 @@ public abstract class 
AbstractDistributeOnceElasticJobListener implements Elasti
             BlockUtils.waitingShortTime();
         }
         if (guaranteeService.isAllStarted()) {
-            doBeforeJobExecutedAtLastStarted(shardingContexts);
-            guaranteeService.clearAllStartedInfo();
+            guaranteeService.executeInLeaderForLastStarted(this, 
shardingContexts);
             return;
         }
         long before = timeService.getCurrentMillis();
@@ -90,8 +89,7 @@ public abstract class 
AbstractDistributeOnceElasticJobListener implements Elasti
             BlockUtils.waitingShortTime();
         }
         if (guaranteeService.isAllCompleted()) {
-            doAfterJobExecutedAtLastCompleted(shardingContexts);
-            guaranteeService.clearAllCompletedInfo();
+            guaranteeService.executeInLeaderForLastCompleted(this, 
shardingContexts);
             return;
         }
         long before = timeService.getCurrentMillis();
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeNode.java
 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeNode.java
index f4edb8988..e9efa9cd9 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeNode.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeNode.java
@@ -29,6 +29,10 @@ public final class GuaranteeNode {
     static final String STARTED_ROOT = ROOT + "/started";
     
     static final String COMPLETED_ROOT = ROOT + "/completed";
+
+    static final String STARTED_LATCH_ROOT = ROOT + "/started-latch";
+
+    static final String COMPLETED_LATCH_ROOT = ROOT + "/completed-latch";
     
     private final JobNodePath jobNodePath;
     
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
index fe6b5a903..0a9d94e52 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
@@ -17,9 +17,13 @@
 
 package org.apache.shardingsphere.elasticjob.lite.internal.guarantee;
 
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
+import 
org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;
 import 
org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
 import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.reg.base.LeaderExecutionCallback;
 
 import java.util.Collection;
 
@@ -27,16 +31,16 @@ import java.util.Collection;
  * Guarantee service.
  */
 public final class GuaranteeService {
-    
+
     private final JobNodeStorage jobNodeStorage;
-    
+
     private final ConfigurationService configService;
-    
+
     public GuaranteeService(final CoordinatorRegistryCenter regCenter, final 
String jobName) {
         jobNodeStorage = new JobNodeStorage(regCenter, jobName);
         configService = new ConfigurationService(regCenter, jobName);
     }
-    
+
     /**
      * Register start.
      *
@@ -47,7 +51,7 @@ public final class GuaranteeService {
             
jobNodeStorage.createJobNodeIfNeeded(GuaranteeNode.getStartedNode(each));
         }
     }
-    
+
     /**
      * Judge whether current sharding items are all register start success.
      *
@@ -62,7 +66,7 @@ public final class GuaranteeService {
         }
         return true;
     }
-    
+
     /**
      * Judge whether job's sharding items are all started.
      *
@@ -72,14 +76,14 @@ public final class GuaranteeService {
         return jobNodeStorage.isJobNodeExisted(GuaranteeNode.STARTED_ROOT)
                 && configService.load(false).getShardingTotalCount() == 
jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.STARTED_ROOT).size();
     }
-    
+
     /**
      * Clear all started job's info.
      */
     public void clearAllStartedInfo() {
         jobNodeStorage.removeJobNodeIfExisted(GuaranteeNode.STARTED_ROOT);
     }
-    
+
     /**
      * Register complete.
      *
@@ -90,7 +94,7 @@ public final class GuaranteeService {
             
jobNodeStorage.createJobNodeIfNeeded(GuaranteeNode.getCompletedNode(each));
         }
     }
-    
+
     /**
      * Judge whether sharding items are register complete success.
      *
@@ -105,7 +109,7 @@ public final class GuaranteeService {
         }
         return true;
     }
-    
+
     /**
      * Judge whether job's sharding items are all completed.
      *
@@ -115,11 +119,79 @@ public final class GuaranteeService {
         return jobNodeStorage.isJobNodeExisted(GuaranteeNode.COMPLETED_ROOT)
                 && configService.load(false).getShardingTotalCount() <= 
jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.COMPLETED_ROOT).size();
     }
-    
+
     /**
      * Clear all completed job's info.
      */
     public void clearAllCompletedInfo() {
         jobNodeStorage.removeJobNodeIfExisted(GuaranteeNode.COMPLETED_ROOT);
     }
+
+    /**
+     * Invoke doBeforeJobExecutedAtLastStarted method once after last started.
+     *
+     * @param listener         AbstractDistributeOnceElasticJobListener 
instance
+     * @param shardingContexts sharding contexts
+     */
+    public void executeInLeaderForLastStarted(final 
AbstractDistributeOnceElasticJobListener listener,
+                                              final ShardingContexts 
shardingContexts) {
+        jobNodeStorage.executeInLeader(GuaranteeNode.STARTED_LATCH_ROOT,
+                new LeaderExecutionCallbackForLastStarted(listener, 
shardingContexts));
+    }
+
+    /**
+     * Invoke doAfterJobExecutedAtLastCompleted method once after last 
completed.
+     *
+     * @param listener         AbstractDistributeOnceElasticJobListener 
instance
+     * @param shardingContexts sharding contexts
+     */
+    public void executeInLeaderForLastCompleted(final 
AbstractDistributeOnceElasticJobListener listener,
+                                                final ShardingContexts 
shardingContexts) {
+        jobNodeStorage.executeInLeader(GuaranteeNode.COMPLETED_LATCH_ROOT,
+                new LeaderExecutionCallbackForLastCompleted(listener, 
shardingContexts));
+    }
+
+    /**
+     * Inner class for last started callback.
+     */
+    @RequiredArgsConstructor
+    class LeaderExecutionCallbackForLastStarted implements 
LeaderExecutionCallback {
+        private final AbstractDistributeOnceElasticJobListener listener;
+
+        private final ShardingContexts shardingContexts;
+
+        @Override
+        public void execute() {
+            try {
+                if (!isAllStarted()) {
+                    return;
+                }
+                listener.doBeforeJobExecutedAtLastStarted(shardingContexts);
+            } finally {
+                clearAllStartedInfo();
+            }
+        }
+    }
+
+    /**
+     * Inner class for last completed callback.
+     */
+    @RequiredArgsConstructor
+    class LeaderExecutionCallbackForLastCompleted implements 
LeaderExecutionCallback {
+        private final AbstractDistributeOnceElasticJobListener listener;
+
+        private final ShardingContexts shardingContexts;
+
+        @Override
+        public void execute() {
+            try {
+                if (!isAllCompleted()) {
+                    return;
+                }
+                listener.doAfterJobExecutedAtLastCompleted(shardingContexts);
+            } finally {
+                clearAllCompletedInfo();
+            }
+        }
+    }
 }
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
index 549daef62..a8f778738 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
@@ -72,8 +72,7 @@ public final class DistributeOnceElasticJobListenerTest {
         when(guaranteeService.isAllStarted()).thenReturn(true);
         distributeOnceElasticJobListener.beforeJobExecuted(shardingContexts);
         verify(guaranteeService).registerStart(Sets.newHashSet(0, 1));
-        verify(elasticJobListenerCaller).before();
-        verify(guaranteeService).clearAllStartedInfo();
+        
verify(guaranteeService).executeInLeaderForLastStarted(distributeOnceElasticJobListener,
 shardingContexts);
     }
     
     @Test
@@ -102,8 +101,7 @@ public final class DistributeOnceElasticJobListenerTest {
         when(guaranteeService.isAllCompleted()).thenReturn(true);
         distributeOnceElasticJobListener.afterJobExecuted(shardingContexts);
         verify(guaranteeService).registerComplete(Sets.newHashSet(0, 1));
-        verify(elasticJobListenerCaller).after();
-        verify(guaranteeService).clearAllCompletedInfo();
+        
verify(guaranteeService).executeInLeaderForLastCompleted(distributeOnceElasticJobListener,
 shardingContexts);
     }
     
     @Test
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
index dba900fd7..1e9c223cd 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
@@ -18,6 +18,8 @@
 package org.apache.shardingsphere.elasticjob.lite.internal.guarantee;
 
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
+import 
org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;
 import 
org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
 import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
 import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
@@ -31,6 +33,7 @@ import java.util.Arrays;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -42,6 +45,12 @@ public final class GuaranteeServiceTest {
     
     @Mock
     private ConfigurationService configService;
+
+    @Mock
+    private AbstractDistributeOnceElasticJobListener listener;
+
+    @Mock
+    private ShardingContexts shardingContexts;
     
     private final GuaranteeService guaranteeService = new 
GuaranteeService(null, "test_job");
     
@@ -143,4 +152,36 @@ public final class GuaranteeServiceTest {
         guaranteeService.clearAllCompletedInfo();
         verify(jobNodeStorage).removeJobNodeIfExisted("guarantee/completed");
     }
+
+    @Test
+    public void assertExecuteInLeaderForLastCompleted() {
+        
when(jobNodeStorage.isJobNodeExisted("guarantee/completed")).thenReturn(true);
+        
when(configService.load(false)).thenReturn(JobConfiguration.newBuilder("test_job",
 3).cron("0/1 * * * * ?").build());
+        
when(jobNodeStorage.getJobNodeChildrenKeys("guarantee/completed")).thenReturn(Arrays.asList("0",
 "1", "2"));
+        guaranteeService.new LeaderExecutionCallbackForLastCompleted(listener, 
shardingContexts).execute();
+        verify(listener).doAfterJobExecutedAtLastCompleted(shardingContexts);
+    }
+
+    @Test
+    public void assertExecuteInLeaderForNotLastCompleted() {
+        
when(jobNodeStorage.isJobNodeExisted("guarantee/completed")).thenReturn(false);
+        guaranteeService.new LeaderExecutionCallbackForLastCompleted(listener, 
shardingContexts).execute();
+        verify(listener, 
never()).doAfterJobExecutedAtLastCompleted(shardingContexts);
+    }
+
+    @Test
+    public void assertExecuteInLeaderForLastStarted() {
+        
when(jobNodeStorage.isJobNodeExisted("guarantee/started")).thenReturn(true);
+        
when(configService.load(false)).thenReturn(JobConfiguration.newBuilder("test_job",
 3).cron("0/1 * * * * ?").build());
+        
when(jobNodeStorage.getJobNodeChildrenKeys("guarantee/started")).thenReturn(Arrays.asList("0",
 "1", "2"));
+        guaranteeService.new LeaderExecutionCallbackForLastStarted(listener, 
shardingContexts).execute();
+        verify(listener).doBeforeJobExecutedAtLastStarted(shardingContexts);
+    }
+
+    @Test
+    public void assertExecuteInLeaderForNotLastStarted() {
+        
when(jobNodeStorage.isJobNodeExisted("guarantee/started")).thenReturn(false);
+        guaranteeService.new LeaderExecutionCallbackForLastStarted(listener, 
shardingContexts).execute();
+        verify(listener, 
never()).doBeforeJobExecutedAtLastStarted(shardingContexts);
+    }
 }
diff --git 
a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/fixture/EmbedTestingServer.java
 
b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/fixture/EmbedTestingServer.java
index 799202d30..aa97974ea 100644
--- 
a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/fixture/EmbedTestingServer.java
+++ 
b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/fixture/EmbedTestingServer.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.elasticjob.lite.spring.boot.job.fixture;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.curator.test.TestingServer;
+import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
 import org.apache.shardingsphere.elasticjob.reg.exception.RegExceptionHandler;
 
 import java.io.File;
@@ -45,6 +46,9 @@ public final class EmbedTestingServer {
      * Start the server.
      */
     public static void start() {
+        // Sleep for a while to avoid the test server being stopped by another thread.
+        long sleepTime = 1000L;
+        BlockUtils.sleep(sleepTime);
         if (null != testingServer) {
             return;
         }
@@ -57,7 +61,7 @@ public final class EmbedTestingServer {
         } finally {
             Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                 try {
-                    Thread.sleep(1000);
+                    Thread.sleep(sleepTime);
                     testingServer.close();
                 } catch (final IOException | InterruptedException ex) {
                     RegExceptionHandler.handleException(ex);

Reply via email to