This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new 8b54bba84 [ISSUE #2142]add distributed lock to avoid onceListener
invoke multi times (#2168)
8b54bba84 is described below
commit 8b54bba845c6c6e4a4e30a19c0cd222f704a2c3b
Author: songxiulu <[email protected]>
AuthorDate: Tue Jan 17 18:49:12 2023 +0800
[ISSUE #2142]add distributed lock to avoid onceListener invoke multi times
(#2168)
* add a distributed lock to prevent onceListener from being invoked multiple times
* use execute in leader method
* change Tests
* remove unused log
* add sleep time to prevent the test server from being stopped by another thread.
* update test
Co-authored-by: songxiulu <[email protected]>
---
.../AbstractDistributeOnceElasticJobListener.java | 6 +-
.../lite/internal/guarantee/GuaranteeNode.java | 4 +
.../lite/internal/guarantee/GuaranteeService.java | 94 +++++++++++++++++++---
.../DistributeOnceElasticJobListenerTest.java | 6 +-
.../internal/guarantee/GuaranteeServiceTest.java | 41 ++++++++++
.../boot/job/fixture/EmbedTestingServer.java | 6 +-
6 files changed, 137 insertions(+), 20 deletions(-)
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
index 4e72e3be6..e5f287fff 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
@@ -61,8 +61,7 @@ public abstract class
AbstractDistributeOnceElasticJobListener implements Elasti
BlockUtils.waitingShortTime();
}
if (guaranteeService.isAllStarted()) {
- doBeforeJobExecutedAtLastStarted(shardingContexts);
- guaranteeService.clearAllStartedInfo();
+ guaranteeService.executeInLeaderForLastStarted(this,
shardingContexts);
return;
}
long before = timeService.getCurrentMillis();
@@ -90,8 +89,7 @@ public abstract class
AbstractDistributeOnceElasticJobListener implements Elasti
BlockUtils.waitingShortTime();
}
if (guaranteeService.isAllCompleted()) {
- doAfterJobExecutedAtLastCompleted(shardingContexts);
- guaranteeService.clearAllCompletedInfo();
+ guaranteeService.executeInLeaderForLastCompleted(this,
shardingContexts);
return;
}
long before = timeService.getCurrentMillis();
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeNode.java
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeNode.java
index f4edb8988..e9efa9cd9 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeNode.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeNode.java
@@ -29,6 +29,10 @@ public final class GuaranteeNode {
static final String STARTED_ROOT = ROOT + "/started";
static final String COMPLETED_ROOT = ROOT + "/completed";
+
+ static final String STARTED_LATCH_ROOT = ROOT + "/started-latch";
+
+ static final String COMPLETED_LATCH_ROOT = ROOT + "/completed-latch";
private final JobNodePath jobNodePath;
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
index fe6b5a903..0a9d94e52 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
@@ -17,9 +17,13 @@
package org.apache.shardingsphere.elasticjob.lite.internal.guarantee;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
+import
org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;
import
org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import
org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.reg.base.LeaderExecutionCallback;
import java.util.Collection;
@@ -27,16 +31,16 @@ import java.util.Collection;
* Guarantee service.
*/
public final class GuaranteeService {
-
+
private final JobNodeStorage jobNodeStorage;
-
+
private final ConfigurationService configService;
-
+
public GuaranteeService(final CoordinatorRegistryCenter regCenter, final
String jobName) {
jobNodeStorage = new JobNodeStorage(regCenter, jobName);
configService = new ConfigurationService(regCenter, jobName);
}
-
+
/**
* Register start.
*
@@ -47,7 +51,7 @@ public final class GuaranteeService {
jobNodeStorage.createJobNodeIfNeeded(GuaranteeNode.getStartedNode(each));
}
}
-
+
/**
* Judge whether current sharding items are all register start success.
*
@@ -62,7 +66,7 @@ public final class GuaranteeService {
}
return true;
}
-
+
/**
* Judge whether job's sharding items are all started.
*
@@ -72,14 +76,14 @@ public final class GuaranteeService {
return jobNodeStorage.isJobNodeExisted(GuaranteeNode.STARTED_ROOT)
&& configService.load(false).getShardingTotalCount() ==
jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.STARTED_ROOT).size();
}
-
+
/**
* Clear all started job's info.
*/
public void clearAllStartedInfo() {
jobNodeStorage.removeJobNodeIfExisted(GuaranteeNode.STARTED_ROOT);
}
-
+
/**
* Register complete.
*
@@ -90,7 +94,7 @@ public final class GuaranteeService {
jobNodeStorage.createJobNodeIfNeeded(GuaranteeNode.getCompletedNode(each));
}
}
-
+
/**
* Judge whether sharding items are register complete success.
*
@@ -105,7 +109,7 @@ public final class GuaranteeService {
}
return true;
}
-
+
/**
* Judge whether job's sharding items are all completed.
*
@@ -115,11 +119,79 @@ public final class GuaranteeService {
return jobNodeStorage.isJobNodeExisted(GuaranteeNode.COMPLETED_ROOT)
&& configService.load(false).getShardingTotalCount() <=
jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.COMPLETED_ROOT).size();
}
-
+
/**
* Clear all completed job's info.
*/
public void clearAllCompletedInfo() {
jobNodeStorage.removeJobNodeIfExisted(GuaranteeNode.COMPLETED_ROOT);
}
+
+ /**
+ * Invoke doBeforeJobExecutedAtLastStarted method once after last started.
+ *
+ * @param listener AbstractDistributeOnceElasticJobListener
instance
+ * @param shardingContexts sharding contexts
+ */
+ public void executeInLeaderForLastStarted(final
AbstractDistributeOnceElasticJobListener listener,
+ final ShardingContexts
shardingContexts) {
+ jobNodeStorage.executeInLeader(GuaranteeNode.STARTED_LATCH_ROOT,
+ new LeaderExecutionCallbackForLastStarted(listener,
shardingContexts));
+ }
+
+ /**
+ * Invoke doAfterJobExecutedAtLastCompleted method once after last
completed.
+ *
+ * @param listener AbstractDistributeOnceElasticJobListener
instance
+ * @param shardingContexts sharding contexts
+ */
+ public void executeInLeaderForLastCompleted(final
AbstractDistributeOnceElasticJobListener listener,
+ final ShardingContexts
shardingContexts) {
+ jobNodeStorage.executeInLeader(GuaranteeNode.COMPLETED_LATCH_ROOT,
+ new LeaderExecutionCallbackForLastCompleted(listener,
shardingContexts));
+ }
+
+ /**
+ * Inner class for last started callback.
+ */
+ @RequiredArgsConstructor
+ class LeaderExecutionCallbackForLastStarted implements
LeaderExecutionCallback {
+ private final AbstractDistributeOnceElasticJobListener listener;
+
+ private final ShardingContexts shardingContexts;
+
+ @Override
+ public void execute() {
+ try {
+ if (!isAllStarted()) {
+ return;
+ }
+ listener.doBeforeJobExecutedAtLastStarted(shardingContexts);
+ } finally {
+ clearAllStartedInfo();
+ }
+ }
+ }
+
+ /**
+ * Inner class for last completed callback.
+ */
+ @RequiredArgsConstructor
+ class LeaderExecutionCallbackForLastCompleted implements
LeaderExecutionCallback {
+ private final AbstractDistributeOnceElasticJobListener listener;
+
+ private final ShardingContexts shardingContexts;
+
+ @Override
+ public void execute() {
+ try {
+ if (!isAllCompleted()) {
+ return;
+ }
+ listener.doAfterJobExecutedAtLastCompleted(shardingContexts);
+ } finally {
+ clearAllCompletedInfo();
+ }
+ }
+ }
}
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
index 549daef62..a8f778738 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
@@ -72,8 +72,7 @@ public final class DistributeOnceElasticJobListenerTest {
when(guaranteeService.isAllStarted()).thenReturn(true);
distributeOnceElasticJobListener.beforeJobExecuted(shardingContexts);
verify(guaranteeService).registerStart(Sets.newHashSet(0, 1));
- verify(elasticJobListenerCaller).before();
- verify(guaranteeService).clearAllStartedInfo();
+
verify(guaranteeService).executeInLeaderForLastStarted(distributeOnceElasticJobListener,
shardingContexts);
}
@Test
@@ -102,8 +101,7 @@ public final class DistributeOnceElasticJobListenerTest {
when(guaranteeService.isAllCompleted()).thenReturn(true);
distributeOnceElasticJobListener.afterJobExecuted(shardingContexts);
verify(guaranteeService).registerComplete(Sets.newHashSet(0, 1));
- verify(elasticJobListenerCaller).after();
- verify(guaranteeService).clearAllCompletedInfo();
+
verify(guaranteeService).executeInLeaderForLastCompleted(distributeOnceElasticJobListener,
shardingContexts);
}
@Test
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
index dba900fd7..1e9c223cd 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
@@ -18,6 +18,8 @@
package org.apache.shardingsphere.elasticjob.lite.internal.guarantee;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
+import
org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;
import
org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import
org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
@@ -31,6 +33,7 @@ import java.util.Arrays;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -42,6 +45,12 @@ public final class GuaranteeServiceTest {
@Mock
private ConfigurationService configService;
+
+ @Mock
+ private AbstractDistributeOnceElasticJobListener listener;
+
+ @Mock
+ private ShardingContexts shardingContexts;
private final GuaranteeService guaranteeService = new
GuaranteeService(null, "test_job");
@@ -143,4 +152,36 @@ public final class GuaranteeServiceTest {
guaranteeService.clearAllCompletedInfo();
verify(jobNodeStorage).removeJobNodeIfExisted("guarantee/completed");
}
+
+ @Test
+ public void assertExecuteInLeaderForLastCompleted() {
+
when(jobNodeStorage.isJobNodeExisted("guarantee/completed")).thenReturn(true);
+
when(configService.load(false)).thenReturn(JobConfiguration.newBuilder("test_job",
3).cron("0/1 * * * * ?").build());
+
when(jobNodeStorage.getJobNodeChildrenKeys("guarantee/completed")).thenReturn(Arrays.asList("0",
"1", "2"));
+ guaranteeService.new LeaderExecutionCallbackForLastCompleted(listener,
shardingContexts).execute();
+ verify(listener).doAfterJobExecutedAtLastCompleted(shardingContexts);
+ }
+
+ @Test
+ public void assertExecuteInLeaderForNotLastCompleted() {
+
when(jobNodeStorage.isJobNodeExisted("guarantee/completed")).thenReturn(false);
+ guaranteeService.new LeaderExecutionCallbackForLastCompleted(listener,
shardingContexts).execute();
+ verify(listener,
never()).doAfterJobExecutedAtLastCompleted(shardingContexts);
+ }
+
+ @Test
+ public void assertExecuteInLeaderForLastStarted() {
+
when(jobNodeStorage.isJobNodeExisted("guarantee/started")).thenReturn(true);
+
when(configService.load(false)).thenReturn(JobConfiguration.newBuilder("test_job",
3).cron("0/1 * * * * ?").build());
+
when(jobNodeStorage.getJobNodeChildrenKeys("guarantee/started")).thenReturn(Arrays.asList("0",
"1", "2"));
+ guaranteeService.new LeaderExecutionCallbackForLastStarted(listener,
shardingContexts).execute();
+ verify(listener).doBeforeJobExecutedAtLastStarted(shardingContexts);
+ }
+
+ @Test
+ public void assertExecuteInLeaderForNotLastStarted() {
+
when(jobNodeStorage.isJobNodeExisted("guarantee/started")).thenReturn(false);
+ guaranteeService.new LeaderExecutionCallbackForLastStarted(listener,
shardingContexts).execute();
+ verify(listener,
never()).doBeforeJobExecutedAtLastStarted(shardingContexts);
+ }
}
diff --git
a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/fixture/EmbedTestingServer.java
b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/fixture/EmbedTestingServer.java
index 799202d30..aa97974ea 100644
---
a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/fixture/EmbedTestingServer.java
+++
b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/fixture/EmbedTestingServer.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.elasticjob.lite.spring.boot.job.fixture;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.curator.test.TestingServer;
+import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
import org.apache.shardingsphere.elasticjob.reg.exception.RegExceptionHandler;
import java.io.File;
@@ -45,6 +46,9 @@ public final class EmbedTestingServer {
* Start the server.
*/
public static void start() {
+ // Sleep for a while to avoid the test server being stopped unexpectedly by another thread.
+ long sleepTime = 1000L;
+ BlockUtils.sleep(sleepTime);
if (null != testingServer) {
return;
}
@@ -57,7 +61,7 @@ public final class EmbedTestingServer {
} finally {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
- Thread.sleep(1000);
+ Thread.sleep(sleepTime);
testingServer.close();
} catch (final IOException | InterruptedException ex) {
RegExceptionHandler.handleException(ex);