This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2dbeb61eccb Refactor PipelineDistributedBarrier as singleton (#23908)
2dbeb61eccb is described below
commit 2dbeb61eccb0c795a469f44633032e6412ef4620
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Feb 2 10:16:10 2023 +0800
Refactor PipelineDistributedBarrier as singleton (#23908)
---
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 4 ++--
.../pipeline/core/job/AbstractPipelineJob.java | 2 +-
.../impl/BarrierMetaDataChangedEventHandler.java | 2 +-
.../core/util/PipelineDistributedBarrier.java | 14 +++++++++++
...tencyCheckChangedJobConfigurationProcessor.java | 3 +--
.../MigrationChangedJobConfigurationProcessor.java | 3 +--
.../core/api/impl/MigrationJobAPITest.java | 28 +++++++++++++++++-----
.../core/util/PipelineDistributedBarrierTest.java | 4 ++--
8 files changed, 44 insertions(+), 16 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 3dc039a6a0f..f774125c855 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -117,7 +117,7 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
@Override
public void startDisabledJob(final String jobId) {
- PipelineDistributedBarrier pipelineDistributedBarrier = new
PipelineDistributedBarrier();
+ PipelineDistributedBarrier pipelineDistributedBarrier =
PipelineDistributedBarrier.getInstance();
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), ()
-> new PipelineJobHasAlreadyStartedException(jobId));
@@ -133,7 +133,7 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
@Override
public void stop(final String jobId) {
- PipelineDistributedBarrier pipelineDistributedBarrier = new
PipelineDistributedBarrier();
+ PipelineDistributedBarrier pipelineDistributedBarrier =
PipelineDistributedBarrier.getInstance();
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
jobConfigPOJO.setDisabled(true);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 958fba3c4ae..3b146248b76 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -100,7 +100,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
return false;
}
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(),
shardingItem);
- new
PipelineDistributedBarrier().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()),
shardingItem);
+
PipelineDistributedBarrier.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()),
shardingItem);
return true;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
index 36591f27f68..8752b3b3170 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
@@ -38,7 +38,7 @@ public final class BarrierMetaDataChangedEventHandler
implements PipelineMetaDat
@Override
public void handle(final DataChangedEvent event) {
if (event.getType() == Type.ADDED) {
- new
PipelineDistributedBarrier().notifyChildrenNodeCountCheck(event.getKey());
+
PipelineDistributedBarrier.getInstance().notifyChildrenNodeCountCheck(event.getKey());
}
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 2ecfd906199..6d3e050d903 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -18,7 +18,9 @@
package org.apache.shardingsphere.data.pipeline.core.util;
import com.google.common.base.Strings;
+import lombok.AccessLevel;
import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -36,8 +38,11 @@ import java.util.concurrent.TimeUnit;
* Pipeline distributed barrier.
*/
@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PipelineDistributedBarrier {
+ private static final PipelineDistributedBarrier INSTANCE = new
PipelineDistributedBarrier();
+
private static final LazyInitializer<ClusterPersistRepository>
REPOSITORY_LAZY_INITIALIZER = new LazyInitializer<ClusterPersistRepository>() {
@Override
@@ -48,6 +53,15 @@ public final class PipelineDistributedBarrier {
private final Map<String, InnerCountDownLatchHolder> countDownLatchHolders
= new ConcurrentHashMap<>();
+ /**
+ * Get instance.
+ *
+ * @return instance
+ */
+ public static PipelineDistributedBarrier getInstance() {
+ return INSTANCE;
+ }
+
@SneakyThrows(ConcurrentException.class)
private static ClusterPersistRepository getRepository() {
return REPOSITORY_LAZY_INITIALIZER.get();
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
index 7faf007c6ee..016520a2cbe 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -45,9 +45,8 @@ public final class
ConsistencyCheckChangedJobConfigurationProcessor implements P
if (jobConfig.isDisabled()) {
Collection<Integer> shardingItems =
PipelineJobCenter.getShardingItems(jobId);
PipelineJobCenter.stop(jobId);
- PipelineDistributedBarrier pipelineDistributedBarrier = new
PipelineDistributedBarrier();
for (Integer each : shardingItems) {
-
pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
each);
+
PipelineDistributedBarrier.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
each);
}
return;
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
index d9181fcdb18..5150befc687 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
@@ -47,9 +47,8 @@ public final class MigrationChangedJobConfigurationProcessor
implements Pipeline
if (jobConfig.isDisabled()) {
Collection<Integer> shardingItems =
PipelineJobCenter.getShardingItems(jobId);
PipelineJobCenter.stop(jobId);
- PipelineDistributedBarrier pipelineDistributedBarrier = new
PipelineDistributedBarrier();
for (Integer each : shardingItems) {
-
pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
each);
+
PipelineDistributedBarrier.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
each);
}
return;
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
index d00b4d66417..457f8496d07 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -48,6 +49,7 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.DataSource;
@@ -69,6 +71,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
@RunWith(MockitoJUnitRunner.class)
public final class MigrationJobAPITest {
@@ -111,10 +115,14 @@ public final class MigrationJobAPITest {
Optional<String> jobId =
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled());
- jobAPI.stop(jobId.get());
- assertTrue(getJobConfigurationPOJO(jobId.get()).isDisabled());
- jobAPI.startDisabledJob(jobId.get());
- assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled());
+ PipelineDistributedBarrier mockBarrier =
mock(PipelineDistributedBarrier.class);
+ try (MockedStatic<PipelineDistributedBarrier> distributedBarrierMock =
mockStatic(PipelineDistributedBarrier.class)) {
+
distributedBarrierMock.when(PipelineDistributedBarrier::getInstance).thenReturn(mockBarrier);
+ jobAPI.stop(jobId.get());
+ assertTrue(getJobConfigurationPOJO(jobId.get()).isDisabled());
+ jobAPI.startDisabledJob(jobId.get());
+ assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled());
+ }
}
@Test
@@ -123,7 +131,11 @@ public final class MigrationJobAPITest {
assertTrue(jobId.isPresent());
MigrationJobConfiguration jobConfig =
jobAPI.getJobConfiguration(jobId.get());
initTableData(jobConfig);
- jobAPI.rollback(jobId.get());
+ PipelineDistributedBarrier mockBarrier =
mock(PipelineDistributedBarrier.class);
+ try (MockedStatic<PipelineDistributedBarrier> distributedBarrierMock =
mockStatic(PipelineDistributedBarrier.class)) {
+
distributedBarrierMock.when(PipelineDistributedBarrier::getInstance).thenReturn(mockBarrier);
+ jobAPI.rollback(jobId.get());
+ }
assertNull(getJobConfigurationPOJO(jobId.get()));
}
@@ -133,7 +145,11 @@ public final class MigrationJobAPITest {
assertTrue(jobId.isPresent());
MigrationJobConfiguration jobConfig =
jobAPI.getJobConfiguration(jobId.get());
initTableData(jobConfig);
- jobAPI.commit(jobId.get());
+ PipelineDistributedBarrier mockBarrier =
mock(PipelineDistributedBarrier.class);
+ try (MockedStatic<PipelineDistributedBarrier> distributedBarrierMock =
mockStatic(PipelineDistributedBarrier.class)) {
+
distributedBarrierMock.when(PipelineDistributedBarrier::getInstance).thenReturn(mockBarrier);
+ jobAPI.commit(jobId.get());
+ }
assertNull(getJobConfigurationPOJO(jobId.get()));
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
index 8cf105b9e43..858d260058c 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -44,7 +44,7 @@ public final class PipelineDistributedBarrierTest {
String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
PersistRepository repository =
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
- PipelineDistributedBarrier instance = new PipelineDistributedBarrier();
+ PipelineDistributedBarrier instance =
PipelineDistributedBarrier.getInstance();
String parentPath = "/barrier";
instance.register(parentPath, 1);
Map<?, ?> countDownLatchMap = (Map<?, ?>)
Plugins.getMemberAccessor().get(PipelineDistributedBarrier.class.getDeclaredField("countDownLatchHolders"),
instance);
@@ -59,7 +59,7 @@ public final class PipelineDistributedBarrierTest {
String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
PersistRepository repository =
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
- PipelineDistributedBarrier instance = new PipelineDistributedBarrier();
+ PipelineDistributedBarrier instance =
PipelineDistributedBarrier.getInstance();
String barrierEnablePath =
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
instance.register(barrierEnablePath, 1);
instance.persistEphemeralChildrenNode(barrierEnablePath, 1);