This is an automated email from the ASF dual-hosted git repository.

kimmking pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b41cf2d  Replace CuratorZookeeperRepository to RegistryRepository with ResumeBreakPointManager (#7362)
b41cf2d is described below

commit b41cf2d9c3656748b05565e2700f6905d203a1b4
Author: 邱鹿 Lucas <[email protected]>
AuthorDate: Thu Sep 10 15:38:01 2020 +0800

    Replace CuratorZookeeperRepository to RegistryRepository with ResumeBreakPointManager (#7362)
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../src/main/resources/conf/server.yaml            |   8 ++
 .../scaling/core/config/ServerConfiguration.java   |   6 +-
 ...java => RepositoryResumeBreakPointManager.java} |  44 ++++++---
 .../resume/ResumeBreakPointManagerFactory.java     |   9 +-
 .../job/preparer/ShardingScalingJobPreparer.java   |   2 +-
 .../core/schedule/ScalingTaskScheduler.java        |  24 +++--
 .../RepositoryResumeBreakPointManagerTest.java     |  66 +++++++++++++
 .../ZookeeperResumeBreakPointManagerTest.java      | 107 ---------------------
 .../preparer/resumer/SyncPositionResumerTest.java  |   2 +-
 9 files changed, 130 insertions(+), 138 deletions(-)

diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
index 41b507c..ab992f4 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
@@ -19,3 +19,11 @@ port: 8888
 blockQueueSize: 10000
 pushTimeout: 1000
 workerThread: 30
+
+#resumeBreakPoint:
+#  name: scalingjob
+#  registryCenter:
+#    type: ZooKeeper
+#    serverLists: localhost:2181
+#    props:
+#      retryIntervalMilliseconds: 10000
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
index 255c1bb..f249a71 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.scaling.core.config;
 
 import lombok.Getter;
 import lombok.Setter;
+import 
org.apache.shardingsphere.governance.core.yaml.config.YamlGovernanceConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.YamlConfiguration;
-import 
org.apache.shardingsphere.governance.core.yaml.config.YamlGovernanceCenterConfiguration;
 
 /**
  * Global server configuration.
@@ -37,7 +37,5 @@ public final class ServerConfiguration implements 
YamlConfiguration {
     
     private int workerThread = 30;
     
-    private String name;
-    
-    private YamlGovernanceCenterConfiguration registryCenter;
+    private YamlGovernanceConfiguration resumeBreakPoint;
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumeBreakPointManager.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java
similarity index 61%
rename from 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumeBreakPointManager.java
rename to 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java
index 1698be9..12026f8 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumeBreakPointManager.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java
@@ -17,11 +17,14 @@
 
 package org.apache.shardingsphere.scaling.core.job.position.resume;
 
-import com.google.common.base.Strings;
+import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.governance.core.yaml.config.YamlGovernanceCenterConfiguration;
-import 
org.apache.shardingsphere.governance.core.yaml.swapper.GovernanceCenterConfigurationYamlSwapper;
-import 
org.apache.shardingsphere.governance.repository.zookeeper.CuratorZookeeperRepository;
+import 
org.apache.shardingsphere.governance.core.yaml.config.YamlGovernanceConfiguration;
+import 
org.apache.shardingsphere.governance.core.yaml.swapper.GovernanceConfigurationYamlSwapper;
+import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+import 
org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
+import 
org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
+import org.apache.shardingsphere.infra.spi.type.TypedSPIRegistry;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 
 import java.util.concurrent.Executors;
@@ -29,16 +32,16 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Depends on zookeeper resume from break-point manager.
+ * Repository resume from break-point manager.
  */
 @Slf4j
-public final class ZookeeperResumeBreakPointManager extends 
AbstractResumeBreakPointManager {
+public final class RepositoryResumeBreakPointManager extends 
AbstractResumeBreakPointManager {
     
     private static final String INVENTORY = "/inventory";
     
     private static final String INCREMENTAL = "/incremental";
     
-    private static final CuratorZookeeperRepository 
CURATOR_ZOOKEEPER_REPOSITORY = new CuratorZookeeperRepository();
+    private static RegistryRepository registryRepository;
     
     private static boolean available;
     
@@ -49,16 +52,17 @@ public final class ZookeeperResumeBreakPointManager extends 
AbstractResumeBreakP
     private final String incrementalPath;
     
     static {
-        String name = 
ScalingContext.getInstance().getServerConfiguration().getName();
-        YamlGovernanceCenterConfiguration registryCenter = 
ScalingContext.getInstance().getServerConfiguration().getRegistryCenter();
-        if (!Strings.isNullOrEmpty(name) && null != registryCenter) {
-            CURATOR_ZOOKEEPER_REPOSITORY.init(name, new 
GovernanceCenterConfigurationYamlSwapper().swapToObject(registryCenter));
+        YamlGovernanceConfiguration resumeBreakPoint = 
ScalingContext.getInstance().getServerConfiguration().getResumeBreakPoint();
+        if (resumeBreakPoint != null) {
+            registryRepository = createRegistryRepository(new 
GovernanceConfigurationYamlSwapper().swapToObject(resumeBreakPoint));
+        }
+        if (registryRepository != null) {
             log.info("zookeeper resume from break-point manager is 
available.");
             available = true;
         }
     }
     
-    public ZookeeperResumeBreakPointManager(final String databaseType, final 
String taskPath) {
+    public RepositoryResumeBreakPointManager(final String databaseType, final 
String taskPath) {
         setDatabaseType(databaseType);
         setTaskPath(taskPath);
         inventoryPath = taskPath + INVENTORY;
@@ -69,6 +73,14 @@ public final class ZookeeperResumeBreakPointManager extends 
AbstractResumeBreakP
         executor.scheduleWithFixedDelay(this::persistPosition, 1, 1, 
TimeUnit.MINUTES);
     }
     
+    private static RegistryRepository createRegistryRepository(final 
GovernanceConfiguration config) {
+        GovernanceCenterConfiguration registryCenterConfig = 
config.getRegistryCenterConfiguration();
+        Preconditions.checkNotNull(registryCenterConfig, "Registry center 
configuration cannot be null.");
+        RegistryRepository result = 
TypedSPIRegistry.getRegisteredService(RegistryRepository.class, 
registryCenterConfig.getType(), registryCenterConfig.getProps());
+        result.init(config.getName(), registryCenterConfig);
+        return result;
+    }
+    
     /**
      * If it is available.
      *
@@ -85,8 +97,8 @@ public final class ZookeeperResumeBreakPointManager extends 
AbstractResumeBreakP
     }
     
     private void resumePosition() {
-        
resumeInventoryPosition(CURATOR_ZOOKEEPER_REPOSITORY.get(inventoryPath));
-        
resumeIncrementalPosition(CURATOR_ZOOKEEPER_REPOSITORY.get(incrementalPath));
+        resumeInventoryPosition(registryRepository.get(inventoryPath));
+        resumeIncrementalPosition(registryRepository.get(incrementalPath));
     }
     
     private void persistPosition() {
@@ -97,14 +109,14 @@ public final class ZookeeperResumeBreakPointManager 
extends AbstractResumeBreakP
     @Override
     public void persistInventoryPosition() {
         String result = getInventoryPositionData();
-        CURATOR_ZOOKEEPER_REPOSITORY.persist(inventoryPath, result);
+        registryRepository.persist(inventoryPath, result);
         log.info("persist inventory position {} = {}", inventoryPath, result);
     }
     
     @Override
     public void persistIncrementalPosition() {
         String result = getIncrementalPositionData();
-        CURATOR_ZOOKEEPER_REPOSITORY.persist(incrementalPath, result);
+        registryRepository.persist(incrementalPath, result);
         log.info("persist incremental position {} = {}", incrementalPath, 
result);
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java
index 286e90e..e716bd7 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java
@@ -20,6 +20,9 @@ package 
org.apache.shardingsphere.scaling.core.job.position.resume;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
+import 
org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
+import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 
 /**
  * Resume from break-point manager factory.
@@ -30,8 +33,10 @@ public final class ResumeBreakPointManagerFactory {
     private static Class<? extends ResumeBreakPointManager> clazz = 
FakeResumeBreakPointManager.class;
     
     static {
-        if (ZookeeperResumeBreakPointManager.isAvailable()) {
-            clazz = ZookeeperResumeBreakPointManager.class;
+        ShardingSphereServiceLoader.register(RegistryRepository.class);
+        ShardingSphereServiceLoader.register(ConfigurationRepository.class);
+        if (RepositoryResumeBreakPointManager.isAvailable()) {
+            clazz = RepositoryResumeBreakPointManager.class;
         }
     }
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index 010f076..8181fc1 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -80,7 +80,7 @@ public final class ShardingScalingJobPreparer {
     }
     
     private ResumeBreakPointManager getResumeBreakPointManager(final String 
databaseType, final ShardingScalingJob shardingScalingJob) {
-        return ResumeBreakPointManagerFactory.newInstance(databaseType, 
String.format("/%s/item-%d", shardingScalingJob.getJobName(), 
shardingScalingJob.getShardingItem()));
+        return ResumeBreakPointManagerFactory.newInstance(databaseType, 
String.format("/%s/position/%d", shardingScalingJob.getJobName(), 
shardingScalingJob.getShardingItem()));
     }
     
     private void checkDatasources(final String databaseType, final 
DataSourceManager dataSourceManager) {
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index dd1a11a..fbc68a8 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -22,12 +22,16 @@ import 
org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
 import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
 import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import 
org.apache.shardingsphere.scaling.core.job.position.FinishedInventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
 import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import 
org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
+import 
org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTaskGroup;
 
 import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
@@ -64,7 +68,7 @@ public final class ScalingTaskScheduler implements Runnable {
     public void run() {
         
shardingScalingJob.setStatus(SyncTaskControlStatus.MIGRATE_INVENTORY_DATA.name());
         ExecuteCallback inventoryDataTaskCallback = 
createInventoryDataTaskCallback();
-        if (shardingScalingJob.getInventoryDataTasks().isEmpty()) {
+        if (isFinished(shardingScalingJob.getInventoryDataTasks())) {
             executeIncrementalDataSyncTask();
             return;
         }
@@ -73,18 +77,24 @@ public final class ScalingTaskScheduler implements Runnable 
{
         }
     }
     
+    private boolean isFinished(final List<ScalingTask<InventoryPosition>> 
inventoryDataTasks) {
+        return inventoryDataTasks.stream().allMatch(each -> 
((InventoryDataScalingTaskGroup) 
each).getScalingTasks().stream().allMatch(getFinishPredicate()));
+    }
+    
+    private Predicate<ScalingTask<InventoryPosition>> getFinishPredicate() {
+        return each -> ((InventoryDataScalingTask) 
each).getPositionManager().getPosition() instanceof FinishedInventoryPosition;
+    }
+    
     private ExecuteCallback createInventoryDataTaskCallback() {
         return new ExecuteCallback() {
             
-            private final AtomicInteger finishedTaskNumber = new 
AtomicInteger(0);
-            
             @Override
             public void onSuccess() {
-                if (shardingScalingJob.getInventoryDataTasks().size() == 
finishedTaskNumber.incrementAndGet()) {
+                if (isFinished(shardingScalingJob.getInventoryDataTasks())) {
                     executeIncrementalDataSyncTask();
                 }
             }
-    
+            
             @Override
             public void onFailure(final Throwable throwable) {
                 stop();
@@ -112,7 +122,7 @@ public final class ScalingTaskScheduler implements Runnable 
{
             public void onSuccess() {
                 
shardingScalingJob.setStatus(SyncTaskControlStatus.STOPPED.name());
             }
-    
+            
             @Override
             public void onFailure(final Throwable throwable) {
                 stop();
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java
new file mode 100644
index 0000000..f1d6f8b
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position.resume;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class RepositoryResumeBreakPointManagerTest {
+    
+    @Mock
+    private RegistryRepository registryRepository;
+    
+    private RepositoryResumeBreakPointManager 
repositoryResumeBreakPointManager;
+    
+    @Before
+    @SneakyThrows
+    public void setUp() {
+        ScalingContext.getInstance().init(new ServerConfiguration());
+        ReflectionUtil.setFieldValue(RepositoryResumeBreakPointManager.class, 
null, "registryRepository", registryRepository);
+        repositoryResumeBreakPointManager = new 
RepositoryResumeBreakPointManager("H2", "/base");
+    }
+    
+    @Test
+    public void assertPersistIncrementalPosition() {
+        repositoryResumeBreakPointManager.persistIncrementalPosition();
+        verify(registryRepository).persist("/base/incremental", "{}");
+    }
+    
+    @Test
+    public void assertPersistInventoryPosition() {
+        repositoryResumeBreakPointManager.persistInventoryPosition();
+        verify(registryRepository).persist("/base/inventory", 
"{\"unfinished\":{},\"finished\":[]}");
+    }
+    
+    @After
+    public void tearDown() {
+        repositoryResumeBreakPointManager.close();
+    }
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumeBreakPointManagerTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumeBreakPointManagerTest.java
deleted file mode 100644
index d2a632d..0000000
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumeBreakPointManagerTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import lombok.SneakyThrows;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.ExistsBuilder;
-import org.apache.curator.framework.api.GetDataBuilder;
-import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.framework.api.transaction.TransactionCheckBuilder;
-import org.apache.curator.framework.api.transaction.TransactionOp;
-import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder;
-import org.apache.curator.framework.imps.ExtractingCuratorOp;
-import 
org.apache.shardingsphere.governance.repository.zookeeper.CuratorZookeeperRepository;
-import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
-import org.apache.zookeeper.data.Stat;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class ZookeeperResumeBreakPointManagerTest {
-    
-    @Mock
-    private CuratorFramework client;
-    
-    @Mock
-    private GetDataBuilder getDataBuilder;
-    
-    @Mock
-    private CuratorMultiTransaction curatorMultiTransaction;
-    
-    @Mock
-    private ExistsBuilder existsBuilder;
-    
-    @Mock
-    private TransactionOp transactionOp;
-    
-    @Mock
-    private TransactionCheckBuilder<CuratorOp> transactionCheckBuilder;
-    
-    @Mock
-    private TransactionSetDataBuilder<CuratorOp> transactionSetDataBuilder;
-    
-    private ZookeeperResumeBreakPointManager zookeeperResumeBreakPointManager;
-    
-    @Before
-    @SneakyThrows
-    public void setUp() {
-        ScalingContext.getInstance().init(new ServerConfiguration());
-        CuratorZookeeperRepository curatorZookeeperRepository = 
ReflectionUtil.getStaticFieldValueFromClass(ZookeeperResumeBreakPointManager.class,
 "CURATOR_ZOOKEEPER_REPOSITORY");
-        ReflectionUtil.setFieldValue(curatorZookeeperRepository, "client", 
client);
-        when(client.getData()).thenReturn(getDataBuilder);
-        
when(getDataBuilder.forPath("/base/inventory")).thenReturn("{\"unfinished\":{},\"finished\":[]}".getBytes());
-        
when(getDataBuilder.forPath("/base/incremental")).thenReturn("{\"ds0\":{},\"ds1\":{}}".getBytes());
-        zookeeperResumeBreakPointManager = new 
ZookeeperResumeBreakPointManager("H2", "/base");
-    }
-    
-    @Test
-    @SneakyThrows
-    public void assertPersistPosition() {
-        CuratorOp curatorOp = new ExtractingCuratorOp();
-        when(client.checkExists()).thenReturn(existsBuilder);
-        when(existsBuilder.forPath("/base/inventory")).thenReturn(new Stat());
-        when(existsBuilder.forPath("/base/incremental")).thenReturn(new 
Stat());
-        when(client.transactionOp()).thenReturn(transactionOp);
-        when(transactionOp.check()).thenReturn(transactionCheckBuilder);
-        
when(transactionCheckBuilder.forPath(anyString())).thenReturn(curatorOp);
-        when(transactionOp.setData()).thenReturn(transactionSetDataBuilder);
-        when(transactionSetDataBuilder.forPath(anyString(), 
any(byte[].class))).thenReturn(curatorOp);
-        when(client.transaction()).thenReturn(curatorMultiTransaction);
-        ReflectionUtil.invokeMethod(zookeeperResumeBreakPointManager, 
"persistPosition");
-        verify(curatorMultiTransaction, times(2)).forOperations(curatorOp, 
curatorOp);
-    }
-    
-    @After
-    public void tearDown() {
-        zookeeperResumeBreakPointManager.close();
-    }
-}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
index 44dfa00..97f89c5 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
@@ -62,7 +62,7 @@ public final class SyncPositionResumerTest {
         ScalingContext.getInstance().init(new ServerConfiguration());
         shardingScalingJob = new ShardingScalingJob("scalingTest", 0);
         
shardingScalingJob.getSyncConfigurations().add(mockSyncConfiguration());
-        resumeBreakPointManager = 
ResumeBreakPointManagerFactory.newInstance("MySQL", "/scalingTest/item-0");
+        resumeBreakPointManager = 
ResumeBreakPointManagerFactory.newInstance("MySQL", "/scalingTest/position/0");
         syncPositionResumer = new SyncPositionResumer();
     }
     

Reply via email to