This is an automated email from the ASF dual-hosted git repository.
kimmking pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b41cf2d Replace CuratorZookeeperRepository to RegistryRepository with
ResumeBreakPointManager (#7362)
b41cf2d is described below
commit b41cf2d9c3656748b05565e2700f6905d203a1b4
Author: 邱鹿 Lucas <[email protected]>
AuthorDate: Thu Sep 10 15:38:01 2020 +0800
Replace CuratorZookeeperRepository to RegistryRepository with
ResumeBreakPointManager (#7362)
Co-authored-by: qiulu3 <Lucas209910>
---
.../src/main/resources/conf/server.yaml | 8 ++
.../scaling/core/config/ServerConfiguration.java | 6 +-
...java => RepositoryResumeBreakPointManager.java} | 44 ++++++---
.../resume/ResumeBreakPointManagerFactory.java | 9 +-
.../job/preparer/ShardingScalingJobPreparer.java | 2 +-
.../core/schedule/ScalingTaskScheduler.java | 24 +++--
.../RepositoryResumeBreakPointManagerTest.java | 66 +++++++++++++
.../ZookeeperResumeBreakPointManagerTest.java | 107 ---------------------
.../preparer/resumer/SyncPositionResumerTest.java | 2 +-
9 files changed, 130 insertions(+), 138 deletions(-)
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
index 41b507c..ab992f4 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
+++
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
@@ -19,3 +19,11 @@ port: 8888
blockQueueSize: 10000
pushTimeout: 1000
workerThread: 30
+
+#resumeBreakPoint:
+# name: scalingjob
+# registryCenter:
+# type: ZooKeeper
+# serverLists: localhost:2181
+# props:
+# retryIntervalMilliseconds: 10000
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
index 255c1bb..f249a71 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.scaling.core.config;
import lombok.Getter;
import lombok.Setter;
+import
org.apache.shardingsphere.governance.core.yaml.config.YamlGovernanceConfiguration;
import org.apache.shardingsphere.infra.yaml.config.YamlConfiguration;
-import
org.apache.shardingsphere.governance.core.yaml.config.YamlGovernanceCenterConfiguration;
/**
* Global server configuration.
@@ -37,7 +37,5 @@ public final class ServerConfiguration implements
YamlConfiguration {
private int workerThread = 30;
- private String name;
-
- private YamlGovernanceCenterConfiguration registryCenter;
+ private YamlGovernanceConfiguration resumeBreakPoint;
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumeBreakPointManager.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java
similarity index 61%
rename from
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumeBreakPointManager.java
rename to
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java
index 1698be9..12026f8 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumeBreakPointManager.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java
@@ -17,11 +17,14 @@
package org.apache.shardingsphere.scaling.core.job.position.resume;
-import com.google.common.base.Strings;
+import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.governance.core.yaml.config.YamlGovernanceCenterConfiguration;
-import
org.apache.shardingsphere.governance.core.yaml.swapper.GovernanceCenterConfigurationYamlSwapper;
-import
org.apache.shardingsphere.governance.repository.zookeeper.CuratorZookeeperRepository;
+import
org.apache.shardingsphere.governance.core.yaml.config.YamlGovernanceConfiguration;
+import
org.apache.shardingsphere.governance.core.yaml.swapper.GovernanceConfigurationYamlSwapper;
+import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+import
org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
+import
org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
+import org.apache.shardingsphere.infra.spi.type.TypedSPIRegistry;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import java.util.concurrent.Executors;
@@ -29,16 +32,16 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
- * Depends on zookeeper resume from break-point manager.
+ * Repository resume from break-point manager.
*/
@Slf4j
-public final class ZookeeperResumeBreakPointManager extends
AbstractResumeBreakPointManager {
+public final class RepositoryResumeBreakPointManager extends
AbstractResumeBreakPointManager {
private static final String INVENTORY = "/inventory";
private static final String INCREMENTAL = "/incremental";
- private static final CuratorZookeeperRepository
CURATOR_ZOOKEEPER_REPOSITORY = new CuratorZookeeperRepository();
+ private static RegistryRepository registryRepository;
private static boolean available;
@@ -49,16 +52,17 @@ public final class ZookeeperResumeBreakPointManager extends
AbstractResumeBreakP
private final String incrementalPath;
static {
- String name =
ScalingContext.getInstance().getServerConfiguration().getName();
- YamlGovernanceCenterConfiguration registryCenter =
ScalingContext.getInstance().getServerConfiguration().getRegistryCenter();
- if (!Strings.isNullOrEmpty(name) && null != registryCenter) {
- CURATOR_ZOOKEEPER_REPOSITORY.init(name, new
GovernanceCenterConfigurationYamlSwapper().swapToObject(registryCenter));
+ YamlGovernanceConfiguration resumeBreakPoint =
ScalingContext.getInstance().getServerConfiguration().getResumeBreakPoint();
+ if (resumeBreakPoint != null) {
+ registryRepository = createRegistryRepository(new
GovernanceConfigurationYamlSwapper().swapToObject(resumeBreakPoint));
+ }
+ if (registryRepository != null) {
log.info("zookeeper resume from break-point manager is
available.");
available = true;
}
}
- public ZookeeperResumeBreakPointManager(final String databaseType, final
String taskPath) {
+ public RepositoryResumeBreakPointManager(final String databaseType, final
String taskPath) {
setDatabaseType(databaseType);
setTaskPath(taskPath);
inventoryPath = taskPath + INVENTORY;
@@ -69,6 +73,14 @@ public final class ZookeeperResumeBreakPointManager extends
AbstractResumeBreakP
executor.scheduleWithFixedDelay(this::persistPosition, 1, 1,
TimeUnit.MINUTES);
}
+ private static RegistryRepository createRegistryRepository(final
GovernanceConfiguration config) {
+ GovernanceCenterConfiguration registryCenterConfig =
config.getRegistryCenterConfiguration();
+ Preconditions.checkNotNull(registryCenterConfig, "Registry center
configuration cannot be null.");
+ RegistryRepository result =
TypedSPIRegistry.getRegisteredService(RegistryRepository.class,
registryCenterConfig.getType(), registryCenterConfig.getProps());
+ result.init(config.getName(), registryCenterConfig);
+ return result;
+ }
+
/**
* If it is available.
*
@@ -85,8 +97,8 @@ public final class ZookeeperResumeBreakPointManager extends
AbstractResumeBreakP
}
private void resumePosition() {
-
resumeInventoryPosition(CURATOR_ZOOKEEPER_REPOSITORY.get(inventoryPath));
-
resumeIncrementalPosition(CURATOR_ZOOKEEPER_REPOSITORY.get(incrementalPath));
+ resumeInventoryPosition(registryRepository.get(inventoryPath));
+ resumeIncrementalPosition(registryRepository.get(incrementalPath));
}
private void persistPosition() {
@@ -97,14 +109,14 @@ public final class ZookeeperResumeBreakPointManager
extends AbstractResumeBreakP
@Override
public void persistInventoryPosition() {
String result = getInventoryPositionData();
- CURATOR_ZOOKEEPER_REPOSITORY.persist(inventoryPath, result);
+ registryRepository.persist(inventoryPath, result);
log.info("persist inventory position {} = {}", inventoryPath, result);
}
@Override
public void persistIncrementalPosition() {
String result = getIncrementalPositionData();
- CURATOR_ZOOKEEPER_REPOSITORY.persist(incrementalPath, result);
+ registryRepository.persist(incrementalPath, result);
log.info("persist incremental position {} = {}", incrementalPath,
result);
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java
index 286e90e..e716bd7 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java
@@ -20,6 +20,9 @@ package
org.apache.shardingsphere.scaling.core.job.position.resume;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
+import
org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
+import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
/**
* Resume from break-point manager factory.
@@ -30,8 +33,10 @@ public final class ResumeBreakPointManagerFactory {
private static Class<? extends ResumeBreakPointManager> clazz =
FakeResumeBreakPointManager.class;
static {
- if (ZookeeperResumeBreakPointManager.isAvailable()) {
- clazz = ZookeeperResumeBreakPointManager.class;
+ ShardingSphereServiceLoader.register(RegistryRepository.class);
+ ShardingSphereServiceLoader.register(ConfigurationRepository.class);
+ if (RepositoryResumeBreakPointManager.isAvailable()) {
+ clazz = RepositoryResumeBreakPointManager.class;
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index 010f076..8181fc1 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -80,7 +80,7 @@ public final class ShardingScalingJobPreparer {
}
private ResumeBreakPointManager getResumeBreakPointManager(final String
databaseType, final ShardingScalingJob shardingScalingJob) {
- return ResumeBreakPointManagerFactory.newInstance(databaseType,
String.format("/%s/item-%d", shardingScalingJob.getJobName(),
shardingScalingJob.getShardingItem()));
+ return ResumeBreakPointManagerFactory.newInstance(databaseType,
String.format("/%s/position/%d", shardingScalingJob.getJobName(),
shardingScalingJob.getShardingItem()));
}
private void checkDatasources(final String databaseType, final
DataSourceManager dataSourceManager) {
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index dd1a11a..fbc68a8 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -22,12 +22,16 @@ import
org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import
org.apache.shardingsphere.scaling.core.job.position.FinishedInventoryPosition;
import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import
org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
+import
org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTaskGroup;
import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
@@ -64,7 +68,7 @@ public final class ScalingTaskScheduler implements Runnable {
public void run() {
shardingScalingJob.setStatus(SyncTaskControlStatus.MIGRATE_INVENTORY_DATA.name());
ExecuteCallback inventoryDataTaskCallback =
createInventoryDataTaskCallback();
- if (shardingScalingJob.getInventoryDataTasks().isEmpty()) {
+ if (isFinished(shardingScalingJob.getInventoryDataTasks())) {
executeIncrementalDataSyncTask();
return;
}
@@ -73,18 +77,24 @@ public final class ScalingTaskScheduler implements Runnable
{
}
}
+ private boolean isFinished(final List<ScalingTask<InventoryPosition>>
inventoryDataTasks) {
+ return inventoryDataTasks.stream().allMatch(each ->
((InventoryDataScalingTaskGroup)
each).getScalingTasks().stream().allMatch(getFinishPredicate()));
+ }
+
+ private Predicate<ScalingTask<InventoryPosition>> getFinishPredicate() {
+ return each -> ((InventoryDataScalingTask)
each).getPositionManager().getPosition() instanceof FinishedInventoryPosition;
+ }
+
private ExecuteCallback createInventoryDataTaskCallback() {
return new ExecuteCallback() {
- private final AtomicInteger finishedTaskNumber = new
AtomicInteger(0);
-
@Override
public void onSuccess() {
- if (shardingScalingJob.getInventoryDataTasks().size() ==
finishedTaskNumber.incrementAndGet()) {
+ if (isFinished(shardingScalingJob.getInventoryDataTasks())) {
executeIncrementalDataSyncTask();
}
}
-
+
@Override
public void onFailure(final Throwable throwable) {
stop();
@@ -112,7 +122,7 @@ public final class ScalingTaskScheduler implements Runnable
{
public void onSuccess() {
shardingScalingJob.setStatus(SyncTaskControlStatus.STOPPED.name());
}
-
+
@Override
public void onFailure(final Throwable throwable) {
stop();
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java
new file mode 100644
index 0000000..f1d6f8b
--- /dev/null
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position.resume;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class RepositoryResumeBreakPointManagerTest {
+
+ @Mock
+ private RegistryRepository registryRepository;
+
+ private RepositoryResumeBreakPointManager
repositoryResumeBreakPointManager;
+
+ @Before
+ @SneakyThrows
+ public void setUp() {
+ ScalingContext.getInstance().init(new ServerConfiguration());
+ ReflectionUtil.setFieldValue(RepositoryResumeBreakPointManager.class,
null, "registryRepository", registryRepository);
+ repositoryResumeBreakPointManager = new
RepositoryResumeBreakPointManager("H2", "/base");
+ }
+
+ @Test
+ public void assertPersistIncrementalPosition() {
+ repositoryResumeBreakPointManager.persistIncrementalPosition();
+ verify(registryRepository).persist("/base/incremental", "{}");
+ }
+
+ @Test
+ public void assertPersistInventoryPosition() {
+ repositoryResumeBreakPointManager.persistInventoryPosition();
+ verify(registryRepository).persist("/base/inventory",
"{\"unfinished\":{},\"finished\":[]}");
+ }
+
+ @After
+ public void tearDown() {
+ repositoryResumeBreakPointManager.close();
+ }
+}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumeBreakPointManagerTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumeBreakPointManagerTest.java
deleted file mode 100644
index d2a632d..0000000
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumeBreakPointManagerTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import lombok.SneakyThrows;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.ExistsBuilder;
-import org.apache.curator.framework.api.GetDataBuilder;
-import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.framework.api.transaction.TransactionCheckBuilder;
-import org.apache.curator.framework.api.transaction.TransactionOp;
-import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder;
-import org.apache.curator.framework.imps.ExtractingCuratorOp;
-import
org.apache.shardingsphere.governance.repository.zookeeper.CuratorZookeeperRepository;
-import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
-import org.apache.zookeeper.data.Stat;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class ZookeeperResumeBreakPointManagerTest {
-
- @Mock
- private CuratorFramework client;
-
- @Mock
- private GetDataBuilder getDataBuilder;
-
- @Mock
- private CuratorMultiTransaction curatorMultiTransaction;
-
- @Mock
- private ExistsBuilder existsBuilder;
-
- @Mock
- private TransactionOp transactionOp;
-
- @Mock
- private TransactionCheckBuilder<CuratorOp> transactionCheckBuilder;
-
- @Mock
- private TransactionSetDataBuilder<CuratorOp> transactionSetDataBuilder;
-
- private ZookeeperResumeBreakPointManager zookeeperResumeBreakPointManager;
-
- @Before
- @SneakyThrows
- public void setUp() {
- ScalingContext.getInstance().init(new ServerConfiguration());
- CuratorZookeeperRepository curatorZookeeperRepository =
ReflectionUtil.getStaticFieldValueFromClass(ZookeeperResumeBreakPointManager.class,
"CURATOR_ZOOKEEPER_REPOSITORY");
- ReflectionUtil.setFieldValue(curatorZookeeperRepository, "client",
client);
- when(client.getData()).thenReturn(getDataBuilder);
-
when(getDataBuilder.forPath("/base/inventory")).thenReturn("{\"unfinished\":{},\"finished\":[]}".getBytes());
-
when(getDataBuilder.forPath("/base/incremental")).thenReturn("{\"ds0\":{},\"ds1\":{}}".getBytes());
- zookeeperResumeBreakPointManager = new
ZookeeperResumeBreakPointManager("H2", "/base");
- }
-
- @Test
- @SneakyThrows
- public void assertPersistPosition() {
- CuratorOp curatorOp = new ExtractingCuratorOp();
- when(client.checkExists()).thenReturn(existsBuilder);
- when(existsBuilder.forPath("/base/inventory")).thenReturn(new Stat());
- when(existsBuilder.forPath("/base/incremental")).thenReturn(new
Stat());
- when(client.transactionOp()).thenReturn(transactionOp);
- when(transactionOp.check()).thenReturn(transactionCheckBuilder);
-
when(transactionCheckBuilder.forPath(anyString())).thenReturn(curatorOp);
- when(transactionOp.setData()).thenReturn(transactionSetDataBuilder);
- when(transactionSetDataBuilder.forPath(anyString(),
any(byte[].class))).thenReturn(curatorOp);
- when(client.transaction()).thenReturn(curatorMultiTransaction);
- ReflectionUtil.invokeMethod(zookeeperResumeBreakPointManager,
"persistPosition");
- verify(curatorMultiTransaction, times(2)).forOperations(curatorOp,
curatorOp);
- }
-
- @After
- public void tearDown() {
- zookeeperResumeBreakPointManager.close();
- }
-}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
index 44dfa00..97f89c5 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
@@ -62,7 +62,7 @@ public final class SyncPositionResumerTest {
ScalingContext.getInstance().init(new ServerConfiguration());
shardingScalingJob = new ShardingScalingJob("scalingTest", 0);
shardingScalingJob.getSyncConfigurations().add(mockSyncConfiguration());
- resumeBreakPointManager =
ResumeBreakPointManagerFactory.newInstance("MySQL", "/scalingTest/item-0");
+ resumeBreakPointManager =
ResumeBreakPointManagerFactory.newInstance("MySQL", "/scalingTest/position/0");
syncPositionResumer = new SyncPositionResumer();
}