This is an automated email from the ASF dual-hosted git repository.

soulasuna pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d29fa58bc33 Adjustment governance center watch interface (#21415)
d29fa58bc33 is described below

commit d29fa58bc3324ad3fc9c51d1e87323c4572901fd
Author: zhaojinchao <[email protected]>
AuthorDate: Sun Oct 9 16:31:08 2022 +0800

    Adjustment governance center watch interface (#21415)
    
    * Adjustment governance center watch interface
    
    * Add getDirectly interface
    
    * Fix CI
    
    * Fix unit test
    
    * Fix CI
---
 .../fixture/FixtureClusterPersistRepository.java   |  13 ++-
 .../core/api/impl/GovernanceRepositoryAPIImpl.java |  16 ++--
 .../service/MetaDataVersionPersistService.java     |   6 +-
 .../config/database/DataSourcePersistService.java  |   8 +-
 .../database/DatabaseRulePersistService.java       |  10 +-
 .../config/global/GlobalRulePersistService.java    |   4 +-
 .../config/global/PropertiesPersistService.java    |   4 +-
 .../schema/TableMetaDataPersistService.java        |   2 +-
 .../service/schema/ViewMetaDataPersistService.java |   2 +-
 .../mode/persist/PersistRepository.java            |   8 ++
 .../DatabaseMetaDataPersistServiceTest.java        |   2 +-
 .../service/MetaDataVersionPersistServiceTest.java |   2 +-
 .../database/DataSourcePersistServiceTest.java     |   8 +-
 .../database/DatabaseRulePersistServiceTest.java   |   8 +-
 .../global/GlobalRulePersistServiceTest.java       |   4 +-
 .../global/PropertiesPersistServiceTest.java       |   2 +-
 .../schema/TableMetaDataPersistServiceTest.java    |   2 +-
 .../schema/ViewMetaDataPersistServiceTest.java     |   2 +-
 .../registry/GovernanceWatcherFactory.java         |   2 +-
 .../compute/service/ComputeNodeStatusService.java  |  10 +-
 .../subscriber/ComputeNodeStatusSubscriber.java    |   2 +-
 .../storage/service/StorageNodeStatusService.java  |   2 +-
 .../subscriber/ProcessRegistrySubscriber.java      |   4 +-
 .../fixture/ClusterPersistRepositoryFixture.java   |  20 +++-
 ...ProcessListClusterPersistRepositoryFixture.java |  20 +++-
 .../subscriber/ProcessRegistrySubscriberTest.java  |   2 +-
 .../service/ComputeNodeStatusServiceTest.java      |   8 +-
 .../cluster/ClusterPersistRepository.java          |  12 ++-
 .../cluster/consul/ConsulRepository.java           |  14 ++-
 .../cluster/consul/ConsulRepositoryTest.java       |   8 +-
 .../repository/cluster/etcd/EtcdRepository.java    |  16 +++-
 .../cluster/etcd/EtcdRepositoryTest.java           |  15 ++-
 .../repository/cluster/nacos/NacosRepository.java  |  14 ++-
 .../cluster/nacos/NacosRepositoryTest.java         |   8 +-
 .../zookeeper/CuratorZookeeperRepository.java      | 106 ++++++++++++++++++---
 .../zookeeper/CuratorZookeeperRepositoryTest.java  |   6 +-
 .../StandaloneContextManagerBuilderTextTest.java   |   6 +-
 .../StandalonePersistRepositoryFixture.java        |   5 +
 .../repository/standalone/jdbc/JDBCRepository.java |  10 +-
 .../standalone/jdbc/JDBCRepositoryTest.java        |   4 +-
 .../standalone/h2/H2JDBCRepositoryTest.java        |   6 +-
 .../fixture/ClusterPersistRepositoryFixture.java   |  13 ++-
 .../fixture/TestClusterPersistRepository.java      |  20 +++-
 .../pipeline/framework/watcher/ScalingWatcher.java |   2 +-
 44 files changed, 316 insertions(+), 122 deletions(-)

diff --git 
a/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
 
b/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
index d2d0c720541..268a4fc4510 100644
--- 
a/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
+++ 
b/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 public final class FixtureClusterPersistRepository implements 
ClusterPersistRepository {
     
@@ -64,8 +65,18 @@ public final class FixtureClusterPersistRepository 
implements ClusterPersistRepo
     public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) {
     }
     
+    @Override
+    public void updateInTransaction(final String key, final String value) {
+    
+    }
+    
     @Override
     public String get(final String key) {
+        return null;
+    }
+    
+    @Override
+    public String getDirectly(final String key) {
         return registryData.get(key);
     }
     
@@ -112,7 +123,7 @@ public final class FixtureClusterPersistRepository 
implements ClusterPersistRepo
     }
     
     @Override
-    public void watch(final String key, final DataChangedEventListener 
listener) {
+    public void watch(final String key, final DataChangedEventListener 
listener, final Executor executor) {
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index f7bad40544e..86ce2c90139 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -51,7 +51,7 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
     
     @Override
     public boolean isExisted(final String key) {
-        return null != repository.get(key);
+        return null != repository.getDirectly(key);
     }
     
     @Override
@@ -61,12 +61,12 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
     
     @Override
     public String getJobItemProgress(final String jobId, final int 
shardingItem) {
-        return repository.get(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 
shardingItem));
+        return 
repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 
shardingItem));
     }
     
     @Override
     public Optional<String> getCheckLatestJobId(final String jobId) {
-        return 
Optional.ofNullable(repository.get(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId)));
+        return 
Optional.ofNullable(repository.getDirectly(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId)));
     }
     
     @Override
@@ -79,7 +79,7 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
     @Override
     public Map<String, DataConsistencyCheckResult> getCheckJobResult(final 
String jobId, final String checkJobId) {
         Map<String, DataConsistencyCheckResult> result = new HashMap<>();
-        String yamlCheckResultMapText = 
repository.get(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
+        String yamlCheckResultMapText = 
repository.getDirectly(PipelineMetaDataNode.getCheckJobResultPath(jobId, 
checkJobId));
         if (StringUtils.isBlank(yamlCheckResultMapText)) {
             return Collections.emptyMap();
         }
@@ -130,7 +130,7 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
     
     @Override
     public void watch(final String key, final DataChangedEventListener 
listener) {
-        repository.watch(key, listener);
+        repository.watch(key, listener, null);
     }
     
     @Override
@@ -147,7 +147,7 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
     
     @Override
     public String getMetaDataDataSources(final JobType jobType) {
-        return 
repository.get(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
+        return 
repository.getDirectly(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
     }
     
     @Override
@@ -157,7 +157,7 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
     
     @Override
     public String getMetaDataProcessConfiguration(final JobType jobType) {
-        return 
repository.get(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType));
+        return 
repository.getDirectly(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType));
     }
     
     @Override
@@ -167,7 +167,7 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
     
     @Override
     public String getJobItemErrorMessage(final String jobId, final int 
shardingItem) {
-        return 
repository.get(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem));
+        return 
repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem));
     }
     
     @Override
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistService.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistService.java
index 4a507f7112e..0d87bd3b784 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistService.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistService.java
@@ -39,7 +39,7 @@ public final class MetaDataVersionPersistService {
      * @return active database version
      */
     public Optional<String> getActiveVersion(final String databaseName) {
-        return 
Optional.ofNullable(repository.get(DatabaseMetaDataNode.getActiveVersionPath(databaseName)));
+        return 
Optional.ofNullable(repository.getDirectly(DatabaseMetaDataNode.getActiveVersionPath(databaseName)));
     }
     
     /**
@@ -66,9 +66,9 @@ public final class MetaDataVersionPersistService {
             return Optional.empty();
         }
         String newVersion = String.valueOf(new 
AtomicLong(Long.parseLong(activeVersion.get())).incrementAndGet());
-        repository.persist(DatabaseMetaDataNode.getRulePath(databaseName, 
newVersion), repository.get(DatabaseMetaDataNode.getRulePath(databaseName, 
activeVersion.get())));
+        repository.persist(DatabaseMetaDataNode.getRulePath(databaseName, 
newVersion), 
repository.getDirectly(DatabaseMetaDataNode.getRulePath(databaseName, 
activeVersion.get())));
         repository.persist(
-                DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName, 
newVersion), 
repository.get(DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName, 
activeVersion.get())));
+                DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName, 
newVersion), 
repository.getDirectly(DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName,
 activeVersion.get())));
         return Optional.of(newVersion);
     }
     
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistService.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistService.java
index 1e1a39bc3e6..90cab74797d 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistService.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistService.java
@@ -68,13 +68,13 @@ public final class DataSourcePersistService implements 
DatabaseBasedPersistServi
     
     @Override
     public Map<String, DataSourceProperties> load(final String databaseName) {
-        return isExisted(databaseName) ? 
getDataSourceProperties(repository.get(
+        return isExisted(databaseName) ? 
getDataSourceProperties(repository.getDirectly(
                 DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName, 
getDatabaseActiveVersion(databaseName)))) : new LinkedHashMap<>();
     }
     
     @Override
     public Map<String, DataSourceProperties> load(final String databaseName, 
final String version) {
-        String yamlContent = 
repository.get(DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName, 
version));
+        String yamlContent = 
repository.getDirectly(DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName,
 version));
         return Strings.isNullOrEmpty(yamlContent) ? new LinkedHashMap<>() : 
getDataSourceProperties(yamlContent);
     }
     
@@ -91,7 +91,7 @@ public final class DataSourcePersistService implements 
DatabaseBasedPersistServi
     
     @Override
     public boolean isExisted(final String databaseName) {
-        return !Strings.isNullOrEmpty(getDatabaseActiveVersion(databaseName)) 
&& 
!Strings.isNullOrEmpty(repository.get(DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName,
+        return !Strings.isNullOrEmpty(getDatabaseActiveVersion(databaseName)) 
&& 
!Strings.isNullOrEmpty(repository.getDirectly(DatabaseMetaDataNode.getMetaDataDataSourcePath(databaseName,
                 getDatabaseActiveVersion(databaseName))));
     }
     
@@ -108,6 +108,6 @@ public final class DataSourcePersistService implements 
DatabaseBasedPersistServi
     }
     
     private String getDatabaseActiveVersion(final String databaseName) {
-        return 
repository.get(DatabaseMetaDataNode.getActiveVersionPath(databaseName));
+        return 
repository.getDirectly(DatabaseMetaDataNode.getActiveVersionPath(databaseName));
     }
 }
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistService.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistService.java
index fba1c9839b0..53223d7c5d2 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistService.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistService.java
@@ -68,7 +68,7 @@ public final class DatabaseRulePersistService implements 
DatabaseBasedPersistSer
     public Collection<RuleConfiguration> load(final String databaseName) {
         return isExisted(databaseName)
                 // TODO process algorithm provided configuration
-                ? new 
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.get(DatabaseMetaDataNode.getRulePath(databaseName,
+                ? new 
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.getDirectly(DatabaseMetaDataNode.getRulePath(databaseName,
                         getDatabaseActiveVersion(databaseName))), 
Collection.class, true))
                 : new LinkedList<>();
     }
@@ -76,19 +76,19 @@ public final class DatabaseRulePersistService implements 
DatabaseBasedPersistSer
     @SuppressWarnings("unchecked")
     @Override
     public Collection<RuleConfiguration> load(final String databaseName, final 
String version) {
-        String yamlContent = 
repository.get(DatabaseMetaDataNode.getRulePath(databaseName, version));
+        String yamlContent = 
repository.getDirectly(DatabaseMetaDataNode.getRulePath(databaseName, version));
         return Strings.isNullOrEmpty(yamlContent) ? new LinkedList<>()
-                : new 
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.get(DatabaseMetaDataNode
+                : new 
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.getDirectly(DatabaseMetaDataNode
                         .getRulePath(databaseName, 
getDatabaseActiveVersion(databaseName))), Collection.class, true));
     }
     
     @Override
     public boolean isExisted(final String databaseName) {
         return !Strings.isNullOrEmpty(getDatabaseActiveVersion(databaseName))
-                && 
!Strings.isNullOrEmpty(repository.get(DatabaseMetaDataNode.getRulePath(databaseName,
 getDatabaseActiveVersion(databaseName))));
+                && 
!Strings.isNullOrEmpty(repository.getDirectly(DatabaseMetaDataNode.getRulePath(databaseName,
 getDatabaseActiveVersion(databaseName))));
     }
     
     private String getDatabaseActiveVersion(final String databaseName) {
-        return 
repository.get(DatabaseMetaDataNode.getActiveVersionPath(databaseName));
+        return 
repository.getDirectly(DatabaseMetaDataNode.getActiveVersionPath(databaseName));
     }
 }
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistService.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistService.java
index 9987d18d42b..bf10bd08a62 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistService.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistService.java
@@ -55,12 +55,12 @@ public final class GlobalRulePersistService implements 
GlobalPersistService<Coll
     @SuppressWarnings("unchecked")
     public Collection<RuleConfiguration> load() {
         return isExisted()
-                ? new 
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.get(GlobalNode.getGlobalRuleNode()),
 Collection.class))
+                ? new 
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.getDirectly(GlobalNode.getGlobalRuleNode()),
 Collection.class))
                 : Collections.emptyList();
     }
     
     private boolean isExisted() {
-        return 
!Strings.isNullOrEmpty(repository.get(GlobalNode.getGlobalRuleNode()));
+        return 
!Strings.isNullOrEmpty(repository.getDirectly(GlobalNode.getGlobalRuleNode()));
     }
     
     /**
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistService.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistService.java
index a041bd87550..776e85a9667 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistService.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistService.java
@@ -46,11 +46,11 @@ public final class PropertiesPersistService implements 
GlobalPersistService<Prop
     }
     
     private boolean isExisted() {
-        return 
!Strings.isNullOrEmpty(repository.get(GlobalNode.getPropsPath()));
+        return 
!Strings.isNullOrEmpty(repository.getDirectly(GlobalNode.getPropsPath()));
     }
     
     @Override
     public Properties load() {
-        return 
Strings.isNullOrEmpty(repository.get(GlobalNode.getPropsPath())) ? new 
Properties() : YamlEngine.unmarshal(repository.get(GlobalNode.getPropsPath()), 
Properties.class);
+        return 
Strings.isNullOrEmpty(repository.getDirectly(GlobalNode.getPropsPath())) ? new 
Properties() : 
YamlEngine.unmarshal(repository.getDirectly(GlobalNode.getPropsPath()), 
Properties.class);
     }
 }
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/TableMetaDataPersistService.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/TableMetaDataPersistService.java
index f1b0a4030ca..d1f5a814cb5 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/TableMetaDataPersistService.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/TableMetaDataPersistService.java
@@ -59,7 +59,7 @@ public final class TableMetaDataPersistService implements 
SchemaMetaDataPersistS
     private Map<String, ShardingSphereTable> 
getTableMetaDataByTableNames(final String databaseName, final String 
schemaName, final Collection<String> tableNames) {
         Map<String, ShardingSphereTable> result = new 
LinkedHashMap<>(tableNames.size(), 1);
         tableNames.forEach(each -> {
-            String table = 
repository.get(DatabaseMetaDataNode.getTableMetaDataPath(databaseName, 
schemaName, each));
+            String table = 
repository.getDirectly(DatabaseMetaDataNode.getTableMetaDataPath(databaseName, 
schemaName, each));
             if (!StringUtils.isEmpty(table)) {
                 result.put(each.toLowerCase(), new 
YamlTableSwapper().swapToObject(YamlEngine.unmarshal(table, 
YamlShardingSphereTable.class)));
             }
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ViewMetaDataPersistService.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ViewMetaDataPersistService.java
index 65adcf86376..d89c8386a71 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ViewMetaDataPersistService.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ViewMetaDataPersistService.java
@@ -59,7 +59,7 @@ public final class ViewMetaDataPersistService implements 
SchemaMetaDataPersistSe
     private Map<String, ShardingSphereView> getViewMetaDataByViewNames(final 
String databaseName, final String schemaName, final Collection<String> 
viewNames) {
         Map<String, ShardingSphereView> result = new 
LinkedHashMap<>(viewNames.size(), 1);
         viewNames.forEach(each -> {
-            String view = 
repository.get(DatabaseMetaDataNode.getViewMetaDataPath(databaseName, 
schemaName, each));
+            String view = 
repository.getDirectly(DatabaseMetaDataNode.getViewMetaDataPath(databaseName, 
schemaName, each));
             if (!StringUtils.isEmpty(view)) {
                 result.put(each.toLowerCase(), new 
YamlViewSwapper().swapToObject(YamlEngine.unmarshal(view, 
YamlShardingSphereView.class)));
             }
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
index 48c00e5559b..76e31f226cc 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
@@ -41,6 +41,14 @@ public interface PersistRepository extends TypedSPI {
      */
     String get(String key);
     
+    /**
+     * Get value from registry center directly.
+     *
+     * @param key key
+     * @return value
+     */
+    String getDirectly(String key);
+    
     /**
      * Get names of sub-node.
      *
diff --git 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistServiceTest.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistServiceTest.java
index c2a95160e2b..5ff43b6efc7 100644
--- 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistServiceTest.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistServiceTest.java
@@ -120,7 +120,7 @@ public final class DatabaseMetaDataPersistServiceTest {
         DatabaseMetaDataPersistService databaseMetaDataPersistService = new 
DatabaseMetaDataPersistService(repository);
         
when(repository.getChildrenKeys("/metadata/foo_db/schemas")).thenReturn(Collections.singletonList("foo_schema"));
         
when(repository.getChildrenKeys("/metadata/foo_db/schemas/foo_schema/tables")).thenReturn(Collections.singletonList("t_order"));
-        
when(repository.get("/metadata/foo_db/schemas/foo_schema/tables/t_order")).thenReturn(readYAML());
+        
when(repository.getDirectly("/metadata/foo_db/schemas/foo_schema/tables/t_order")).thenReturn(readYAML());
         Map<String, ShardingSphereSchema> schema = 
databaseMetaDataPersistService.loadSchemas("foo_db");
         assertThat(schema.size(), is(1));
         Map<String, ShardingSphereSchema> empty = 
databaseMetaDataPersistService.loadSchemas("test");
diff --git 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistServiceTest.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistServiceTest.java
index 7908c7b2993..f82dff1cb13 100644
--- 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistServiceTest.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/MetaDataVersionPersistServiceTest.java
@@ -43,7 +43,7 @@ public final class MetaDataVersionPersistServiceTest {
     @Before
     public void setUp() {
         repository = mock(PersistRepository.class);
-        when(repository.get(contains("foo_db"))).thenReturn("1");
+        when(repository.getDirectly(contains("foo_db"))).thenReturn("1");
         metaDataVersionPersistService = new 
MetaDataVersionPersistService(repository);
     }
     
diff --git 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistServiceTest.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistServiceTest.java
index 93d3f58e1cc..83e3c5b981a 100644
--- 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistServiceTest.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DataSourcePersistServiceTest.java
@@ -51,8 +51,8 @@ public final class DataSourcePersistServiceTest {
     
     @Test
     public void assertLoad() {
-        
when(repository.get("/metadata/foo_db/active_version")).thenReturn("0");
-        
when(repository.get("/metadata/foo_db/versions/0/dataSources")).thenReturn(readDataSourceYaml("yaml/persist/data-source.yaml"));
+        
when(repository.getDirectly("/metadata/foo_db/active_version")).thenReturn("0");
+        
when(repository.getDirectly("/metadata/foo_db/versions/0/dataSources")).thenReturn(readDataSourceYaml("yaml/persist/data-source.yaml"));
         Map<String, DataSourceProperties> actual = new 
DataSourcePersistService(repository).load("foo_db");
         assertThat(actual.size(), is(2));
         assertDataSourceProperties(actual.get("ds_0"), 
DataSourcePropertiesCreator.create(createDataSource("ds_0")));
@@ -75,14 +75,14 @@ public final class DataSourcePersistServiceTest {
     
     @Test
     public void assertLoadWithoutPath() {
-        
when(repository.get("/metadata/foo_db/active_version")).thenReturn("0");
+        
when(repository.getDirectly("/metadata/foo_db/active_version")).thenReturn("0");
         Map<String, DataSourceProperties> actual = new 
DataSourcePersistService(repository).load("foo_db");
         assertTrue(actual.isEmpty());
     }
     
     @Test
     public void assertAppend() {
-        
when(repository.get("/metadata/foo_db/active_version")).thenReturn("0");
+        
when(repository.getDirectly("/metadata/foo_db/active_version")).thenReturn("0");
         new DataSourcePersistService(repository).append("foo_db", 
Collections.singletonMap("foo_ds", 
DataSourcePropertiesCreator.create(createDataSource("foo_ds"))));
         String expected = 
readDataSourceYaml("yaml/persist/data-source-foo.yaml");
         verify(repository).persist("/metadata/foo_db/versions/0/dataSources", 
expected);
diff --git 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistServiceTest.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistServiceTest.java
index a6708327280..7c554e026d3 100644
--- 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistServiceTest.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/database/DatabaseRulePersistServiceTest.java
@@ -51,16 +51,16 @@ public final class DatabaseRulePersistServiceTest {
     
     @Test
     public void assertLoadWithExistedNode() {
-        
when(repository.get("/metadata/foo_db/active_version")).thenReturn("0");
-        
when(repository.get("/metadata/foo_db/versions/0/rules")).thenReturn(readYAML());
+        
when(repository.getDirectly("/metadata/foo_db/active_version")).thenReturn("0");
+        
when(repository.getDirectly("/metadata/foo_db/versions/0/rules")).thenReturn(readYAML());
         Collection<RuleConfiguration> actual = new 
DatabaseRulePersistService(repository).load("foo_db");
         assertThat(actual.size(), is(1));
     }
     
     @Test
     public void assertIsExisted() {
-        
when(repository.get("/metadata/foo_db/active_version")).thenReturn("0");
-        
when(repository.get("/metadata/foo_db/versions/0/rules")).thenReturn(readYAML());
+        
when(repository.getDirectly("/metadata/foo_db/active_version")).thenReturn("0");
+        
when(repository.getDirectly("/metadata/foo_db/versions/0/rules")).thenReturn(readYAML());
         DatabaseRulePersistService databaseRulePersistService = new 
DatabaseRulePersistService(repository);
         assertTrue(databaseRulePersistService.isExisted("foo_db"));
         assertFalse(databaseRulePersistService.isExisted("foo_db_1"));
diff --git 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistServiceTest.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistServiceTest.java
index 29aa0296d0a..79b827ca69b 100644
--- 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistServiceTest.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/GlobalRulePersistServiceTest.java
@@ -45,14 +45,14 @@ public class GlobalRulePersistServiceTest {
     
     @Test
     public void assertLoad() {
-        when(repository.get("/rules")).thenReturn(readYAML());
+        when(repository.getDirectly("/rules")).thenReturn(readYAML());
         Collection<RuleConfiguration> actual = new 
GlobalRulePersistService(repository).load();
         assertThat(actual.size(), is(1));
     }
     
     @Test
     public void assertLoadUsers() {
-        when(repository.get("/rules")).thenReturn(readYAML());
+        when(repository.getDirectly("/rules")).thenReturn(readYAML());
         Collection<ShardingSphereUser> actual = new 
GlobalRulePersistService(repository).loadUsers();
         assertThat(actual.size(), is(2));
     }
diff --git 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistServiceTest.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistServiceTest.java
index 11faf56a446..8a798335f8a 100644
--- 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistServiceTest.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/global/PropertiesPersistServiceTest.java
@@ -40,7 +40,7 @@ public final class PropertiesPersistServiceTest {
     
     @Test
     public void assertLoad() {
-        when(repository.get("/props")).thenReturn(PROPS_YAML);
+        when(repository.getDirectly("/props")).thenReturn(PROPS_YAML);
         Properties actual = new PropertiesPersistService(repository).load();
         assertThat(actual.get(ConfigurationPropertyKey.SQL_SHOW.getKey()), 
is(Boolean.FALSE));
     }
diff --git 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/TableMetaDataPersistServiceTest.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/TableMetaDataPersistServiceTest.java
index c3c976a2e54..1445bb82f88 100644
--- 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/TableMetaDataPersistServiceTest.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/TableMetaDataPersistServiceTest.java
@@ -56,7 +56,7 @@ public final class TableMetaDataPersistServiceTest {
     public void assertLoad() {
         TableMetaDataPersistService tableMetaDataPersistService = new 
TableMetaDataPersistService(repository);
         
when(repository.getChildrenKeys("/metadata/foo_db/schemas/foo_schema/tables")).thenReturn(Collections.singletonList("t_order"));
-        
when(repository.get("/metadata/foo_db/schemas/foo_schema/tables/t_order")).thenReturn(readYAML());
+        
when(repository.getDirectly("/metadata/foo_db/schemas/foo_schema/tables/t_order")).thenReturn(readYAML());
         Map<String, ShardingSphereTable> tables = 
tableMetaDataPersistService.load("foo_db", "foo_schema");
         assertThat(tables.size(), is(1));
         assertThat(tables.get("t_order").getIndexes().keySet(), 
is(Collections.singleton("primary")));
diff --git 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/ViewMetaDataPersistServiceTest.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/ViewMetaDataPersistServiceTest.java
index dbf60af2eb7..10d85d7e5a1 100644
--- 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/ViewMetaDataPersistServiceTest.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/ViewMetaDataPersistServiceTest.java
@@ -58,7 +58,7 @@ public final class ViewMetaDataPersistServiceTest {
     public void assertLoad() {
         ViewMetaDataPersistService viewMetaDataPersistService = new 
ViewMetaDataPersistService(repository);
         
when(repository.getChildrenKeys("/metadata/foo_db/schemas/foo_schema/views")).thenReturn(Collections.singletonList("foo_view"));
-        
when(repository.get("/metadata/foo_db/schemas/foo_schema/views/foo_view")).thenReturn(readYAML());
+        
when(repository.getDirectly("/metadata/foo_db/schemas/foo_schema/views/foo_view")).thenReturn(readYAML());
         Map<String, ShardingSphereView> views = 
viewMetaDataPersistService.load("foo_db", "foo_schema");
         assertThat(views.size(), is(1));
         assertThat(views.get("foo_view").getName(), is("foo_view"));
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
index 01f5c6f3991..463915a9f16 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
@@ -58,6 +58,6 @@ public final class GovernanceWatcherFactory {
             if 
(listener.getWatchingTypes().contains(dataChangedEventListener.getType())) {
                 
listener.createGovernanceEvent(dataChangedEventListener).ifPresent(eventBusContext::post);
             }
-        });
+        }, null);
     }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index e7178474134..6ad59749599 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -85,7 +85,7 @@ public final class ComputeNodeStatusService {
      */
     @SuppressWarnings("unchecked")
     public Collection<String> loadInstanceLabels(final String instanceId) {
-        String yamlContent = 
repository.get(ComputeNode.getInstanceLabelsNodePath(instanceId));
+        String yamlContent = 
repository.getDirectly(ComputeNode.getInstanceLabelsNodePath(instanceId));
         return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() : 
YamlEngine.unmarshal(yamlContent, Collection.class);
     }
     
@@ -97,7 +97,7 @@ public final class ComputeNodeStatusService {
      */
     @SuppressWarnings("unchecked")
     public Collection<String> loadInstanceStatus(final String instanceId) {
-        String yamlContent = 
repository.get(ComputeNode.getInstanceStatusNodePath(instanceId));
+        String yamlContent = 
repository.getDirectly(ComputeNode.getInstanceStatusNodePath(instanceId));
         return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() : 
YamlEngine.unmarshal(yamlContent, Collection.class);
     }
     
@@ -109,7 +109,7 @@ public final class ComputeNodeStatusService {
      */
     public Optional<Long> loadInstanceWorkerId(final String instanceId) {
         try {
-            String workerId = 
repository.get(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
+            String workerId = 
repository.getDirectly(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
             return Strings.isNullOrEmpty(workerId) ? Optional.empty() : 
Optional.of(Long.valueOf(workerId));
         } catch (final NumberFormatException ex) {
             log.error("Invalid worker id for instance: {}", instanceId);
@@ -133,7 +133,7 @@ public final class ComputeNodeStatusService {
     private Collection<ComputeNodeInstance> loadComputeNodeInstances(final 
InstanceType instanceType) {
         Collection<String> onlineComputeNodes = 
repository.getChildrenKeys(ComputeNode.getOnlineNodePath(instanceType));
         return onlineComputeNodes.stream().map(each -> loadComputeNodeInstance(
-                InstanceMetaDataBuilderFactory.create(each, instanceType, 
repository.get(ComputeNode.getOnlineInstanceNodePath(each, 
instanceType))))).collect(Collectors.toList());
+                InstanceMetaDataBuilderFactory.create(each, instanceType, 
repository.getDirectly(ComputeNode.getOnlineInstanceNodePath(each, 
instanceType))))).collect(Collectors.toList());
     }
     
     /**
@@ -158,7 +158,7 @@ public final class ComputeNodeStatusService {
         Set<Long> result = new LinkedHashSet<>();
         List<String> childrenKeys = 
repository.getChildrenKeys(ComputeNode.getInstanceWorkerIdRootNodePath());
         for (String each : childrenKeys) {
-            String workerId = 
repository.get(ComputeNode.getInstanceWorkerIdNodePath(each));
+            String workerId = 
repository.getDirectly(ComputeNode.getInstanceWorkerIdNodePath(each));
             if (null != workerId) {
                 result.add(Long.parseLong(workerId));
             }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
index d88774d36ef..1799b1b6b8e 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
@@ -55,7 +55,7 @@ public final class ComputeNodeStatusSubscriber {
     @Subscribe
     public void update(final ComputeNodeStatusChangedEvent event) {
         String computeStatusNodePath = 
ComputeNode.getInstanceStatusNodePath(event.getInstanceId());
-        String yamlContext = repository.get(computeStatusNodePath);
+        String yamlContext = repository.getDirectly(computeStatusNodePath);
         Collection<String> status = Strings.isNullOrEmpty(yamlContext) ? new 
ArrayList<>() : YamlEngine.unmarshal(yamlContext, Collection.class);
         if (event.getStatus() == ComputeNodeStatus.CIRCUIT_BREAK) {
             status.add(ComputeNodeStatus.CIRCUIT_BREAK.name());
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/service/StorageNodeStatusService.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/service/StorageNodeStatusService.java
index dce3c8d0d2f..31862d8cd7d 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/service/StorageNodeStatusService.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/service/StorageNodeStatusService.java
@@ -45,7 +45,7 @@ public final class StorageNodeStatusService {
         Collection<String> storageNodes = 
repository.getChildrenKeys(StorageNode.getRootPath());
         Map<String, StorageNodeDataSource> result = new 
HashMap<>(storageNodes.size(), 1);
         storageNodes.forEach(each -> {
-            String yamlContext = 
repository.get(StorageNode.getStorageNodesDataSourcePath(each));
+            String yamlContext = 
repository.getDirectly(StorageNode.getStorageNodesDataSourcePath(each));
             if (!Strings.isNullOrEmpty(yamlContext)) {
                 result.put(each, YamlEngine.unmarshal(yamlContext, 
StorageNodeDataSource.class));
             }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
index dbe5e7f1754..77201bc5168 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
@@ -127,14 +127,14 @@ public final class ProcessRegistrySubscriber {
     }
     
     private boolean isReady(final Collection<String> paths) {
-        return paths.stream().noneMatch(each -> null != repository.get(each));
+        return paths.stream().noneMatch(each -> null != 
repository.getDirectly(each));
     }
     
     private void sendShowProcessList(final String processListId) {
         List<String> childrenKeys = 
repository.getChildrenKeys(ProcessNode.getProcessListIdPath(processListId));
         Collection<String> batchProcessContexts = new LinkedList<>();
         for (String each : childrenKeys) {
-            
batchProcessContexts.add(repository.get(ProcessNode.getProcessListInstancePath(processListId,
 each)));
+            
batchProcessContexts.add(repository.getDirectly(ProcessNode.getProcessListInstancePath(processListId,
 each)));
         }
         eventBusContext.post(new 
ShowProcessListResponseEvent(batchProcessContexts));
     }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
index 56329725aab..5ec8ae5f8f5 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
@@ -25,6 +25,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.transaction.Transaction
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 public final class ClusterPersistRepositoryFixture implements 
ClusterPersistRepository {
     
@@ -58,8 +59,18 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
     public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) {
     }
     
+    @Override
+    public void updateInTransaction(final String key, final String value) {
+    
+    }
+    
     @Override
     public String get(final String key) {
+        return null;
+    }
+    
+    @Override
+    public String getDirectly(final String key) {
         return "";
     }
     
@@ -103,10 +114,6 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
         return null;
     }
     
-    @Override
-    public void watch(final String key, final DataChangedEventListener 
listener) {
-    }
-    
     @Override
     public boolean tryLock(final String lockKey, final long timeoutMillis) {
         return false;
@@ -116,6 +123,11 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
     public void unlock(final String lockKey) {
     }
     
+    @Override
+    public void watch(final String key, final DataChangedEventListener 
listener, final Executor executor) {
+    
+    }
+    
     @Override
     public void close() {
     }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
index 1e6bd0d565d..4fc65893a37 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 public final class ProcessListClusterPersistRepositoryFixture implements 
ClusterPersistRepository {
     
@@ -65,8 +66,18 @@ public final class 
ProcessListClusterPersistRepositoryFixture implements Cluster
     public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) {
     }
     
+    @Override
+    public void updateInTransaction(final String key, final String value) {
+    
+    }
+    
     @Override
     public String get(final String key) {
+        return null;
+    }
+    
+    @Override
+    public String getDirectly(final String key) {
         return REGISTRY_DATA.get(key);
     }
     
@@ -114,10 +125,6 @@ public final class 
ProcessListClusterPersistRepositoryFixture implements Cluster
         return null;
     }
     
-    @Override
-    public void watch(final String key, final DataChangedEventListener 
listener) {
-    }
-    
     @Override
     public boolean tryLock(final String lockKey, final long timeoutMillis) {
         return false;
@@ -127,6 +134,11 @@ public final class 
ProcessListClusterPersistRepositoryFixture implements Cluster
     public void unlock(final String lockKey) {
     }
     
+    @Override
+    public void watch(final String key, final DataChangedEventListener 
listener, final Executor executor) {
+    
+    }
+    
     @Override
     public void close() {
         REGISTRY_DATA.clear();
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTest.java
index 8e27619dc99..0b529c21805 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTest.java
@@ -56,7 +56,7 @@ public final class ProcessRegistrySubscriberTest {
     public void assertLoadShowProcessListData() {
         
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList());
         
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc"));
-        when(repository.get(any())).thenReturn(null);
+        when(repository.getDirectly(any())).thenReturn(null);
         ShowProcessListRequestEvent showProcessListRequestEvent = 
mock(ShowProcessListRequestEvent.class);
         
processRegistrySubscriber.loadShowProcessListData(showProcessListRequestEvent);
         verify(repository, times(1)).persist(any(), any());
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
index a44410deb32..39409882475 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -79,7 +79,7 @@ public final class ComputeNodeStatusServiceTest {
         InstanceMetaData instanceMetaData = new 
ProxyInstanceMetaData("foo_instance_id", 3307);
         final String instanceId = instanceMetaData.getId();
         new 
ComputeNodeStatusService(repository).loadInstanceLabels(instanceId);
-        
verify(repository).get(ComputeNode.getInstanceLabelsNodePath(instanceId));
+        
verify(repository).getDirectly(ComputeNode.getInstanceLabelsNodePath(instanceId));
     }
     
     @Test
@@ -87,7 +87,7 @@ public final class ComputeNodeStatusServiceTest {
         InstanceMetaData instanceMetaData = new 
ProxyInstanceMetaData("foo_instance_id", 3307);
         final String instanceId = instanceMetaData.getId();
         new 
ComputeNodeStatusService(repository).loadInstanceStatus(instanceId);
-        
verify(repository).get(ComputeNode.getInstanceStatusNodePath(instanceId));
+        
verify(repository).getDirectly(ComputeNode.getInstanceStatusNodePath(instanceId));
     }
     
     @Test
@@ -95,14 +95,14 @@ public final class ComputeNodeStatusServiceTest {
         InstanceMetaData instanceMetaData = new 
ProxyInstanceMetaData("foo_instance_id", 3307);
         final String instanceId = instanceMetaData.getId();
         new 
ComputeNodeStatusService(repository).loadInstanceWorkerId(instanceId);
-        
verify(repository).get(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
+        
verify(repository).getDirectly(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
     }
     
     @Test
     public void assertLoadAllComputeNodeInstances() {
         
when(repository.getChildrenKeys("/nodes/compute_nodes/online/jdbc")).thenReturn(Collections.singletonList("foo_instance_3307"));
         
when(repository.getChildrenKeys("/nodes/compute_nodes/online/proxy")).thenReturn(Collections.singletonList("foo_instance_3308"));
-        
when(repository.get("/nodes/compute_nodes/online/proxy/foo_instance_3308")).thenReturn("127.0.0.1@3308");
+        
when(repository.getDirectly("/nodes/compute_nodes/online/proxy/foo_instance_3308")).thenReturn("127.0.0.1@3308");
         List<ComputeNodeInstance> actual = new ArrayList<>(new 
ComputeNodeStatusService(repository).loadAllComputeNodeInstances());
         assertThat(actual.size(), is(2));
         assertThat(actual.get(0).getMetaData().getId(), 
is("foo_instance_3307"));
diff --git 
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
 
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
index 9a5f899910b..439cfe58fe8 100644
--- 
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
+++ 
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
 import 
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
 
 import java.util.List;
+import java.util.concurrent.Executor;
 
 /**
  * Cluster persist repository.
@@ -97,6 +98,14 @@ public interface ClusterPersistRepository extends 
PersistRepository {
      */
     void executeInTransaction(List<TransactionOperation> 
transactionOperations) throws Exception;
     
+    /**
+     * Update data in transaction.
+     *
+     * @param key key
+     * @param value value
+     */
+    void updateInTransaction(String key, String value);
+    
     /**
      * Persist ephemeral data.
      *
@@ -134,6 +143,7 @@ public interface ClusterPersistRepository extends 
PersistRepository {
      *
      * @param key key of data
      * @param listener data changed event listener
+     * @param executor event notify executor
      */
-    void watch(String key, DataChangedEventListener listener);
+    void watch(String key, DataChangedEventListener listener, Executor 
executor);
 }
diff --git 
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
 
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
index a29290703e4..99e4e94b3bb 100644
--- 
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
+++ 
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
@@ -40,6 +40,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -94,8 +95,19 @@ public class ConsulRepository implements 
ClusterPersistRepository {
         // TODO
     }
     
+    @Override
+    public void updateInTransaction(final String key, final String value) {
+        // TODO
+    }
+    
     @Override
     public String get(final String key) {
+        // TODO
+        return null;
+    }
+    
+    @Override
+    public String getDirectly(final String key) {
         Response<GetValue> response = consulClient.getKVValue(key);
         return null == response ? null : response.getValue().getValue();
     }
@@ -175,7 +187,7 @@ public class ConsulRepository implements 
ClusterPersistRepository {
     }
     
     @Override
-    public void watch(final String key, final DataChangedEventListener 
listener) {
+    public void watch(final String key, final DataChangedEventListener 
listener, final Executor executor) {
         Thread watchThread = new Thread(() -> watchChildKeyChangeEvent(key, 
listener));
         watchThread.setDaemon(true);
         watchThread.start();
diff --git 
a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
 
b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
index f5121b0e0b2..4861462f513 100644
--- 
a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
+++ 
b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
@@ -110,7 +110,7 @@ public final class ConsulRepositoryTest {
     
     @Test
     public void assertGetKey() {
-        repository.get("key");
+        repository.getDirectly("key");
         verify(client).getKVValue("key");
         verify(response).getValue();
     }
@@ -154,8 +154,7 @@ public final class ConsulRepositoryTest {
         getValue1.setKey(k1);
         getValue1.setValue(v1);
         
when(responseGetValueList.getValue()).thenReturn(Collections.singletonList(getValue1));
-        repository.watch(key, event -> {
-        });
+        repository.watch(key, event -> { }, null);
         client.setKVValue(k1, "value1-1");
         verify(client, atLeastOnce()).getKVValues(any(String.class), 
any(QueryParams.class));
         Thread.sleep(10000L);
@@ -175,8 +174,7 @@ public final class ConsulRepositoryTest {
         getValue1.setKey(k1);
         getValue1.setValue(v1);
         
when(responseGetValueList.getValue()).thenReturn(Collections.singletonList(getValue1));
-        repository.watch(key, event -> {
-        });
+        repository.watch(key, event -> { }, null);
         client.deleteKVValue(k2);
         verify(client, atLeastOnce()).getKVValues(any(String.class), 
any(QueryParams.class));
         Thread.sleep(10000L);
diff --git 
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
 
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
index 573f1652f6a..7fcfcf7572b 100644
--- 
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
+++ 
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
@@ -45,6 +45,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.transaction.Transaction
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 
 /**
@@ -100,9 +101,20 @@ public final class EtcdRepository implements 
ClusterPersistRepository {
         // TODO
     }
     
-    @SneakyThrows({InterruptedException.class, ExecutionException.class})
+    @Override
+    public void updateInTransaction(final String key, final String value) {
+        // TODO
+    }
+    
     @Override
     public String get(final String key) {
+        // TODO
+        return null;
+    }
+    
+    @SneakyThrows({InterruptedException.class, ExecutionException.class})
+    @Override
+    public String getDirectly(final String key) {
         List<KeyValue> keyValues = 
client.getKVClient().get(ByteSequence.from(key, 
StandardCharsets.UTF_8)).get().getKvs();
         return keyValues.isEmpty() ? null : 
keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
     }
@@ -184,7 +196,7 @@ public final class EtcdRepository implements 
ClusterPersistRepository {
     }
     
     @Override
-    public void watch(final String key, final DataChangedEventListener 
dataChangedEventListener) {
+    public void watch(final String key, final DataChangedEventListener 
dataChangedEventListener, final Executor executor) {
         Watch.Listener listener = Watch.listener(response -> {
             for (WatchEvent each : response.getEvents()) {
                 Type type = getEventChangedType(each);
diff --git 
a/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java
 
b/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java
index 0a37434d93b..1493eeb168f 100644
--- 
a/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java
+++ 
b/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java
@@ -133,7 +133,7 @@ public final class EtcdRepositoryTest {
     
     @Test
     public void assertGetKey() {
-        repository.get("key");
+        repository.getDirectly("key");
         verify(kv).get(ByteSequence.from("key", StandardCharsets.UTF_8));
         verify(getResponse).getKvs();
     }
@@ -172,8 +172,7 @@ public final class EtcdRepositoryTest {
             listener.onNext(buildWatchResponse(WatchEvent.EventType.PUT));
             return mock(Watch.Watcher.class);
         }).when(watch).watch(any(ByteSequence.class), any(WatchOption.class), 
any(Watch.Listener.class));
-        repository.watch("key1", event -> {
-        });
+        repository.watch("key1", event -> { }, null);
         verify(watch).watch(any(ByteSequence.class), any(WatchOption.class), 
any(Watch.Listener.class));
     }
     
@@ -184,8 +183,7 @@ public final class EtcdRepositoryTest {
             listener.onNext(buildWatchResponse(WatchEvent.EventType.DELETE));
             return mock(Watch.Watcher.class);
         }).when(watch).watch(any(ByteSequence.class), any(WatchOption.class), 
any(Watch.Listener.class));
-        repository.watch("key1", event -> {
-        });
+        repository.watch("key1", event -> { }, null);
         verify(watch).watch(any(ByteSequence.class), any(WatchOption.class), 
any(Watch.Listener.class));
     }
     
@@ -196,8 +194,7 @@ public final class EtcdRepositoryTest {
             
listener.onNext(buildWatchResponse(WatchEvent.EventType.UNRECOGNIZED));
             return mock(Watch.Watcher.class);
         }).when(watch).watch(any(ByteSequence.class), any(WatchOption.class), 
any(Watch.Listener.class));
-        repository.watch("key1", event -> {
-        });
+        repository.watch("key1", event -> { }, null);
         verify(watch).watch(any(ByteSequence.class), any(WatchOption.class), 
any(Watch.Listener.class));
     }
     
@@ -223,7 +220,7 @@ public final class EtcdRepositoryTest {
     public void assertGetKeyWhenThrowInterruptedException() throws 
ExecutionException, InterruptedException {
         doThrow(InterruptedException.class).when(getFuture).get();
         try {
-            repository.get("key");
+            repository.getDirectly("key");
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
@@ -235,7 +232,7 @@ public final class EtcdRepositoryTest {
     public void assertGetKeyWhenThrowExecutionException() throws 
ExecutionException, InterruptedException {
         doThrow(ExecutionException.class).when(getFuture).get();
         try {
-            repository.get("key");
+            repository.getDirectly("key");
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
diff --git a/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java b/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
index 19339b12fca..2dd2e46432a 100644
--- a/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
+++ b/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
@@ -53,6 +53,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -106,6 +107,11 @@ public final class NacosRepository implements ClusterPersistRepository {
         // TODO
     }
     
+    @Override
+    public void updateInTransaction(final String key, final String value) {
+        // TODO
+    }
+    
     private NamingService createClient(final 
ClusterPersistRepositoryConfiguration config) {
         Properties props = new Properties();
         props.setProperty("serverAddr", config.getServerLists());
@@ -168,7 +174,7 @@ public final class NacosRepository implements ClusterPersistRepository {
     }
     
     @Override
-    public void watch(final String key, final DataChangedEventListener 
listener) {
+    public void watch(final String key, final DataChangedEventListener 
listener, final Executor executor) {
         try {
             for (ServiceMetadata each : serviceController.getAllServices()) {
                 NamingEventListener eventListener = each.getListener();
@@ -188,6 +194,12 @@ public final class NacosRepository implements ClusterPersistRepository {
     
     @Override
     public String get(final String key) {
+        // TODO
+        return null;
+    }
+    
+    @Override
+    public String getDirectly(final String key) {
         try {
             for (ServiceMetadata each : serviceController.getAllServices()) {
                 Optional<Instance> instance = findExistedInstance(key, 
each.isEphemeral()).stream().max(Comparator.comparing(NacosMetaDataUtil::getTimestamp));
diff --git a/mode/type/cluster/repository/provider/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepositoryTest.java b/mode/type/cluster/repository/provider/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepositoryTest.java
index 32abeb2bf11..68b70971128 100644
--- a/mode/type/cluster/repository/provider/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepositoryTest.java
+++ b/mode/type/cluster/repository/provider/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepositoryTest.java
@@ -100,7 +100,7 @@ public final class NacosRepositoryTest {
         }
         ServiceMetadata persistentService = 
serviceController.getPersistentService();
         when(client.getAllInstances(persistentService.getServiceName(), 
false)).thenReturn(instances);
-        String value = REPOSITORY.get(key);
+        String value = REPOSITORY.getDirectly(key);
         assertThat(value, is("value2"));
     }
     
@@ -282,7 +282,7 @@ public final class NacosRepositoryTest {
         Event event = new NamingEvent(ephemeralService.getServiceName(), 
Collections.singletonList(instance));
         doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(null, 
event))).when(client).subscribe(anyString(), any(EventListener.class));
         SettableFuture<DataChangedEvent> settableFuture = 
SettableFuture.create();
-        REPOSITORY.watch(key, settableFuture::set);
+        REPOSITORY.watch(key, settableFuture::set, null);
         DataChangedEvent dataChangedEvent = settableFuture.get();
         assertThat(dataChangedEvent.getType(), 
is(DataChangedEvent.Type.ADDED));
         assertThat(dataChangedEvent.getKey(), is(key));
@@ -308,7 +308,7 @@ public final class NacosRepositoryTest {
         Event event = new NamingEvent(persistentService.getServiceName(), 
Collections.singletonList(instance));
         doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(preInstance, 
event))).when(client).subscribe(anyString(), any(EventListener.class));
         SettableFuture<DataChangedEvent> settableFuture = 
SettableFuture.create();
-        REPOSITORY.watch(key, settableFuture::set);
+        REPOSITORY.watch(key, settableFuture::set, null);
         DataChangedEvent dataChangedEvent = settableFuture.get();
         assertThat(dataChangedEvent.getType(), 
is(DataChangedEvent.Type.UPDATED));
         assertThat(dataChangedEvent.getKey(), is(key));
@@ -325,7 +325,7 @@ public final class NacosRepositoryTest {
         Event event = new NamingEvent(persistentService.getServiceName(), 
Collections.emptyList());
         doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(preInstance, 
event))).when(client).subscribe(anyString(), any(EventListener.class));
         SettableFuture<DataChangedEvent> settableFuture = 
SettableFuture.create();
-        REPOSITORY.watch(key, settableFuture::set);
+        REPOSITORY.watch(key, settableFuture::set, null);
         DataChangedEvent dataChangedEvent = settableFuture.get();
         assertThat(dataChangedEvent.getType(), 
is(DataChangedEvent.Type.DELETED));
         assertThat(dataChangedEvent.getKey(), is(key));
diff --git a/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java b/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
index d17016881f0..0838922474a 100644
--- a/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
+++ b/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
@@ -23,13 +23,15 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
 import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.TransactionOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.CuratorCache;
 import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
 import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
-import org.apache.shardingsphere.elasticjob.reg.exception.RegExceptionHandler;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.instance.InstanceContextAware;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -52,11 +54,14 @@ import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
+import java.util.Optional;
 import java.util.List;
 import java.util.Map;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -64,7 +69,7 @@ import java.util.concurrent.TimeUnit;
  */
 public final class CuratorZookeeperRepository implements 
ClusterPersistRepository, InstanceContextAware {
     
-    private final Map<String, CuratorCache> caches = new HashMap<>();
+    private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();
     
     private final Builder builder = CuratorFrameworkFactory.builder();
     
@@ -144,18 +149,28 @@ public final class CuratorZookeeperRepository implements ClusterPersistRepositor
     
     @Override
     public void addCacheData(final String cachePath) {
-        // TODO
+        CuratorCache cache = CuratorCache.build(client, cachePath);
+        try {
+            cache.start();
+            //CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            //CHECKSTYLE:ON
+            CuratorZookeeperExceptionHandler.handleException(ex);
+        }
+        caches.put(cachePath + "/", cache);
     }
     
     @Override
     public void evictCacheData(final String cachePath) {
-        // TODO
+        CuratorCache cache = caches.remove(cachePath + "/");
+        if (null != cache) {
+            cache.close();
+        }
     }
     
     @Override
     public Object getRawCache(final String cachePath) {
-        // TODO
-        return null;
+        return caches.get(cachePath + "/");
     }
     
     @Override
@@ -164,13 +179,69 @@ public final class CuratorZookeeperRepository implements ClusterPersistRepositor
     }
     
     @Override
-    public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) {
-        // TODO
+    public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) throws Exception {
+        
client.transaction().forOperations(toCuratorOps(transactionOperations));
+    }
+    
+    private List<CuratorOp> toCuratorOps(final List<TransactionOperation> 
transactionOperations) {
+        List<CuratorOp> result = new ArrayList<>(transactionOperations.size());
+        TransactionOp transactionOp = client.transactionOp();
+        for (TransactionOperation each : transactionOperations) {
+            result.add(toCuratorOp(each, transactionOp));
+        }
+        return result;
+    }
+    
+    private CuratorOp toCuratorOp(final TransactionOperation each, final 
TransactionOp transactionOp) {
+        try {
+            switch (each.getType()) {
+                case CHECK_EXISTS:
+                    return transactionOp.check().forPath(each.getKey());
+                case ADD:
+                    return transactionOp.create().forPath(each.getKey(), 
each.getValue().getBytes(StandardCharsets.UTF_8));
+                case UPDATE:
+                    return transactionOp.setData().forPath(each.getKey(), 
each.getValue().getBytes(StandardCharsets.UTF_8));
+                case DELETE:
+                    return transactionOp.delete().forPath(each.getKey());
+                default:
+                    throw new UnsupportedOperationException(each.toString());
+            }
+            //CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            //CHECKSTYLE:ON
+            throw new ClusterPersistRepositoryException(ex);
+        }
+    }
+    
+    @Override
+    public void updateInTransaction(final String key, final String value) {
+        try {
+            TransactionOp transactionOp = client.transactionOp();
+            
client.transaction().forOperations(transactionOp.check().forPath(key), 
transactionOp.setData().forPath(key, value.getBytes(StandardCharsets.UTF_8)));
+            //CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            //CHECKSTYLE:ON
+            CuratorZookeeperExceptionHandler.handleException(ex);
+        }
     }
     
     @Override
     public String get(final String key) {
-        return getDirectly(key);
+        CuratorCache cache = findCuratorCache(key);
+        if (null == cache) {
+            return getDirectly(key);
+        }
+        Optional<ChildData> resultInCache = cache.get(key);
+        return resultInCache.map(v -> null == v.getData() ? null : new 
String(v.getData(), StandardCharsets.UTF_8)).orElseGet(() -> getDirectly(key));
+    }
+    
+    private CuratorCache findCuratorCache(final String key) {
+        for (Map.Entry<String, CuratorCache> entry : caches.entrySet()) {
+            if (key.startsWith(entry.getKey())) {
+                return entry.getValue();
+            }
+        }
+        return null;
     }
     
     @Override
@@ -213,7 +284,8 @@ public final class CuratorZookeeperRepository implements ClusterPersistRepositor
         }
     }
     
-    private String getDirectly(final String key) {
+    @Override
+    public String getDirectly(final String key) {
         try {
             return new String(client.getData().forPath(key), 
StandardCharsets.UTF_8);
             // CHECKSTYLE:OFF
@@ -286,7 +358,7 @@ public final class CuratorZookeeperRepository implements ClusterPersistRepositor
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
-            RegExceptionHandler.handleException(ex);
+            CuratorZookeeperExceptionHandler.handleException(ex);
         }
         Preconditions.checkState(0L != result, "Cannot get registry center 
time.");
         return result;
@@ -298,7 +370,7 @@ public final class CuratorZookeeperRepository implements ClusterPersistRepositor
     }
     
     @Override
-    public void watch(final String key, final DataChangedEventListener 
listener) {
+    public void watch(final String key, final DataChangedEventListener 
listener, final Executor executor) {
         CuratorCache cache = caches.get(key);
         if (null == cache) {
             cache = CuratorCache.build(client, key);
@@ -312,7 +384,11 @@ public final class CuratorZookeeperRepository implements ClusterPersistRepositor
                                 new 
String(treeCacheListener.getData().getData(), StandardCharsets.UTF_8), 
changedType));
                     }
                 }).build();
-        cache.listenable().addListener(curatorCacheListener);
+        if (null != executor) {
+            cache.listenable().addListener(curatorCacheListener, executor);
+        } else {
+            cache.listenable().addListener(curatorCacheListener);
+        }
         start(cache);
     }
     
diff --git a/mode/type/cluster/repository/provider/zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java b/mode/type/cluster/repository/provider/zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
index a124c017000..8cb76cef7fd 100644
--- a/mode/type/cluster/repository/provider/zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
+++ b/mode/type/cluster/repository/provider/zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
@@ -207,7 +207,7 @@ public final class CuratorZookeeperRepositoryTest {
         ChildData data = new ChildData("/test/children_updated/1", null, 
"value2".getBytes());
         
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(CuratorCacheListener.Type.NODE_CHANGED,
 oldData, data))).when(listenable).addListener(any(CuratorCacheListener.class));
         SettableFuture<DataChangedEvent> settableFuture = 
SettableFuture.create();
-        REPOSITORY.watch("/test/children_updated/1", settableFuture::set);
+        REPOSITORY.watch("/test/children_updated/1", settableFuture::set, 
null);
         DataChangedEvent dataChangedEvent = settableFuture.get();
         assertThat(dataChangedEvent.getType(), is(Type.UPDATED));
         assertThat(dataChangedEvent.getKey(), is("/test/children_updated/1"));
@@ -221,7 +221,7 @@ public final class CuratorZookeeperRepositoryTest {
         ChildData data = new ChildData("/test/children_deleted/5", null, 
"value5".getBytes());
         
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(CuratorCacheListener.Type.NODE_DELETED,
 oldData, data))).when(listenable).addListener(any(CuratorCacheListener.class));
         SettableFuture<DataChangedEvent> settableFuture = 
SettableFuture.create();
-        REPOSITORY.watch("/test/children_deleted/5", settableFuture::set);
+        REPOSITORY.watch("/test/children_deleted/5", settableFuture::set, 
null);
         DataChangedEvent dataChangedEvent = settableFuture.get();
         assertThat(dataChangedEvent.getType(), is(Type.DELETED));
         assertThat(dataChangedEvent.getKey(), is("/test/children_deleted/5"));
@@ -234,7 +234,7 @@ public final class CuratorZookeeperRepositoryTest {
         ChildData data = new ChildData("/test/children_added/4", null, 
"value4".getBytes());
         
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(CuratorCacheListener.Type.NODE_CREATED,
 null, data))).when(listenable).addListener(any(CuratorCacheListener.class));
         SettableFuture<DataChangedEvent> settableFuture = 
SettableFuture.create();
-        REPOSITORY.watch("/test/children_added/4", settableFuture::set);
+        REPOSITORY.watch("/test/children_added/4", settableFuture::set, null);
         DataChangedEvent dataChangedEvent = settableFuture.get();
         assertThat(dataChangedEvent.getType(), is(Type.ADDED));
         assertThat(dataChangedEvent.getKey(), is("/test/children_added/4"));
diff --git a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilderTextTest.java b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilderTextTest.java
index 289a05f5709..dd0f61ee974 100644
--- a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilderTextTest.java
+++ b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilderTextTest.java
@@ -48,9 +48,9 @@ public final class StandaloneContextManagerBuilderTextTest {
         ContextManager actual = new 
StandaloneContextManagerBuilder().build(createContextManagerBuilderParameter());
         
assertNotNull(actual.getMetaDataContexts().getMetaData().getDatabase("foo_db"));
         PersistRepository repository = 
actual.getMetaDataContexts().getPersistService().getRepository();
-        assertNotNull(repository.get(GlobalNode.getGlobalRuleNode()));
-        
assertNotNull(repository.get(DatabaseMetaDataNode.getMetaDataDataSourcePath("foo_db",
 "0")));
-        
assertNotNull(repository.get(DatabaseMetaDataNode.getRulePath("foo_db", "0")));
+        assertNotNull(repository.getDirectly(GlobalNode.getGlobalRuleNode()));
+        
assertNotNull(repository.getDirectly(DatabaseMetaDataNode.getMetaDataDataSourcePath("foo_db",
 "0")));
+        
assertNotNull(repository.getDirectly(DatabaseMetaDataNode.getRulePath("foo_db", 
"0")));
     }
     
     private ContextManagerBuilderParameter 
createContextManagerBuilderParameter() {
diff --git a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
index e4714de6fba..4e7368507e9 100644
--- a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
+++ b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
@@ -35,6 +35,11 @@ public final class StandalonePersistRepositoryFixture implements StandalonePersi
     
     @Override
     public String get(final String key) {
+        return null;
+    }
+    
+    @Override
+    public String getDirectly(final String key) {
         return persistMap.get(key);
     }
     
diff --git a/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java b/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
index 952bb9b8ffd..94a24b0619f 100644
--- a/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
+++ b/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
@@ -76,6 +76,12 @@ public final class JDBCRepository implements StandalonePersistRepository {
     
     @Override
     public String get(final String key) {
+        // TODO
+        return null;
+    }
+    
+    @Override
+    public String getDirectly(final String key) {
         try (
                 Connection connection = hikariDataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(provider.selectByKeySQL())) {
@@ -117,7 +123,7 @@ public final class JDBCRepository implements StandalonePersistRepository {
     
     @Override
     public boolean isExisted(final String key) {
-        return !Strings.isNullOrEmpty(get(key));
+        return !Strings.isNullOrEmpty(getDirectly(key));
     }
     
     @Override
@@ -133,7 +139,7 @@ public final class JDBCRepository implements StandalonePersistRepository {
             // Create key level directory recursively.
             for (int i = 0; i < paths.length - 1; i++) {
                 String tempKey = tempPrefix + SEPARATOR + paths[i];
-                String tempKeyVal = get(tempKey);
+                String tempKeyVal = getDirectly(tempKey);
                 if (Strings.isNullOrEmpty(tempKeyVal)) {
                     if (i != 0) {
                         parent = tempPrefix;
diff --git a/mode/type/standalone/repository/provider/jdbc/core/src/test/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepositoryTest.java b/mode/type/standalone/repository/provider/jdbc/core/src/test/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepositoryTest.java
index af06c4b4d2b..9c6c91805cf 100644
--- a/mode/type/standalone/repository/provider/jdbc/core/src/test/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepositoryTest.java
+++ b/mode/type/standalone/repository/provider/jdbc/core/src/test/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepositoryTest.java
@@ -96,7 +96,7 @@ public final class JDBCRepositoryTest {
         when(mockPreparedStatement.executeQuery()).thenReturn(mockResultSet);
         when(mockResultSet.next()).thenReturn(true);
         when(mockResultSet.getString(eq("value"))).thenReturn(value);
-        String actualResponse = repository.get(key);
+        String actualResponse = repository.getDirectly(key);
         verify(mockPreparedStatement).setString(eq(1), eq(key));
         assertEquals(value, actualResponse);
     }
@@ -104,7 +104,7 @@ public final class JDBCRepositoryTest {
     @Test
     public void assertGetFailure() throws Exception {
         
when(mockJdbcConnection.prepareStatement(eq(fixture.selectByKeySQL()))).thenThrow(new
 SQLException());
-        String actualResponse = repository.get("key");
+        String actualResponse = repository.getDirectly("key");
         assertEquals("", actualResponse);
     }
     
diff --git a/mode/type/standalone/repository/provider/jdbc/h2/src/test/java/org/apache/shardingsphere/mode/repository/standalone/h2/H2JDBCRepositoryTest.java b/mode/type/standalone/repository/provider/jdbc/h2/src/test/java/org/apache/shardingsphere/mode/repository/standalone/h2/H2JDBCRepositoryTest.java
index c7f843d7993..58fcca88405 100644
--- a/mode/type/standalone/repository/provider/jdbc/h2/src/test/java/org/apache/shardingsphere/mode/repository/standalone/h2/H2JDBCRepositoryTest.java
+++ b/mode/type/standalone/repository/provider/jdbc/h2/src/test/java/org/apache/shardingsphere/mode/repository/standalone/h2/H2JDBCRepositoryTest.java
@@ -50,9 +50,9 @@ public final class H2JDBCRepositoryTest {
     @Test
     public void assertPersistAndGet() {
         repository.persist("/testPath/test1", "test1_content");
-        assertThat(repository.get("/testPath/test1"), is("test1_content"));
+        assertThat(repository.getDirectly("/testPath/test1"), 
is("test1_content"));
         repository.persist("/testPath/test1", "modify_content");
-        assertThat(repository.get("/testPath/test1"), is("modify_content"));
+        assertThat(repository.getDirectly("/testPath/test1"), 
is("modify_content"));
     }
     
     @Test
@@ -67,6 +67,6 @@ public final class H2JDBCRepositoryTest {
     @Test
     public void assertDelete() {
         repository.delete("/testPath");
-        assertThat(repository.get("/testPath"), is(""));
+        assertThat(repository.getDirectly("/testPath"), is(""));
     }
 }
diff --git a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
index ef174b063b9..8c76aa07cdd 100644
--- a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
+++ b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 public final class ClusterPersistRepositoryFixture implements 
ClusterPersistRepository {
     
@@ -64,8 +65,18 @@ public final class ClusterPersistRepositoryFixture implements ClusterPersistRepo
     public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) {
     }
     
+    @Override
+    public void updateInTransaction(final String key, final String value) {
+    
+    }
+    
     @Override
     public String get(final String key) {
+        return null;
+    }
+    
+    @Override
+    public String getDirectly(final String key) {
         return REGISTRY_DATA.get(key);
     }
     
@@ -112,7 +123,7 @@ public final class ClusterPersistRepositoryFixture implements ClusterPersistRepo
     }
     
     @Override
-    public void watch(final String key, final DataChangedEventListener 
listener) {
+    public void watch(final String key, final DataChangedEventListener 
listener, final Executor executor) {
     }
     
     @Override
diff --git a/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java b/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
index b15d3fb353e..5e7d6aa3865 100644
--- a/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
+++ b/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 public final class TestClusterPersistRepository implements 
ClusterPersistRepository {
     
@@ -64,8 +65,18 @@ public final class TestClusterPersistRepository implements ClusterPersistReposit
     public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) {
     }
     
+    @Override
+    public void updateInTransaction(final String key, final String value) {
+    
+    }
+    
     @Override
     public String get(final String key) {
+        return null;
+    }
+    
+    @Override
+    public String getDirectly(final String key) {
         return registryData.get(key);
     }
     
@@ -111,10 +122,6 @@ public final class TestClusterPersistRepository implements ClusterPersistReposit
         return null;
     }
     
-    @Override
-    public void watch(final String key, final DataChangedEventListener 
listener) {
-    }
-    
     @Override
     public boolean tryLock(final String lockKey, final long timeoutMillis) {
         return false;
@@ -124,6 +131,11 @@ public final class TestClusterPersistRepository implements ClusterPersistReposit
     public void unlock(final String lockKey) {
     }
     
+    @Override
+    public void watch(final String key, final DataChangedEventListener 
listener, final Executor executor) {
+    
+    }
+    
     @Override
     public void close() {
         registryData.clear();
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
index 63050fda31b..fb469efd48b 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
@@ -70,7 +70,7 @@ public class ScalingWatcher extends TestWatcher {
     
     private void addZookeeperData(final String key, final String parentPath, 
final ClusterPersistRepository zookeeperRepository, final Map<String, String> 
nodeMap) {
         String path = String.join("/", parentPath, key);
-        String data = zookeeperRepository.get(path);
+        String data = zookeeperRepository.getDirectly(path);
         nodeMap.put(path, data);
         List<String> childrenKeys = zookeeperRepository.getChildrenKeys(path);
         for (String each : childrenKeys) {


Reply via email to