This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dbeb61eccb Refactor PipelineDistributedBarrier as singleton (#23908)
2dbeb61eccb is described below

commit 2dbeb61eccb0c795a469f44633032e6412ef4620
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Feb 2 10:16:10 2023 +0800

    Refactor PipelineDistributedBarrier as singleton (#23908)
---
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |  4 ++--
 .../pipeline/core/job/AbstractPipelineJob.java     |  2 +-
 .../impl/BarrierMetaDataChangedEventHandler.java   |  2 +-
 .../core/util/PipelineDistributedBarrier.java      | 14 +++++++++++
 ...tencyCheckChangedJobConfigurationProcessor.java |  3 +--
 .../MigrationChangedJobConfigurationProcessor.java |  3 +--
 .../core/api/impl/MigrationJobAPITest.java         | 28 +++++++++++++++++-----
 .../core/util/PipelineDistributedBarrierTest.java  |  4 ++--
 8 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 3dc039a6a0f..f774125c855 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -117,7 +117,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     
     @Override
     public void startDisabledJob(final String jobId) {
-        PipelineDistributedBarrier pipelineDistributedBarrier = new 
PipelineDistributedBarrier();
+        PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance();
         
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () 
-> new PipelineJobHasAlreadyStartedException(jobId));
@@ -133,7 +133,7 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     
     @Override
     public void stop(final String jobId) {
-        PipelineDistributedBarrier pipelineDistributedBarrier = new 
PipelineDistributedBarrier();
+        PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance();
         
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         jobConfigPOJO.setDisabled(true);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 958fba3c4ae..3b146248b76 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -100,7 +100,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
             return false;
         }
         
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), 
shardingItem);
-        new 
PipelineDistributedBarrier().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()),
 shardingItem);
+        
PipelineDistributedBarrier.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()),
 shardingItem);
         return true;
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
index 36591f27f68..8752b3b3170 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
@@ -38,7 +38,7 @@ public final class BarrierMetaDataChangedEventHandler implements PipelineMetaDat
     @Override
     public void handle(final DataChangedEvent event) {
         if (event.getType() == Type.ADDED) {
-            new 
PipelineDistributedBarrier().notifyChildrenNodeCountCheck(event.getKey());
+            
PipelineDistributedBarrier.getInstance().notifyChildrenNodeCountCheck(event.getKey());
         }
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 2ecfd906199..6d3e050d903 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -18,7 +18,9 @@
 package org.apache.shardingsphere.data.pipeline.core.util;
 
 import com.google.common.base.Strings;
+import lombok.AccessLevel;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -36,8 +38,11 @@ import java.util.concurrent.TimeUnit;
  * Pipeline distributed barrier.
  */
 @Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class PipelineDistributedBarrier {
     
+    private static final PipelineDistributedBarrier INSTANCE = new 
PipelineDistributedBarrier();
+    
     private static final LazyInitializer<ClusterPersistRepository> 
REPOSITORY_LAZY_INITIALIZER = new LazyInitializer<ClusterPersistRepository>() {
         
         @Override
@@ -48,6 +53,15 @@ public final class PipelineDistributedBarrier {
     
     private final Map<String, InnerCountDownLatchHolder> countDownLatchHolders 
= new ConcurrentHashMap<>();
     
+    /**
+     * Get instance.
+     *
+     * @return instance
+     */
+    public static PipelineDistributedBarrier getInstance() {
+        return INSTANCE;
+    }
+    
     @SneakyThrows(ConcurrentException.class)
     private static ClusterPersistRepository getRepository() {
         return REPOSITORY_LAZY_INITIALIZER.get();
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
index 7faf007c6ee..016520a2cbe 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -45,9 +45,8 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
         if (jobConfig.isDisabled()) {
             Collection<Integer> shardingItems = 
PipelineJobCenter.getShardingItems(jobId);
             PipelineJobCenter.stop(jobId);
-            PipelineDistributedBarrier pipelineDistributedBarrier = new 
PipelineDistributedBarrier();
             for (Integer each : shardingItems) {
-                
pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
 each);
+                
PipelineDistributedBarrier.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
 each);
             }
             return;
         }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
index d9181fcdb18..5150befc687 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
@@ -47,9 +47,8 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
         if (jobConfig.isDisabled()) {
             Collection<Integer> shardingItems = 
PipelineJobCenter.getShardingItems(jobId);
             PipelineJobCenter.stop(jobId);
-            PipelineDistributedBarrier pipelineDistributedBarrier = new 
PipelineDistributedBarrier();
             for (Integer each : shardingItems) {
-                
pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
 each);
+                
PipelineDistributedBarrier.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
 each);
             }
             return;
         }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
index d00b4d66417..457f8496d07 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -48,6 +49,7 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.MockedStatic;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import javax.sql.DataSource;
@@ -69,6 +71,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 
 @RunWith(MockitoJUnitRunner.class)
 public final class MigrationJobAPITest {
@@ -111,10 +115,14 @@ public final class MigrationJobAPITest {
         Optional<String> jobId = 
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
         assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled());
-        jobAPI.stop(jobId.get());
-        assertTrue(getJobConfigurationPOJO(jobId.get()).isDisabled());
-        jobAPI.startDisabledJob(jobId.get());
-        assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled());
+        PipelineDistributedBarrier mockBarrier = 
mock(PipelineDistributedBarrier.class);
+        try (MockedStatic<PipelineDistributedBarrier> distributedBarrierMock = 
mockStatic(PipelineDistributedBarrier.class)) {
+            
distributedBarrierMock.when(PipelineDistributedBarrier::getInstance).thenReturn(mockBarrier);
+            jobAPI.stop(jobId.get());
+            assertTrue(getJobConfigurationPOJO(jobId.get()).isDisabled());
+            jobAPI.startDisabledJob(jobId.get());
+            assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled());
+        }
     }
     
     @Test
@@ -123,7 +131,11 @@ public final class MigrationJobAPITest {
         assertTrue(jobId.isPresent());
         MigrationJobConfiguration jobConfig = 
jobAPI.getJobConfiguration(jobId.get());
         initTableData(jobConfig);
-        jobAPI.rollback(jobId.get());
+        PipelineDistributedBarrier mockBarrier = 
mock(PipelineDistributedBarrier.class);
+        try (MockedStatic<PipelineDistributedBarrier> distributedBarrierMock = 
mockStatic(PipelineDistributedBarrier.class)) {
+            
distributedBarrierMock.when(PipelineDistributedBarrier::getInstance).thenReturn(mockBarrier);
+            jobAPI.rollback(jobId.get());
+        }
         assertNull(getJobConfigurationPOJO(jobId.get()));
     }
     
@@ -133,7 +145,11 @@ public final class MigrationJobAPITest {
         assertTrue(jobId.isPresent());
         MigrationJobConfiguration jobConfig = 
jobAPI.getJobConfiguration(jobId.get());
         initTableData(jobConfig);
-        jobAPI.commit(jobId.get());
+        PipelineDistributedBarrier mockBarrier = 
mock(PipelineDistributedBarrier.class);
+        try (MockedStatic<PipelineDistributedBarrier> distributedBarrierMock = 
mockStatic(PipelineDistributedBarrier.class)) {
+            
distributedBarrierMock.when(PipelineDistributedBarrier::getInstance).thenReturn(mockBarrier);
+            jobAPI.commit(jobId.get());
+        }
         assertNull(getJobConfigurationPOJO(jobId.get()));
     }
     
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
index 8cf105b9e43..858d260058c 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -44,7 +44,7 @@ public final class PipelineDistributedBarrierTest {
         String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
         PersistRepository repository = 
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
         repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
-        PipelineDistributedBarrier instance = new PipelineDistributedBarrier();
+        PipelineDistributedBarrier instance = 
PipelineDistributedBarrier.getInstance();
         String parentPath = "/barrier";
         instance.register(parentPath, 1);
         Map<?, ?> countDownLatchMap = (Map<?, ?>) 
Plugins.getMemberAccessor().get(PipelineDistributedBarrier.class.getDeclaredField("countDownLatchHolders"),
 instance);
@@ -59,7 +59,7 @@ public final class PipelineDistributedBarrierTest {
         String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
         PersistRepository repository = 
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
         repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
-        PipelineDistributedBarrier instance = new PipelineDistributedBarrier();
+        PipelineDistributedBarrier instance = 
PipelineDistributedBarrier.getInstance();
         String barrierEnablePath = 
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
         instance.register(barrierEnablePath, 1);
         instance.persistEphemeralChildrenNode(barrierEnablePath, 1);

Reply via email to