This is an automated email from the ASF dual-hosted git repository.
soulasuna pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d29fa58bc33 Adjustment governance center watch interface (#21415)
d29fa58bc33 is described below
commit d29fa58bc3324ad3fc9c51d1e87323c4572901fd
Author: zhaojinchao <[email protected]>
AuthorDate: Sun Oct 9 16:31:08 2022 +0800
Adjustment governance center watch interface (#21415)
* Adjustment governance center watch interface
* Add getDirectly interface
* Fix CI
* Fix unit test
* Fix ci
---
.../fixture/FixtureClusterPersistRepository.java | 13 ++-
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 16 ++--
.../service/MetaDataVersionPersistService.java | 6 +-
.../config/database/DataSourcePersistService.java | 8 +-
.../database/DatabaseRulePersistService.java | 10 +-
.../config/global/GlobalRulePersistService.java | 4 +-
.../config/global/PropertiesPersistService.java | 4 +-
.../schema/TableMetaDataPersistService.java | 2 +-
.../service/schema/ViewMetaDataPersistService.java | 2 +-
.../mode/persist/PersistRepository.java | 8 ++
.../DatabaseMetaDataPersistServiceTest.java | 2 +-
.../service/MetaDataVersionPersistServiceTest.java | 2 +-
.../database/DataSourcePersistServiceTest.java | 8 +-
.../database/DatabaseRulePersistServiceTest.java | 8 +-
.../global/GlobalRulePersistServiceTest.java | 4 +-
.../global/PropertiesPersistServiceTest.java | 2 +-
.../schema/TableMetaDataPersistServiceTest.java | 2 +-
.../schema/ViewMetaDataPersistServiceTest.java | 2 +-
.../registry/GovernanceWatcherFactory.java | 2 +-
.../compute/service/ComputeNodeStatusService.java | 10 +-
.../subscriber/ComputeNodeStatusSubscriber.java | 2 +-
.../storage/service/StorageNodeStatusService.java | 2 +-
.../subscriber/ProcessRegistrySubscriber.java | 4 +-
.../fixture/ClusterPersistRepositoryFixture.java | 20 +++-
...ProcessListClusterPersistRepositoryFixture.java | 20 +++-
.../subscriber/ProcessRegistrySubscriberTest.java | 2 +-
.../service/ComputeNodeStatusServiceTest.java | 8 +-
.../cluster/ClusterPersistRepository.java | 12 ++-
.../cluster/consul/ConsulRepository.java | 14 ++-
.../cluster/consul/ConsulRepositoryTest.java | 8 +-
.../repository/cluster/etcd/EtcdRepository.java | 16 +++-
.../cluster/etcd/EtcdRepositoryTest.java | 15 ++-
.../repository/cluster/nacos/NacosRepository.java | 14 ++-
.../cluster/nacos/NacosRepositoryTest.java | 8 +-
.../zookeeper/CuratorZookeeperRepository.java | 106 ++++++++++++++++++---
.../zookeeper/CuratorZookeeperRepositoryTest.java | 6 +-
.../StandaloneContextManagerBuilderTextTest.java | 6 +-
.../StandalonePersistRepositoryFixture.java | 5 +
.../repository/standalone/jdbc/JDBCRepository.java | 10 +-
.../standalone/jdbc/JDBCRepositoryTest.java | 4 +-
.../standalone/h2/H2JDBCRepositoryTest.java | 6 +-
.../fixture/ClusterPersistRepositoryFixture.java | 13 ++-
.../fixture/TestClusterPersistRepository.java | 20 +++-
.../pipeline/framework/watcher/ScalingWatcher.java | 2 +-
44 files changed, 316 insertions(+), 122 deletions(-)
diff --git
a/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
b/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
index d2d0c720541..268a4fc4510 100644
---
a/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
+++
b/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
@@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
public final class FixtureClusterPersistRepository implements
ClusterPersistRepository {
@@ -64,8 +65,18 @@ public final class FixtureClusterPersistRepository
implements ClusterPersistRepo
public void executeInTransaction(final List<TransactionOperation>
transactionOperations) {
}
+ @Override
+ public void updateInTransaction(final String key, final String value) {
+
+ }
+
@Override
public String get(final String key) {
+ return null;
+ }
+
+ @Override
+ public String getDirectly(final String key) {
return registryData.get(key);
}
@@ -112,7 +123,7 @@ public final class FixtureClusterPersistRepository
implements ClusterPersistRepo
}
@Override
- public void watch(final String key, final DataChangedEventListener
listener) {
+ public void watch(final String key, final DataChangedEventListener
listener, final Executor executor) {
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index f7bad40544e..86ce2c90139 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -51,7 +51,7 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
@Override
public boolean isExisted(final String key) {
- return null != repository.get(key);
+ return null != repository.getDirectly(key);
}
@Override
@@ -61,12 +61,12 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
@Override
public String getJobItemProgress(final String jobId, final int
shardingItem) {
- return repository.get(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem));
+ return
repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem));
}
@Override
public Optional<String> getCheckLatestJobId(final String jobId) {
- return
Optional.ofNullable(repository.get(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId)));
+ return
Optional.ofNullable(repository.getDirectly(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId)));
}
@Override
@@ -79,7 +79,7 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
@Override
public Map<String, DataConsistencyCheckResult> getCheckJobResult(final
String jobId, final String checkJobId) {
Map<String, DataConsistencyCheckResult> result = new HashMap<>();
- String yamlCheckResultMapText =
repository.get(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
+ String yamlCheckResultMapText =
repository.getDirectly(PipelineMetaDataNode.getCheckJobResultPath(jobId,
checkJobId));
if (StringUtils.isBlank(yamlCheckResultMapText)) {
return Collections.emptyMap();
}
@@ -130,7 +130,7 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
@Override
public void watch(final String key, final DataChangedEventListener
listener) {
- repository.watch(key, listener);
+ repository.watch(key, listener, null);
}
@Override
@@ -147,7 +147,7 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
@Override
public String getMetaDataDataSources(final JobType jobType) {
- return
repository.get(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
+ return
repository.getDirectly(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
}
@Override
@@ -157,7 +157,7 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
@Override
public String getMetaDataProcessConfiguration(final JobType jobType) {
- return
repository.get(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType));
+ return
repository.getDirectly(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType));
}
@Override
@@ -167,7 +167,7 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
@Override
public String getJobItemErrorMessage(final String jobId, final int
shardingItem) {
- return
repository.get(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem));
+ return
repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem));
}
@Override
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistService.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistService.java
index 4a507f7112e..0d87bd3b784 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistService.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistService.java
@@ -39,7 +39,7 @@ public final class MetaDataVersionPersistService {
* @return active database version
*/
public Optional<String> getActiveVersion(final String databaseName) {
- return
Optional.ofNullable(repository.get(DatabaseMetaDataNode.getActiveVersionPath(databaseName)));
+ return
Optional.ofNullable(repository.getDirectly(DatabaseMetaDataNode.getActiveVersionPath(databaseName)));
}
/**
@@ -66,9 +66,9 @@ public final class MetaDataVersionPersistService {
return Optional.empty();
}
String newVersion = String.valueOf(new
AtomicLong(Long.parseLong(activeVersion.get())).incrementAndGet());
- repository.persist(DatabaseMetaDataNode.getRulePath(databaseName,
newVersion), repository.get(DatabaseMetaDataNode.getRulePath(databaseName,
activeVersion.get())));
+ repository.persist(DatabaseMetaDataNode.getRulePath(databaseName,
newVersion),
repository.getDirectly(DatabaseMetaDataNode.getRulePath(databaseName,
activeVersion.get())));
repository.persist(
- DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName,
newVersion),
repository.get(DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName,
activeVersion.get())));
+ DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName,
newVersion),
repository.getDirectly(DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName,
activeVersion.get())));
return Optional.of(newVersion);
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistService.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistService.java
index 1e1a39bc3e6..90cab74797d 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistService.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistService.java
@@ -68,13 +68,13 @@ public final class DataSourcePersistService implements
DatabaseBasedPersistServi
@Override
public Map<String, DataSourceProperties> load(final String databaseName) {
- return isExisted(databaseName) ?
getDataSourceProperties(repository.get(
+ return isExisted(databaseName) ?
getDataSourceProperties(repository.getDirectly(
DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName,
getDatabaseActiveVersion(databaseName)))) : new LinkedHashMap<>();
}
@Override
public Map<String, DataSourceProperties> load(final String databaseName,
final String version) {
- String yamlContent =
repository.get(DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName,
version));
+ String yamlContent =
repository.getDirectly(DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName,
version));
return Strings.isNullOrEmpty(yamlContent) ? new LinkedHashMap<>() :
getDataSourceProperties(yamlContent);
}
@@ -91,7 +91,7 @@ public final class DataSourcePersistService implements
DatabaseBasedPersistServi
@Override
public boolean isExisted(final String databaseName) {
- return !Strings.isNullOrEmpty(getDatabaseActiveVersion(databaseName))
&&
!Strings.isNullOrEmpty(repository.get(DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName,
+ return !Strings.isNullOrEmpty(getDatabaseActiveVersion(databaseName))
&&
!Strings.isNullOrEmpty(repository.getDirectly(DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName,
getDatabaseActiveVersion(databaseName))));
}
@@ -108,6 +108,6 @@ public final class DataSourcePersistService implements
DatabaseBasedPersistServi
}
private String getDatabaseActiveVersion(final String databaseName) {
- return
repository.get(DatabaseMetaDataNode.getActiveVersionPath(databaseName));
+ return
repository.getDirectly(DatabaseMetaDataNode.getActiveVersionPath(databaseName));
}
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistService.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistService.java
index fba1c9839b0..53223d7c5d2 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistService.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistService.java
@@ -68,7 +68,7 @@ public final class DatabaseRulePersistService implements
DatabaseBasedPersistSer
public Collection<RuleConfiguration> load(final String databaseName) {
return isExisted(databaseName)
// TODO process algorithm provided configuration
- ? new
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.get(DatabaseMetaDataNode.getRulePath(databaseName,
+ ? new
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.getDirectly(DatabaseMetaDataNode.getRulePath(databaseName,
getDatabaseActiveVersion(databaseName))),
Collection.class, true))
: new LinkedList<>();
}
@@ -76,19 +76,19 @@ public final class DatabaseRulePersistService implements
DatabaseBasedPersistSer
@SuppressWarnings("unchecked")
@Override
public Collection<RuleConfiguration> load(final String databaseName, final
String version) {
- String yamlContent =
repository.get(DatabaseMetaDataNode.getRulePath(databaseName, version));
+ String yamlContent =
repository.getDirectly(DatabaseMetaDataNode.getRulePath(databaseName, version));
return Strings.isNullOrEmpty(yamlContent) ? new LinkedList<>()
- : new
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.get(DatabaseMetaDataNode
+ : new
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.getDirectly(DatabaseMetaDataNode
.getRulePath(databaseName,
getDatabaseActiveVersion(databaseName))), Collection.class, true));
}
@Override
public boolean isExisted(final String databaseName) {
return !Strings.isNullOrEmpty(getDatabaseActiveVersion(databaseName))
- &&
!Strings.isNullOrEmpty(repository.get(DatabaseMetaDataNode.getRulePath(databaseName,
getDatabaseActiveVersion(databaseName))));
+ &&
!Strings.isNullOrEmpty(repository.getDirectly(DatabaseMetaDataNode.getRulePath(databaseName,
getDatabaseActiveVersion(databaseName))));
}
private String getDatabaseActiveVersion(final String databaseName) {
- return
repository.get(DatabaseMetaDataNode.getActiveVersionPath(databaseName));
+ return
repository.getDirectly(DatabaseMetaDataNode.getActiveVersionPath(databaseName));
}
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistService.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistService.java
index 9987d18d42b..bf10bd08a62 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistService.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistService.java
@@ -55,12 +55,12 @@ public final class GlobalRulePersistService implements
GlobalPersistService<Coll
@SuppressWarnings("unchecked")
public Collection<RuleConfiguration> load() {
return isExisted()
- ? new
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.get(GlobalNode.getGlobalRuleNode()),
Collection.class))
+ ? new
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.getDirectly(GlobalNode.getGlobalRuleNode()),
Collection.class))
: Collections.emptyList();
}
private boolean isExisted() {
- return
!Strings.isNullOrEmpty(repository.get(GlobalNode.getGlobalRuleNode()));
+ return
!Strings.isNullOrEmpty(repository.getDirectly(GlobalNode.getGlobalRuleNode()));
}
/**
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistService.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistService.java
index a041bd87550..776e85a9667 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistService.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistService.java
@@ -46,11 +46,11 @@ public final class PropertiesPersistService implements
GlobalPersistService<Prop
}
private boolean isExisted() {
- return
!Strings.isNullOrEmpty(repository.get(GlobalNode.getPropsPath()));
+ return
!Strings.isNullOrEmpty(repository.getDirectly(GlobalNode.getPropsPath()));
}
@Override
public Properties load() {
- return
Strings.isNullOrEmpty(repository.get(GlobalNode.getPropsPath())) ? new
Properties() : YamlEngine.unmarshal(repository.get(GlobalNode.getPropsPath()),
Properties.class);
+ return
Strings.isNullOrEmpty(repository.getDirectly(GlobalNode.getPropsPath())) ? new
Properties() :
YamlEngine.unmarshal(repository.getDirectly(GlobalNode.getPropsPath()),
Properties.class);
}
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/TableMetaDataPersistService.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/TableMetaDataPersistService.java
index f1b0a4030ca..d1f5a814cb5 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/TableMetaDataPersistService.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/TableMetaDataPersistService.java
@@ -59,7 +59,7 @@ public final class TableMetaDataPersistService implements
SchemaMetaDataPersistS
private Map<String, ShardingSphereTable>
getTableMetaDataByTableNames(final String databaseName, final String
schemaName, final Collection<String> tableNames) {
Map<String, ShardingSphereTable> result = new
LinkedHashMap<>(tableNames.size(), 1);
tableNames.forEach(each -> {
- String table =
repository.get(DatabaseMetaDataNode.getTableMetaDataPath(databaseName,
schemaName, each));
+ String table =
repository.getDirectly(DatabaseMetaDataNode.getTableMetaDataPath(databaseName,
schemaName, each));
if (!StringUtils.isEmpty(table)) {
result.put(each.toLowerCase(), new
YamlTableSwapper().swapToObject(YamlEngine.unmarshal(table,
YamlShardingSphereTable.class)));
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ViewMetaDataPersistService.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ViewMetaDataPersistService.java
index 65adcf86376..d89c8386a71 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ViewMetaDataPersistService.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ViewMetaDataPersistService.java
@@ -59,7 +59,7 @@ public final class ViewMetaDataPersistService implements
SchemaMetaDataPersistSe
private Map<String, ShardingSphereView> getViewMetaDataByViewNames(final
String databaseName, final String schemaName, final Collection<String>
viewNames) {
Map<String, ShardingSphereView> result = new
LinkedHashMap<>(viewNames.size(), 1);
viewNames.forEach(each -> {
- String view =
repository.get(DatabaseMetaDataNode.getViewMetaDataPath(databaseName,
schemaName, each));
+ String view =
repository.getDirectly(DatabaseMetaDataNode.getViewMetaDataPath(databaseName,
schemaName, each));
if (!StringUtils.isEmpty(view)) {
result.put(each.toLowerCase(), new
YamlViewSwapper().swapToObject(YamlEngine.unmarshal(view,
YamlShardingSphereView.class)));
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
index 48c00e5559b..76e31f226cc 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
@@ -41,6 +41,14 @@ public interface PersistRepository extends TypedSPI {
*/
String get(String key);
+ /**
+ * Get value from registry center directly.
+ *
+ * @param key key
+ * @return value
+ */
+ String getDirectly(String key);
+
/**
* Get names of sub-node.
*
diff --git
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistServiceTest.java
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistServiceTest.java
index c2a95160e2b..5ff43b6efc7 100644
---
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistServiceTest.java
+++
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistServiceTest.java
@@ -120,7 +120,7 @@ public final class DatabaseMetaDataPersistServiceTest {
DatabaseMetaDataPersistService databaseMetaDataPersistService = new
DatabaseMetaDataPersistService(repository);
when(repository.getChildrenKeys("/metadata/foo_db/schemas")).thenReturn(Collections.singletonList("foo_schema"));
when(repository.getChildrenKeys("/metadata/foo_db/schemas/foo_schema/tables")).thenReturn(Collections.singletonList("t_order"));
-
when(repository.get("/metadata/foo_db/schemas/foo_schema/tables/t_order")).thenReturn(readYAML());
+
when(repository.getDirectly("/metadata/foo_db/schemas/foo_schema/tables/t_order")).thenReturn(readYAML());
Map<String, ShardingSphereSchema> schema =
databaseMetaDataPersistService.loadSchemas("foo_db");
assertThat(schema.size(), is(1));
Map<String, ShardingSphereSchema> empty =
databaseMetaDataPersistService.loadSchemas("test");
diff --git
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistServiceTest.java
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistServiceTest.java
index 7908c7b2993..f82dff1cb13 100644
---
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistServiceTest.java
+++
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistServiceTest.java
@@ -43,7 +43,7 @@ public final class MetaDataVersionPersistServiceTest {
@Before
public void setUp() {
repository = mock(PersistRepository.class);
- when(repository.get(contains("foo_db"))).thenReturn("1");
+ when(repository.getDirectly(contains("foo_db"))).thenReturn("1");
metaDataVersionPersistService = new
MetaDataVersionPersistService(repository);
}
diff --git
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistServiceTest.java
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistServiceTest.java
index 93d3f58e1cc..83e3c5b981a 100644
---
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistServiceTest.java
+++
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistServiceTest.java
@@ -51,8 +51,8 @@ public final class DataSourcePersistServiceTest {
@Test
public void assertLoad() {
-
when(repository.get("/metadata/foo_db/active_version")).thenReturn("0");
-
when(repository.get("/metadata/foo_db/versions/0/dataSources")).thenReturn(readDataSourceYaml("yaml/persist/data-source.yaml"));
+
when(repository.getDirectly("/metadata/foo_db/active_version")).thenReturn("0");
+
when(repository.getDirectly("/metadata/foo_db/versions/0/dataSources")).thenReturn(readDataSourceYaml("yaml/persist/data-source.yaml"));
Map<String, DataSourceProperties> actual = new
DataSourcePersistService(repository).load("foo_db");
assertThat(actual.size(), is(2));
assertDataSourceProperties(actual.get("ds_0"),
DataSourcePropertiesCreator.create(createDataSource("ds_0")));
@@ -75,14 +75,14 @@ public final class DataSourcePersistServiceTest {
@Test
public void assertLoadWithoutPath() {
-
when(repository.get("/metadata/foo_db/active_version")).thenReturn("0");
+
when(repository.getDirectly("/metadata/foo_db/active_version")).thenReturn("0");
Map<String, DataSourceProperties> actual = new
DataSourcePersistService(repository).load("foo_db");
assertTrue(actual.isEmpty());
}
@Test
public void assertAppend() {
-
when(repository.get("/metadata/foo_db/active_version")).thenReturn("0");
+
when(repository.getDirectly("/metadata/foo_db/active_version")).thenReturn("0");
new DataSourcePersistService(repository).append("foo_db",
Collections.singletonMap("foo_ds",
DataSourcePropertiesCreator.create(createDataSource("foo_ds"))));
String expected =
readDataSourceYaml("yaml/persist/data-source-foo.yaml");
verify(repository).persist("/metadata/foo_db/versions/0/dataSources",
expected);
diff --git
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistServiceTest.java
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistServiceTest.java
index a6708327280..7c554e026d3 100644
---
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistServiceTest.java
+++
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistServiceTest.java
@@ -51,16 +51,16 @@ public final class DatabaseRulePersistServiceTest {
@Test
public void assertLoadWithExistedNode() {
-
when(repository.get("/metadata/foo_db/active_version")).thenReturn("0");
-
when(repository.get("/metadata/foo_db/versions/0/rules")).thenReturn(readYAML());
+
when(repository.getDirectly("/metadata/foo_db/active_version")).thenReturn("0");
+
when(repository.getDirectly("/metadata/foo_db/versions/0/rules")).thenReturn(readYAML());
Collection<RuleConfiguration> actual = new
DatabaseRulePersistService(repository).load("foo_db");
assertThat(actual.size(), is(1));
}
@Test
public void assertIsExisted() {
-
when(repository.get("/metadata/foo_db/active_version")).thenReturn("0");
-
when(repository.get("/metadata/foo_db/versions/0/rules")).thenReturn(readYAML());
+
when(repository.getDirectly("/metadata/foo_db/active_version")).thenReturn("0");
+
when(repository.getDirectly("/metadata/foo_db/versions/0/rules")).thenReturn(readYAML());
DatabaseRulePersistService databaseRulePersistService = new
DatabaseRulePersistService(repository);
assertTrue(databaseRulePersistService.isExisted("foo_db"));
assertFalse(databaseRulePersistService.isExisted("foo_db_1"));
diff --git
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistServiceTest.java
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistServiceTest.java
index 29aa0296d0a..79b827ca69b 100644
---
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistServiceTest.java
+++
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistServiceTest.java
@@ -45,14 +45,14 @@ public class GlobalRulePersistServiceTest {
@Test
public void assertLoad() {
- when(repository.get("/rules")).thenReturn(readYAML());
+ when(repository.getDirectly("/rules")).thenReturn(readYAML());
Collection<RuleConfiguration> actual = new
GlobalRulePersistService(repository).load();
assertThat(actual.size(), is(1));
}
@Test
public void assertLoadUsers() {
- when(repository.get("/rules")).thenReturn(readYAML());
+ when(repository.getDirectly("/rules")).thenReturn(readYAML());
Collection<ShardingSphereUser> actual = new
GlobalRulePersistService(repository).loadUsers();
assertThat(actual.size(), is(2));
}
diff --git
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistServiceTest.java
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistServiceTest.java
index 11faf56a446..8a798335f8a 100644
---
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistServiceTest.java
+++
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistServiceTest.java
@@ -40,7 +40,7 @@ public final class PropertiesPersistServiceTest {
@Test
public void assertLoad() {
- when(repository.get("/props")).thenReturn(PROPS_YAML);
+ when(repository.getDirectly("/props")).thenReturn(PROPS_YAML);
Properties actual = new PropertiesPersistService(repository).load();
assertThat(actual.get(ConfigurationPropertyKey.SQL_SHOW.getKey()),
is(Boolean.FALSE));
}
diff --git
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/TableMetaDataPersistServiceTest.java
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/TableMetaDataPersistServiceTest.java
index c3c976a2e54..1445bb82f88 100644
---
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/TableMetaDataPersistServiceTest.java
+++
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/TableMetaDataPersistServiceTest.java
@@ -56,7 +56,7 @@ public final class TableMetaDataPersistServiceTest {
public void assertLoad() {
TableMetaDataPersistService tableMetaDataPersistService = new
TableMetaDataPersistService(repository);
when(repository.getChildrenKeys("/metadata/foo_db/schemas/foo_schema/tables")).thenReturn(Collections.singletonList("t_order"));
-
when(repository.get("/metadata/foo_db/schemas/foo_schema/tables/t_order")).thenReturn(readYAML());
+
when(repository.getDirectly("/metadata/foo_db/schemas/foo_schema/tables/t_order")).thenReturn(readYAML());
Map<String, ShardingSphereTable> tables =
tableMetaDataPersistService.load("foo_db", "foo_schema");
assertThat(tables.size(), is(1));
assertThat(tables.get("t_order").getIndexes().keySet(),
is(Collections.singleton("primary")));
diff --git
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/ViewMetaDataPersistServiceTest.java
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/ViewMetaDataPersistServiceTest.java
index dbf60af2eb7..10d85d7e5a1 100644
---
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/ViewMetaDataPersistServiceTest.java
+++
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/ViewMetaDataPersistServiceTest.java
@@ -58,7 +58,7 @@ public final class ViewMetaDataPersistServiceTest {
public void assertLoad() {
ViewMetaDataPersistService viewMetaDataPersistService = new
ViewMetaDataPersistService(repository);
when(repository.getChildrenKeys("/metadata/foo_db/schemas/foo_schema/views")).thenReturn(Collections.singletonList("foo_view"));
-
when(repository.get("/metadata/foo_db/schemas/foo_schema/views/foo_view")).thenReturn(readYAML());
+
when(repository.getDirectly("/metadata/foo_db/schemas/foo_schema/views/foo_view")).thenReturn(readYAML());
Map<String, ShardingSphereView> views =
viewMetaDataPersistService.load("foo_db", "foo_schema");
assertThat(views.size(), is(1));
assertThat(views.get("foo_view").getName(), is("foo_view"));
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
index 01f5c6f3991..463915a9f16 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
@@ -58,6 +58,6 @@ public final class GovernanceWatcherFactory {
if
(listener.getWatchingTypes().contains(dataChangedEventListener.getType())) {
listener.createGovernanceEvent(dataChangedEventListener).ifPresent(eventBusContext::post);
}
- });
+ }, null);
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index e7178474134..6ad59749599 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -85,7 +85,7 @@ public final class ComputeNodeStatusService {
*/
@SuppressWarnings("unchecked")
public Collection<String> loadInstanceLabels(final String instanceId) {
- String yamlContent =
repository.get(ComputeNode.getInstanceLabelsNodePath(instanceId));
+ String yamlContent =
repository.getDirectly(ComputeNode.getInstanceLabelsNodePath(instanceId));
return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() :
YamlEngine.unmarshal(yamlContent, Collection.class);
}
@@ -97,7 +97,7 @@ public final class ComputeNodeStatusService {
*/
@SuppressWarnings("unchecked")
public Collection<String> loadInstanceStatus(final String instanceId) {
- String yamlContent =
repository.get(ComputeNode.getInstanceStatusNodePath(instanceId));
+ String yamlContent =
repository.getDirectly(ComputeNode.getInstanceStatusNodePath(instanceId));
return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() :
YamlEngine.unmarshal(yamlContent, Collection.class);
}
@@ -109,7 +109,7 @@ public final class ComputeNodeStatusService {
*/
public Optional<Long> loadInstanceWorkerId(final String instanceId) {
try {
- String workerId =
repository.get(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
+ String workerId =
repository.getDirectly(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
return Strings.isNullOrEmpty(workerId) ? Optional.empty() :
Optional.of(Long.valueOf(workerId));
} catch (final NumberFormatException ex) {
log.error("Invalid worker id for instance: {}", instanceId);
@@ -133,7 +133,7 @@ public final class ComputeNodeStatusService {
private Collection<ComputeNodeInstance> loadComputeNodeInstances(final
InstanceType instanceType) {
Collection<String> onlineComputeNodes =
repository.getChildrenKeys(ComputeNode.getOnlineNodePath(instanceType));
return onlineComputeNodes.stream().map(each -> loadComputeNodeInstance(
- InstanceMetaDataBuilderFactory.create(each, instanceType,
repository.get(ComputeNode.getOnlineInstanceNodePath(each,
instanceType))))).collect(Collectors.toList());
+ InstanceMetaDataBuilderFactory.create(each, instanceType,
repository.getDirectly(ComputeNode.getOnlineInstanceNodePath(each,
instanceType))))).collect(Collectors.toList());
}
/**
@@ -158,7 +158,7 @@ public final class ComputeNodeStatusService {
Set<Long> result = new LinkedHashSet<>();
List<String> childrenKeys =
repository.getChildrenKeys(ComputeNode.getInstanceWorkerIdRootNodePath());
for (String each : childrenKeys) {
- String workerId =
repository.get(ComputeNode.getInstanceWorkerIdNodePath(each));
+ String workerId =
repository.getDirectly(ComputeNode.getInstanceWorkerIdNodePath(each));
if (null != workerId) {
result.add(Long.parseLong(workerId));
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
index d88774d36ef..1799b1b6b8e 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
@@ -55,7 +55,7 @@ public final class ComputeNodeStatusSubscriber {
@Subscribe
public void update(final ComputeNodeStatusChangedEvent event) {
String computeStatusNodePath =
ComputeNode.getInstanceStatusNodePath(event.getInstanceId());
- String yamlContext = repository.get(computeStatusNodePath);
+ String yamlContext = repository.getDirectly(computeStatusNodePath);
Collection<String> status = Strings.isNullOrEmpty(yamlContext) ? new
ArrayList<>() : YamlEngine.unmarshal(yamlContext, Collection.class);
if (event.getStatus() == ComputeNodeStatus.CIRCUIT_BREAK) {
status.add(ComputeNodeStatus.CIRCUIT_BREAK.name());
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/service/StorageNodeStatusService.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/service/StorageNodeStatusService.java
index dce3c8d0d2f..31862d8cd7d 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/service/StorageNodeStatusService.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/service/StorageNodeStatusService.java
@@ -45,7 +45,7 @@ public final class StorageNodeStatusService {
Collection<String> storageNodes =
repository.getChildrenKeys(StorageNode.getRootPath());
Map<String, StorageNodeDataSource> result = new
HashMap<>(storageNodes.size(), 1);
storageNodes.forEach(each -> {
- String yamlContext =
repository.get(StorageNode.getStorageNodesDataSourcePath(each));
+ String yamlContext =
repository.getDirectly(StorageNode.getStorageNodesDataSourcePath(each));
if (!Strings.isNullOrEmpty(yamlContext)) {
result.put(each, YamlEngine.unmarshal(yamlContext,
StorageNodeDataSource.class));
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
index dbe5e7f1754..77201bc5168 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
@@ -127,14 +127,14 @@ public final class ProcessRegistrySubscriber {
}
private boolean isReady(final Collection<String> paths) {
- return paths.stream().noneMatch(each -> null != repository.get(each));
+ return paths.stream().noneMatch(each -> null !=
repository.getDirectly(each));
}
private void sendShowProcessList(final String processListId) {
List<String> childrenKeys =
repository.getChildrenKeys(ProcessNode.getProcessListIdPath(processListId));
Collection<String> batchProcessContexts = new LinkedList<>();
for (String each : childrenKeys) {
-
batchProcessContexts.add(repository.get(ProcessNode.getProcessListInstancePath(processListId,
each)));
+
batchProcessContexts.add(repository.getDirectly(ProcessNode.getProcessListInstancePath(processListId,
each)));
}
eventBusContext.post(new
ShowProcessListResponseEvent(batchProcessContexts));
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
index 56329725aab..5ec8ae5f8f5 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
@@ -25,6 +25,7 @@ import
org.apache.shardingsphere.mode.repository.cluster.transaction.Transaction
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Executor;
public final class ClusterPersistRepositoryFixture implements
ClusterPersistRepository {
@@ -58,8 +59,18 @@ public final class ClusterPersistRepositoryFixture
implements ClusterPersistRepo
public void executeInTransaction(final List<TransactionOperation>
transactionOperations) {
}
+ @Override
+ public void updateInTransaction(final String key, final String value) {
+
+ }
+
@Override
public String get(final String key) {
+ return null;
+ }
+
+ @Override
+ public String getDirectly(final String key) {
return "";
}
@@ -103,10 +114,6 @@ public final class ClusterPersistRepositoryFixture
implements ClusterPersistRepo
return null;
}
- @Override
- public void watch(final String key, final DataChangedEventListener
listener) {
- }
-
@Override
public boolean tryLock(final String lockKey, final long timeoutMillis) {
return false;
@@ -116,6 +123,11 @@ public final class ClusterPersistRepositoryFixture
implements ClusterPersistRepo
public void unlock(final String lockKey) {
}
+ @Override
+ public void watch(final String key, final DataChangedEventListener
listener, final Executor executor) {
+
+ }
+
@Override
public void close() {
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
index 1e6bd0d565d..4fc65893a37 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
public final class ProcessListClusterPersistRepositoryFixture implements
ClusterPersistRepository {
@@ -65,8 +66,18 @@ public final class
ProcessListClusterPersistRepositoryFixture implements Cluster
public void executeInTransaction(final List<TransactionOperation>
transactionOperations) {
}
+ @Override
+ public void updateInTransaction(final String key, final String value) {
+
+ }
+
@Override
public String get(final String key) {
+ return null;
+ }
+
+ @Override
+ public String getDirectly(final String key) {
return REGISTRY_DATA.get(key);
}
@@ -114,10 +125,6 @@ public final class
ProcessListClusterPersistRepositoryFixture implements Cluster
return null;
}
- @Override
- public void watch(final String key, final DataChangedEventListener
listener) {
- }
-
@Override
public boolean tryLock(final String lockKey, final long timeoutMillis) {
return false;
@@ -127,6 +134,11 @@ public final class
ProcessListClusterPersistRepositoryFixture implements Cluster
public void unlock(final String lockKey) {
}
+ @Override
+ public void watch(final String key, final DataChangedEventListener
listener, final Executor executor) {
+
+ }
+
@Override
public void close() {
REGISTRY_DATA.clear();
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTest.java
index 8e27619dc99..0b529c21805 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTest.java
@@ -56,7 +56,7 @@ public final class ProcessRegistrySubscriberTest {
public void assertLoadShowProcessListData() {
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList());
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc"));
- when(repository.get(any())).thenReturn(null);
+ when(repository.getDirectly(any())).thenReturn(null);
ShowProcessListRequestEvent showProcessListRequestEvent =
mock(ShowProcessListRequestEvent.class);
processRegistrySubscriber.loadShowProcessListData(showProcessListRequestEvent);
verify(repository, times(1)).persist(any(), any());
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
index a44410deb32..39409882475 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -79,7 +79,7 @@ public final class ComputeNodeStatusServiceTest {
InstanceMetaData instanceMetaData = new
ProxyInstanceMetaData("foo_instance_id", 3307);
final String instanceId = instanceMetaData.getId();
new
ComputeNodeStatusService(repository).loadInstanceLabels(instanceId);
-
verify(repository).get(ComputeNode.getInstanceLabelsNodePath(instanceId));
+
verify(repository).getDirectly(ComputeNode.getInstanceLabelsNodePath(instanceId));
}
@Test
@@ -87,7 +87,7 @@ public final class ComputeNodeStatusServiceTest {
InstanceMetaData instanceMetaData = new
ProxyInstanceMetaData("foo_instance_id", 3307);
final String instanceId = instanceMetaData.getId();
new
ComputeNodeStatusService(repository).loadInstanceStatus(instanceId);
-
verify(repository).get(ComputeNode.getInstanceStatusNodePath(instanceId));
+
verify(repository).getDirectly(ComputeNode.getInstanceStatusNodePath(instanceId));
}
@Test
@@ -95,14 +95,14 @@ public final class ComputeNodeStatusServiceTest {
InstanceMetaData instanceMetaData = new
ProxyInstanceMetaData("foo_instance_id", 3307);
final String instanceId = instanceMetaData.getId();
new
ComputeNodeStatusService(repository).loadInstanceWorkerId(instanceId);
-
verify(repository).get(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
+
verify(repository).getDirectly(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
}
@Test
public void assertLoadAllComputeNodeInstances() {
when(repository.getChildrenKeys("/nodes/compute_nodes/online/jdbc")).thenReturn(Collections.singletonList("foo_instance_3307"));
when(repository.getChildrenKeys("/nodes/compute_nodes/online/proxy")).thenReturn(Collections.singletonList("foo_instance_3308"));
-
when(repository.get("/nodes/compute_nodes/online/proxy/foo_instance_3308")).thenReturn("127.0.0.1@3308");
+
when(repository.getDirectly("/nodes/compute_nodes/online/proxy/foo_instance_3308")).thenReturn("127.0.0.1@3308");
List<ComputeNodeInstance> actual = new ArrayList<>(new
ComputeNodeStatusService(repository).loadAllComputeNodeInstances());
assertThat(actual.size(), is(2));
assertThat(actual.get(0).getMetaData().getId(),
is("foo_instance_3307"));
diff --git
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
index 9a5f899910b..439cfe58fe8 100644
---
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
+++
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
import
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
import java.util.List;
+import java.util.concurrent.Executor;
/**
* Cluster persist repository.
@@ -97,6 +98,14 @@ public interface ClusterPersistRepository extends
PersistRepository {
*/
void executeInTransaction(List<TransactionOperation>
transactionOperations) throws Exception;
+ /**
+ * Update data in transaction.
+ *
+ * @param key key
+ * @param value value
+ */
+ void updateInTransaction(String key, String value);
+
/**
* Persist ephemeral data.
*
@@ -134,6 +143,7 @@ public interface ClusterPersistRepository extends
PersistRepository {
*
* @param key key of data
* @param listener data changed event listener
+ * @param executor event notify executor
*/
- void watch(String key, DataChangedEventListener listener);
+ void watch(String key, DataChangedEventListener listener, Executor
executor);
}
diff --git
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
index a29290703e4..99e4e94b3bb 100644
---
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
+++
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
@@ -40,6 +40,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -94,8 +95,19 @@ public class ConsulRepository implements
ClusterPersistRepository {
// TODO
}
+ @Override
+ public void updateInTransaction(final String key, final String value) {
+ // TODO
+ }
+
@Override
public String get(final String key) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public String getDirectly(final String key) {
Response<GetValue> response = consulClient.getKVValue(key);
return null == response ? null : response.getValue().getValue();
}
@@ -175,7 +187,7 @@ public class ConsulRepository implements
ClusterPersistRepository {
}
@Override
- public void watch(final String key, final DataChangedEventListener
listener) {
+ public void watch(final String key, final DataChangedEventListener
listener, final Executor executor) {
Thread watchThread = new Thread(() -> watchChildKeyChangeEvent(key,
listener));
watchThread.setDaemon(true);
watchThread.start();
diff --git
a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
index f5121b0e0b2..4861462f513 100644
---
a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
+++
b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
@@ -110,7 +110,7 @@ public final class ConsulRepositoryTest {
@Test
public void assertGetKey() {
- repository.get("key");
+ repository.getDirectly("key");
verify(client).getKVValue("key");
verify(response).getValue();
}
@@ -154,8 +154,7 @@ public final class ConsulRepositoryTest {
getValue1.setKey(k1);
getValue1.setValue(v1);
when(responseGetValueList.getValue()).thenReturn(Collections.singletonList(getValue1));
- repository.watch(key, event -> {
- });
+ repository.watch(key, event -> { }, null);
client.setKVValue(k1, "value1-1");
verify(client, atLeastOnce()).getKVValues(any(String.class),
any(QueryParams.class));
Thread.sleep(10000L);
@@ -175,8 +174,7 @@ public final class ConsulRepositoryTest {
getValue1.setKey(k1);
getValue1.setValue(v1);
when(responseGetValueList.getValue()).thenReturn(Collections.singletonList(getValue1));
- repository.watch(key, event -> {
- });
+ repository.watch(key, event -> { }, null);
client.deleteKVValue(k2);
verify(client, atLeastOnce()).getKVValues(any(String.class),
any(QueryParams.class));
Thread.sleep(10000L);
diff --git
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
index 573f1652f6a..7fcfcf7572b 100644
---
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
+++
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
@@ -45,6 +45,7 @@ import
org.apache.shardingsphere.mode.repository.cluster.transaction.Transaction
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.stream.Collectors;
/**
@@ -100,9 +101,20 @@ public final class EtcdRepository implements
ClusterPersistRepository {
// TODO
}
- @SneakyThrows({InterruptedException.class, ExecutionException.class})
+ @Override
+ public void updateInTransaction(final String key, final String value) {
+ // TODO
+ }
+
@Override
public String get(final String key) {
+ // TODO
+ return null;
+ }
+
+ @SneakyThrows({InterruptedException.class, ExecutionException.class})
+ @Override
+ public String getDirectly(final String key) {
List<KeyValue> keyValues =
client.getKVClient().get(ByteSequence.from(key,
StandardCharsets.UTF_8)).get().getKvs();
return keyValues.isEmpty() ? null :
keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
}
@@ -184,7 +196,7 @@ public final class EtcdRepository implements
ClusterPersistRepository {
}
@Override
- public void watch(final String key, final DataChangedEventListener
dataChangedEventListener) {
+ public void watch(final String key, final DataChangedEventListener
dataChangedEventListener, final Executor executor) {
Watch.Listener listener = Watch.listener(response -> {
for (WatchEvent each : response.getEvents()) {
Type type = getEventChangedType(each);
diff --git
a/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java
b/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java
index 0a37434d93b..1493eeb168f 100644
---
a/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java
+++
b/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java
@@ -133,7 +133,7 @@ public final class EtcdRepositoryTest {
@Test
public void assertGetKey() {
- repository.get("key");
+ repository.getDirectly("key");
verify(kv).get(ByteSequence.from("key", StandardCharsets.UTF_8));
verify(getResponse).getKvs();
}
@@ -172,8 +172,7 @@ public final class EtcdRepositoryTest {
listener.onNext(buildWatchResponse(WatchEvent.EventType.PUT));
return mock(Watch.Watcher.class);
}).when(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
- repository.watch("key1", event -> {
- });
+ repository.watch("key1", event -> { }, null);
verify(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
}
@@ -184,8 +183,7 @@ public final class EtcdRepositoryTest {
listener.onNext(buildWatchResponse(WatchEvent.EventType.DELETE));
return mock(Watch.Watcher.class);
}).when(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
- repository.watch("key1", event -> {
- });
+ repository.watch("key1", event -> { }, null);
verify(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
}
@@ -196,8 +194,7 @@ public final class EtcdRepositoryTest {
listener.onNext(buildWatchResponse(WatchEvent.EventType.UNRECOGNIZED));
return mock(Watch.Watcher.class);
}).when(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
- repository.watch("key1", event -> {
- });
+ repository.watch("key1", event -> { }, null);
verify(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
}
@@ -223,7 +220,7 @@ public final class EtcdRepositoryTest {
public void assertGetKeyWhenThrowInterruptedException() throws
ExecutionException, InterruptedException {
doThrow(InterruptedException.class).when(getFuture).get();
try {
- repository.get("key");
+ repository.getDirectly("key");
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -235,7 +232,7 @@ public final class EtcdRepositoryTest {
public void assertGetKeyWhenThrowExecutionException() throws
ExecutionException, InterruptedException {
doThrow(ExecutionException.class).when(getFuture).get();
try {
- repository.get("key");
+ repository.getDirectly("key");
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
diff --git
a/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
b/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
index 19339b12fca..2dd2e46432a 100644
---
a/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
+++
b/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
@@ -53,6 +53,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -106,6 +107,11 @@ public final class NacosRepository implements
ClusterPersistRepository {
// TODO
}
+ @Override
+ public void updateInTransaction(final String key, final String value) {
+ // TODO
+ }
+
private NamingService createClient(final
ClusterPersistRepositoryConfiguration config) {
Properties props = new Properties();
props.setProperty("serverAddr", config.getServerLists());
@@ -168,7 +174,7 @@ public final class NacosRepository implements
ClusterPersistRepository {
}
@Override
- public void watch(final String key, final DataChangedEventListener
listener) {
+ public void watch(final String key, final DataChangedEventListener
listener, final Executor executor) {
try {
for (ServiceMetadata each : serviceController.getAllServices()) {
NamingEventListener eventListener = each.getListener();
@@ -188,6 +194,12 @@ public final class NacosRepository implements
ClusterPersistRepository {
@Override
public String get(final String key) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public String getDirectly(final String key) {
try {
for (ServiceMetadata each : serviceController.getAllServices()) {
Optional<Instance> instance = findExistedInstance(key,
each.isEphemeral()).stream().max(Comparator.comparing(NacosMetaDataUtil::getTimestamp));
diff --git
a/mode/type/cluster/repository/provider/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepositoryTest.java
b/mode/type/cluster/repository/provider/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepositoryTest.java
index 32abeb2bf11..68b70971128 100644
---
a/mode/type/cluster/repository/provider/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepositoryTest.java
+++
b/mode/type/cluster/repository/provider/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepositoryTest.java
@@ -100,7 +100,7 @@ public final class NacosRepositoryTest {
}
ServiceMetadata persistentService =
serviceController.getPersistentService();
when(client.getAllInstances(persistentService.getServiceName(),
false)).thenReturn(instances);
- String value = REPOSITORY.get(key);
+ String value = REPOSITORY.getDirectly(key);
assertThat(value, is("value2"));
}
@@ -282,7 +282,7 @@ public final class NacosRepositoryTest {
Event event = new NamingEvent(ephemeralService.getServiceName(),
Collections.singletonList(instance));
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(null,
event))).when(client).subscribe(anyString(), any(EventListener.class));
SettableFuture<DataChangedEvent> settableFuture =
SettableFuture.create();
- REPOSITORY.watch(key, settableFuture::set);
+ REPOSITORY.watch(key, settableFuture::set, null);
DataChangedEvent dataChangedEvent = settableFuture.get();
assertThat(dataChangedEvent.getType(),
is(DataChangedEvent.Type.ADDED));
assertThat(dataChangedEvent.getKey(), is(key));
@@ -308,7 +308,7 @@ public final class NacosRepositoryTest {
Event event = new NamingEvent(persistentService.getServiceName(),
Collections.singletonList(instance));
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(preInstance,
event))).when(client).subscribe(anyString(), any(EventListener.class));
SettableFuture<DataChangedEvent> settableFuture =
SettableFuture.create();
- REPOSITORY.watch(key, settableFuture::set);
+ REPOSITORY.watch(key, settableFuture::set, null);
DataChangedEvent dataChangedEvent = settableFuture.get();
assertThat(dataChangedEvent.getType(),
is(DataChangedEvent.Type.UPDATED));
assertThat(dataChangedEvent.getKey(), is(key));
@@ -325,7 +325,7 @@ public final class NacosRepositoryTest {
Event event = new NamingEvent(persistentService.getServiceName(),
Collections.emptyList());
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(preInstance,
event))).when(client).subscribe(anyString(), any(EventListener.class));
SettableFuture<DataChangedEvent> settableFuture =
SettableFuture.create();
- REPOSITORY.watch(key, settableFuture::set);
+ REPOSITORY.watch(key, settableFuture::set, null);
DataChangedEvent dataChangedEvent = settableFuture.get();
assertThat(dataChangedEvent.getType(),
is(DataChangedEvent.Type.DELETED));
assertThat(dataChangedEvent.getKey(), is(key));
diff --git
a/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
b/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
index d17016881f0..0838922474a 100644
---
a/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
+++
b/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
@@ -23,13 +23,15 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.TransactionOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
-import org.apache.shardingsphere.elasticjob.reg.exception.RegExceptionHandler;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.InstanceContextAware;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -52,11 +54,14 @@ import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
+import java.util.Optional;
import java.util.List;
import java.util.Map;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
@@ -64,7 +69,7 @@ import java.util.concurrent.TimeUnit;
*/
public final class CuratorZookeeperRepository implements
ClusterPersistRepository, InstanceContextAware {
- private final Map<String, CuratorCache> caches = new HashMap<>();
+ private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();
private final Builder builder = CuratorFrameworkFactory.builder();
@@ -144,18 +149,28 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
@Override
public void addCacheData(final String cachePath) {
- // TODO
+ CuratorCache cache = CuratorCache.build(client, cachePath);
+ try {
+ cache.start();
+ //CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ //CHECKSTYLE:ON
+ CuratorZookeeperExceptionHandler.handleException(ex);
+ }
+ caches.put(cachePath + "/", cache);
}
@Override
public void evictCacheData(final String cachePath) {
- // TODO
+ CuratorCache cache = caches.remove(cachePath + "/");
+ if (null != cache) {
+ cache.close();
+ }
}
@Override
public Object getRawCache(final String cachePath) {
- // TODO
- return null;
+ return caches.get(cachePath + "/");
}
@Override
@@ -164,13 +179,69 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
}
@Override
- public void executeInTransaction(final List<TransactionOperation>
transactionOperations) {
- // TODO
+ public void executeInTransaction(final List<TransactionOperation>
transactionOperations) throws Exception {
+
client.transaction().forOperations(toCuratorOps(transactionOperations));
+ }
+
+ private List<CuratorOp> toCuratorOps(final List<TransactionOperation>
transactionOperations) {
+ List<CuratorOp> result = new ArrayList<>(transactionOperations.size());
+ TransactionOp transactionOp = client.transactionOp();
+ for (TransactionOperation each : transactionOperations) {
+ result.add(toCuratorOp(each, transactionOp));
+ }
+ return result;
+ }
+
+ private CuratorOp toCuratorOp(final TransactionOperation each, final
TransactionOp transactionOp) {
+ try {
+ switch (each.getType()) {
+ case CHECK_EXISTS:
+ return transactionOp.check().forPath(each.getKey());
+ case ADD:
+ return transactionOp.create().forPath(each.getKey(),
each.getValue().getBytes(StandardCharsets.UTF_8));
+ case UPDATE:
+ return transactionOp.setData().forPath(each.getKey(),
each.getValue().getBytes(StandardCharsets.UTF_8));
+ case DELETE:
+ return transactionOp.delete().forPath(each.getKey());
+ default:
+ throw new UnsupportedOperationException(each.toString());
+ }
+ //CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ //CHECKSTYLE:ON
+ throw new ClusterPersistRepositoryException(ex);
+ }
+ }
+
+ @Override
+ public void updateInTransaction(final String key, final String value) {
+ try {
+ TransactionOp transactionOp = client.transactionOp();
+
client.transaction().forOperations(transactionOp.check().forPath(key),
transactionOp.setData().forPath(key, value.getBytes(StandardCharsets.UTF_8)));
+ //CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ //CHECKSTYLE:ON
+ CuratorZookeeperExceptionHandler.handleException(ex);
+ }
}
@Override
public String get(final String key) {
- return getDirectly(key);
+ CuratorCache cache = findCuratorCache(key);
+ if (null == cache) {
+ return getDirectly(key);
+ }
+ Optional<ChildData> resultInCache = cache.get(key);
+ return resultInCache.map(v -> null == v.getData() ? null : new
String(v.getData(), StandardCharsets.UTF_8)).orElseGet(() -> getDirectly(key));
+ }
+
+ private CuratorCache findCuratorCache(final String key) {
+ for (Map.Entry<String, CuratorCache> entry : caches.entrySet()) {
+ if (key.startsWith(entry.getKey())) {
+ return entry.getValue();
+ }
+ }
+ return null;
}
@Override
@@ -213,7 +284,8 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
}
}
- private String getDirectly(final String key) {
+ @Override
+ public String getDirectly(final String key) {
try {
return new String(client.getData().forPath(key),
StandardCharsets.UTF_8);
// CHECKSTYLE:OFF
@@ -286,7 +358,7 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
- RegExceptionHandler.handleException(ex);
+ CuratorZookeeperExceptionHandler.handleException(ex);
}
Preconditions.checkState(0L != result, "Cannot get registry center
time.");
return result;
@@ -298,7 +370,7 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
}
@Override
- public void watch(final String key, final DataChangedEventListener
listener) {
+ public void watch(final String key, final DataChangedEventListener
listener, final Executor executor) {
CuratorCache cache = caches.get(key);
if (null == cache) {
cache = CuratorCache.build(client, key);
@@ -312,7 +384,11 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
new
String(treeCacheListener.getData().getData(), StandardCharsets.UTF_8),
changedType));
}
}).build();
- cache.listenable().addListener(curatorCacheListener);
+ if (null != executor) {
+ cache.listenable().addListener(curatorCacheListener, executor);
+ } else {
+ cache.listenable().addListener(curatorCacheListener);
+ }
start(cache);
}
diff --git
a/mode/type/cluster/repository/provider/zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
b/mode/type/cluster/repository/provider/zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
index a124c017000..8cb76cef7fd 100644
---
a/mode/type/cluster/repository/provider/zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
+++
b/mode/type/cluster/repository/provider/zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
@@ -207,7 +207,7 @@ public final class CuratorZookeeperRepositoryTest {
ChildData data = new ChildData("/test/children_updated/1", null,
"value2".getBytes());
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(CuratorCacheListener.Type.NODE_CHANGED,
oldData, data))).when(listenable).addListener(any(CuratorCacheListener.class));
SettableFuture<DataChangedEvent> settableFuture =
SettableFuture.create();
- REPOSITORY.watch("/test/children_updated/1", settableFuture::set);
+ REPOSITORY.watch("/test/children_updated/1", settableFuture::set,
null);
DataChangedEvent dataChangedEvent = settableFuture.get();
assertThat(dataChangedEvent.getType(), is(Type.UPDATED));
assertThat(dataChangedEvent.getKey(), is("/test/children_updated/1"));
@@ -221,7 +221,7 @@ public final class CuratorZookeeperRepositoryTest {
ChildData data = new ChildData("/test/children_deleted/5", null,
"value5".getBytes());
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(CuratorCacheListener.Type.NODE_DELETED,
oldData, data))).when(listenable).addListener(any(CuratorCacheListener.class));
SettableFuture<DataChangedEvent> settableFuture =
SettableFuture.create();
- REPOSITORY.watch("/test/children_deleted/5", settableFuture::set);
+ REPOSITORY.watch("/test/children_deleted/5", settableFuture::set,
null);
DataChangedEvent dataChangedEvent = settableFuture.get();
assertThat(dataChangedEvent.getType(), is(Type.DELETED));
assertThat(dataChangedEvent.getKey(), is("/test/children_deleted/5"));
@@ -234,7 +234,7 @@ public final class CuratorZookeeperRepositoryTest {
ChildData data = new ChildData("/test/children_added/4", null,
"value4".getBytes());
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(CuratorCacheListener.Type.NODE_CREATED,
null, data))).when(listenable).addListener(any(CuratorCacheListener.class));
SettableFuture<DataChangedEvent> settableFuture =
SettableFuture.create();
- REPOSITORY.watch("/test/children_added/4", settableFuture::set);
+ REPOSITORY.watch("/test/children_added/4", settableFuture::set, null);
DataChangedEvent dataChangedEvent = settableFuture.get();
assertThat(dataChangedEvent.getType(), is(Type.ADDED));
assertThat(dataChangedEvent.getKey(), is("/test/children_added/4"));
diff --git
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilderTextTest.java
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilderTextTest.java
index 289a05f5709..dd0f61ee974 100644
---
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilderTextTest.java
+++
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilderTextTest.java
@@ -48,9 +48,9 @@ public final class StandaloneContextManagerBuilderTextTest {
ContextManager actual = new
StandaloneContextManagerBuilder().build(createContextManagerBuilderParameter());
assertNotNull(actual.getMetaDataContexts().getMetaData().getDatabase("foo_db"));
PersistRepository repository =
actual.getMetaDataContexts().getPersistService().getRepository();
- assertNotNull(repository.get(GlobalNode.getGlobalRuleNode()));
-
assertNotNull(repository.get(DatabaseMetaDataNode.getMetaDataDataSourcePath("foo_db",
"0")));
-
assertNotNull(repository.get(DatabaseMetaDataNode.getRulePath("foo_db", "0")));
+ assertNotNull(repository.getDirectly(GlobalNode.getGlobalRuleNode()));
+
assertNotNull(repository.getDirectly(DatabaseMetaDataNode.getMetaDataDataSourcePath("foo_db",
"0")));
+
assertNotNull(repository.getDirectly(DatabaseMetaDataNode.getRulePath("foo_db",
"0")));
}
private ContextManagerBuilderParameter
createContextManagerBuilderParameter() {
diff --git
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
index e4714de6fba..4e7368507e9 100644
---
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
+++
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
@@ -35,6 +35,11 @@ public final class StandalonePersistRepositoryFixture
implements StandalonePersi
@Override
public String get(final String key) {
+ return null;
+ }
+
+ @Override
+ public String getDirectly(final String key) {
return persistMap.get(key);
}
diff --git
a/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
b/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
index 952bb9b8ffd..94a24b0619f 100644
---
a/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
+++
b/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
@@ -76,6 +76,12 @@ public final class JDBCRepository implements
StandalonePersistRepository {
@Override
public String get(final String key) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public String getDirectly(final String key) {
try (
Connection connection = hikariDataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(provider.selectByKeySQL())) {
@@ -117,7 +123,7 @@ public final class JDBCRepository implements
StandalonePersistRepository {
@Override
public boolean isExisted(final String key) {
- return !Strings.isNullOrEmpty(get(key));
+ return !Strings.isNullOrEmpty(getDirectly(key));
}
@Override
@@ -133,7 +139,7 @@ public final class JDBCRepository implements
StandalonePersistRepository {
// Create key level directory recursively.
for (int i = 0; i < paths.length - 1; i++) {
String tempKey = tempPrefix + SEPARATOR + paths[i];
- String tempKeyVal = get(tempKey);
+ String tempKeyVal = getDirectly(tempKey);
if (Strings.isNullOrEmpty(tempKeyVal)) {
if (i != 0) {
parent = tempPrefix;
diff --git
a/mode/type/standalone/repository/provider/jdbc/core/src/test/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepositoryTest.java
b/mode/type/standalone/repository/provider/jdbc/core/src/test/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepositoryTest.java
index af06c4b4d2b..9c6c91805cf 100644
---
a/mode/type/standalone/repository/provider/jdbc/core/src/test/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepositoryTest.java
+++
b/mode/type/standalone/repository/provider/jdbc/core/src/test/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepositoryTest.java
@@ -96,7 +96,7 @@ public final class JDBCRepositoryTest {
when(mockPreparedStatement.executeQuery()).thenReturn(mockResultSet);
when(mockResultSet.next()).thenReturn(true);
when(mockResultSet.getString(eq("value"))).thenReturn(value);
- String actualResponse = repository.get(key);
+ String actualResponse = repository.getDirectly(key);
verify(mockPreparedStatement).setString(eq(1), eq(key));
assertEquals(value, actualResponse);
}
@@ -104,7 +104,7 @@ public final class JDBCRepositoryTest {
@Test
public void assertGetFailure() throws Exception {
when(mockJdbcConnection.prepareStatement(eq(fixture.selectByKeySQL()))).thenThrow(new
SQLException());
- String actualResponse = repository.get("key");
+ String actualResponse = repository.getDirectly("key");
assertEquals("", actualResponse);
}
diff --git
a/mode/type/standalone/repository/provider/jdbc/h2/src/test/java/org/apache/shardingsphere/mode/repository/standalone/h2/H2JDBCRepositoryTest.java
b/mode/type/standalone/repository/provider/jdbc/h2/src/test/java/org/apache/shardingsphere/mode/repository/standalone/h2/H2JDBCRepositoryTest.java
index c7f843d7993..58fcca88405 100644
---
a/mode/type/standalone/repository/provider/jdbc/h2/src/test/java/org/apache/shardingsphere/mode/repository/standalone/h2/H2JDBCRepositoryTest.java
+++
b/mode/type/standalone/repository/provider/jdbc/h2/src/test/java/org/apache/shardingsphere/mode/repository/standalone/h2/H2JDBCRepositoryTest.java
@@ -50,9 +50,9 @@ public final class H2JDBCRepositoryTest {
@Test
public void assertPersistAndGet() {
repository.persist("/testPath/test1", "test1_content");
- assertThat(repository.get("/testPath/test1"), is("test1_content"));
+ assertThat(repository.getDirectly("/testPath/test1"),
is("test1_content"));
repository.persist("/testPath/test1", "modify_content");
- assertThat(repository.get("/testPath/test1"), is("modify_content"));
+ assertThat(repository.getDirectly("/testPath/test1"),
is("modify_content"));
}
@Test
@@ -67,6 +67,6 @@ public final class H2JDBCRepositoryTest {
@Test
public void assertDelete() {
repository.delete("/testPath");
- assertThat(repository.get("/testPath"), is(""));
+ assertThat(repository.getDirectly("/testPath"), is(""));
}
}
diff --git
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
index ef174b063b9..8c76aa07cdd 100644
---
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
+++
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
public final class ClusterPersistRepositoryFixture implements
ClusterPersistRepository {
@@ -64,8 +65,18 @@ public final class ClusterPersistRepositoryFixture
implements ClusterPersistRepo
public void executeInTransaction(final List<TransactionOperation>
transactionOperations) {
}
+ @Override
+ public void updateInTransaction(final String key, final String value) {
+
+ }
+
@Override
public String get(final String key) {
+ return null;
+ }
+
+ @Override
+ public String getDirectly(final String key) {
return REGISTRY_DATA.get(key);
}
@@ -112,7 +123,7 @@ public final class ClusterPersistRepositoryFixture
implements ClusterPersistRepo
}
@Override
- public void watch(final String key, final DataChangedEventListener
listener) {
+ public void watch(final String key, final DataChangedEventListener
listener, final Executor executor) {
}
@Override
diff --git
a/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
b/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
index b15d3fb353e..5e7d6aa3865 100644
---
a/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
+++
b/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
@@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
public final class TestClusterPersistRepository implements
ClusterPersistRepository {
@@ -64,8 +65,18 @@ public final class TestClusterPersistRepository implements
ClusterPersistReposit
public void executeInTransaction(final List<TransactionOperation>
transactionOperations) {
}
+ @Override
+ public void updateInTransaction(final String key, final String value) {
+
+ }
+
@Override
public String get(final String key) {
+ return null;
+ }
+
+ @Override
+ public String getDirectly(final String key) {
return registryData.get(key);
}
@@ -111,10 +122,6 @@ public final class TestClusterPersistRepository implements
ClusterPersistReposit
return null;
}
- @Override
- public void watch(final String key, final DataChangedEventListener
listener) {
- }
-
@Override
public boolean tryLock(final String lockKey, final long timeoutMillis) {
return false;
@@ -124,6 +131,11 @@ public final class TestClusterPersistRepository implements
ClusterPersistReposit
public void unlock(final String lockKey) {
}
+ @Override
+ public void watch(final String key, final DataChangedEventListener
listener, final Executor executor) {
+
+ }
+
@Override
public void close() {
registryData.clear();
diff --git
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
index 63050fda31b..fb469efd48b 100644
---
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
+++
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
@@ -70,7 +70,7 @@ public class ScalingWatcher extends TestWatcher {
private void addZookeeperData(final String key, final String parentPath,
final ClusterPersistRepository zookeeperRepository, final Map<String, String>
nodeMap) {
String path = String.join("/", parentPath, key);
- String data = zookeeperRepository.get(path);
+ String data = zookeeperRepository.getDirectly(path);
nodeMap.put(path, data);
List<String> childrenKeys = zookeeperRepository.getChildrenKeys(path);
for (String each : childrenKeys) {