This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 050b7aa15f8 Add meta data listener assisted node path to dynamic watch create or drop database (#31527)
050b7aa15f8 is described below

commit 050b7aa15f8bc70d29475e0b0a5d6c84ef337bb1
Author: zhaojinchao <[email protected]>
AuthorDate: Thu Jun 6 16:36:29 2024 +0800

    Add meta data listener assisted node path to dynamic watch create or drop database (#31527)
    
    * Add meta data listener assisted node path to dynamic watch create or drop database
    
    * Fix spotless
    
    * Fix bug
    
    * Fix checkstyle
    
    * Fix drop database problem
    
    * Refactor listener type
    
    * Fix spotless
    
    * Refactor persist
    
    * Fix ci
---
 .../mode/path/ListenerAssistedNodePath.java        | 65 +++++++++++++++++++
 .../mode/path/ListenerAssistedNodePathTest.java    | 47 ++++++++++++++
 .../mode/processor/ListenerAssistedProcessor.java} | 41 +++++-------
 .../mode/service/PersistServiceFacade.java         |  4 ++
 .../mode/service/enums/ListenerAssistedEnum.java   | 24 +++++++
 .../persist/ListenerAssistedPersistService.java    | 51 +++++++++++++++
 .../mode/service/pojo/ListenerAssistedPOJO.java    | 39 ++++++++++++
 .../cluster/ClusterContextManagerBuilder.java      | 18 +++++-
 .../MetaDataChangedListener.java}                  | 63 +++++++------------
 .../MetaDataWatchListenerManager.java}             | 47 ++++++--------
 .../CreateDatabaseListenerAssistedProcessor.java   | 45 +++++++++++++
 .../DropDatabaseListenerAssistedProcessor.java     | 45 +++++++++++++
 .../watch/ListenerAssistedChangedWatcher.java}     | 41 ++++++------
 .../coordinator/registry/GovernanceWatcher.java    |  3 +-
 .../registry/GovernanceWatcherFactory.java         |  6 +-
 .../data/ShardingSphereDataChangedWatcher.java     |  2 +-
 .../CreateDatabaseListenerAssistedEvent.java       | 32 ++++++++++
 .../DropDatabaseListenerAssistedEvent.java         | 32 ++++++++++
 .../watcher/ClusterStateChangedWatcher.java        |  2 +-
 .../watcher/ComputeNodeStateChangedWatcher.java    |  2 +-
 .../watcher/QualifiedDataSourceChangedWatcher.java |  2 +-
 .../registry/watcher/GlobalRuleChangedWatcher.java |  2 +-
 .../registry/watcher/PropertiesChangedWatcher.java |  2 +-
 .../subscriber/ClusterEventSubscriberRegistry.java |  1 +
 .../ListenerAssistedMetaDataChangedSubscriber.java | 73 ++++++++++++++++++++++
 .../ResourceMetaDataChangedSubscriber.java         | 22 -------
 .../ClusterMetaDataManagerPersistService.java      |  7 ++-
 ....cluster.coordinator.registry.GovernanceWatcher |  2 +-
 ...phere.mode.processor.ListenerAssistedProcessor} |  9 +--
 .../fixture/ClusterPersistRepositoryFixture.java   |  4 ++
 ...ProcessListClusterPersistRepositoryFixture.java |  4 ++
 .../subscriber/CacheEvictedSubscriberTest.java     |  4 +-
 .../ResourceMetaDataChangedSubscriberTest.java     | 31 ---------
 .../cluster/ClusterPersistRepository.java          |  7 +++
 .../repository/cluster/etcd/EtcdRepository.java    |  5 ++
 .../cluster/zookeeper/ZookeeperRepository.java     | 16 +++++
 .../fixture/ClusterPersistRepositoryFixture.java   |  4 ++
 37 files changed, 611 insertions(+), 193 deletions(-)

diff --git 
a/mode/api/src/main/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePath.java
 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePath.java
new file mode 100644
index 00000000000..b32bdf7af90
--- /dev/null
+++ 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePath.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.path;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Listener assisted node path.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ListenerAssistedNodePath {
+    
+    private static final String ROOT_NODE = "listener_assisted";
+    
+    /**
+     * Get root node path.
+     *
+     * @return root node path
+     */
+    public static String getRootNodePath() {
+        return String.join("/", "", ROOT_NODE);
+    }
+    
+    /**
+     * Get database name by node path.
+     *
+     * @param nodePath node path
+     * @return database name
+     */
+    public static Optional<String> getDatabaseName(final String nodePath) {
+        Pattern pattern = Pattern.compile(getRootNodePath() + "/(\\w+)$", 
Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(nodePath);
+        return matcher.find() ? Optional.of(matcher.group(1)) : 
Optional.empty();
+    }
+    
+    /**
+     * Get database name node path.
+     *
+     * @param databaseName database name
+     * @return database name node path
+     */
+    public static String getDatabaseNameNodePath(final String databaseName) {
+        return String.join("/", "", ROOT_NODE, databaseName);
+    }
+}
diff --git 
a/mode/api/src/test/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePathTest.java
 
b/mode/api/src/test/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePathTest.java
new file mode 100644
index 00000000000..e21cc563544
--- /dev/null
+++ 
b/mode/api/src/test/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePathTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.path;
+
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ListenerAssistedNodePathTest {
+    
+    @Test
+    void assertRooPath() {
+        assertThat(ListenerAssistedNodePath.getRootNodePath(), 
is("/listener_assisted"));
+    }
+    
+    @Test
+    void assertGetDatabaseName() {
+        Optional<String> actual = 
ListenerAssistedNodePath.getDatabaseName("/listener_assisted/foo_db");
+        assertTrue(actual.isPresent());
+        assertThat(actual.get(), Matchers.is("foo_db"));
+    }
+    
+    @Test
+    void assertGetDatabaseNameNodePath() {
+        assertThat(ListenerAssistedNodePath.getDatabaseNameNodePath("foo_db"), 
is("/listener_assisted/foo_db"));
+    }
+}
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/processor/ListenerAssistedProcessor.java
similarity index 53%
copy from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
copy to 
mode/core/src/main/java/org/apache/shardingsphere/mode/processor/ListenerAssistedProcessor.java
index f1ce5f0c35c..c45d36d668d 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/processor/ListenerAssistedProcessor.java
@@ -15,43 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry;
+package org.apache.shardingsphere.mode.processor;
 
+import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.mode.event.DataChangedEvent;
-import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
-
-import java.util.Collection;
-import java.util.Optional;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
+import org.apache.shardingsphere.mode.manager.ContextManager;
 
 /**
- * Governance watcher.
- * 
- * @param <T> type of event
+ * Listener assisted processor.
+ *
+ * @param <T> class type of implemented governance event
  */
 @SingletonSPI
-public interface GovernanceWatcher<T> {
+public interface ListenerAssistedProcessor<T extends GovernanceEvent> extends 
TypedSPI {
     
     /**
-     * Get watching keys.
+     * Get listener key.
      *
-     * @param databaseNames database names
-     * @return watching keys
+     * @param event event
+     * @return listener key
      */
-    Collection<String> getWatchingKeys(Collection<String> databaseNames);
+    String getListenerKey(T event);
     
     /**
-     * Get watching types.
+     * Post-process.
      *
-     * @return watching types
-     */
-    Collection<Type> getWatchingTypes();
-    
-    /**
-     * Create governance event.
-     * 
-     * @param event registry center data changed event
-     * @return governance event
+     * @param contextManager context manager
+     * @param event event
      */
-    Optional<T> createGovernanceEvent(DataChangedEvent event);
+    void processor(ContextManager contextManager, T event);
 }
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
index 17ba7def416..a6af914a4d8 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.mode.service.persist.MetaDataManagerPersistServ
 import org.apache.shardingsphere.mode.service.persist.PersistServiceBuilder;
 import org.apache.shardingsphere.mode.service.persist.ProcessPersistService;
 import 
org.apache.shardingsphere.mode.service.persist.QualifiedDataSourceStatePersistService;
+import 
org.apache.shardingsphere.mode.service.persist.ListenerAssistedPersistService;
 import 
org.apache.shardingsphere.mode.service.pojo.ShardingSphereSchemaDataAlteredPOJO;
 import org.apache.shardingsphere.mode.spi.PersistRepository;
 import org.apache.shardingsphere.mode.state.StatePersistService;
@@ -47,6 +48,8 @@ public final class PersistServiceFacade {
     
     private final ProcessPersistService processPersistService;
     
+    private final ListenerAssistedPersistService 
listenerAssistedPersistService;
+    
     private final QualifiedDataSourceStatePersistService 
qualifiedDataSourceStatePersistService;
     
     public PersistServiceFacade(final PersistRepository repository, final 
ModeConfiguration modeConfiguration, final ContextManager contextManager) {
@@ -57,6 +60,7 @@ public final class PersistServiceFacade {
         PersistServiceBuilder persistServiceBuilder = 
TypedSPILoader.getService(PersistServiceBuilder.class, 
modeConfiguration.getType());
         metaDataManagerPersistService = 
persistServiceBuilder.buildMetaDataManagerPersistService(contextManager);
         processPersistService = 
persistServiceBuilder.buildProcessPersistService(repository);
+        listenerAssistedPersistService = new 
ListenerAssistedPersistService(repository);
     }
     
     /**
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/enums/ListenerAssistedEnum.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/enums/ListenerAssistedEnum.java
new file mode 100644
index 00000000000..26e79a520b3
--- /dev/null
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/enums/ListenerAssistedEnum.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.service.enums;
+
+public enum ListenerAssistedEnum {
+    CREATE_DATABASE,
+    
+    DROP_DATABASE
+}
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/persist/ListenerAssistedPersistService.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/persist/ListenerAssistedPersistService.java
new file mode 100644
index 00000000000..1919d59c622
--- /dev/null
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/persist/ListenerAssistedPersistService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.service.persist;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.path.ListenerAssistedNodePath;
+import org.apache.shardingsphere.mode.service.pojo.ListenerAssistedPOJO;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
+
+/**
+ * Listener assisted persist service.
+ */
+@RequiredArgsConstructor
+public final class ListenerAssistedPersistService {
+    
+    private final PersistRepository repository;
+    
+    /**
+     * Persist database name listener assisted.
+     *
+     * @param listenerAssistedPOJO listener assisted pojo
+     */
+    public void persistDatabaseNameListenerAssisted(final ListenerAssistedPOJO 
listenerAssistedPOJO) {
+        
repository.persist(ListenerAssistedNodePath.getDatabaseNameNodePath(listenerAssistedPOJO.getDatabaseName()),
 YamlEngine.marshal(listenerAssistedPOJO));
+    }
+    
+    /**
+     * Delete database name listener assisted.
+     *
+     * @param databaseName database name
+     */
+    public void deleteDatabaseNameListenerAssisted(final String databaseName) {
+        
repository.delete(ListenerAssistedNodePath.getDatabaseNameNodePath(databaseName));
+    }
+}
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/pojo/ListenerAssistedPOJO.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/pojo/ListenerAssistedPOJO.java
new file mode 100644
index 00000000000..935a2721de6
--- /dev/null
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/pojo/ListenerAssistedPOJO.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.service.pojo;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import org.apache.shardingsphere.mode.service.enums.ListenerAssistedEnum;
+
+/**
+ * Listener assisted pojo.
+ */
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter
+@Setter
+public final class ListenerAssistedPOJO implements YamlConfiguration {
+    
+    private String databaseName;
+    
+    private ListenerAssistedEnum listenerAssistedEnum;
+}
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 5ce99e6dd3a..7d7de23b622 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -26,12 +26,15 @@ import 
org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaDa
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
+import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
 import org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataWatchListenerManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataChangedListener;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ClusterEventSubscriberRegistry;
@@ -43,7 +46,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 import 
org.apache.shardingsphere.mode.service.persist.QualifiedDataSourceStatePersistService;
 
 import java.sql.SQLException;
-import java.util.Collections;
+import java.util.Collection;
 
 /**
  * Cluster context manager builder.
@@ -86,8 +89,8 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
     private void registerOnline(final EventBusContext eventBusContext, final 
ComputeNodeInstanceContext computeNodeInstanceContext,
                                 final ClusterPersistRepository repository, 
final ContextManagerBuilderParameter param, final ContextManager 
contextManager) {
         
contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance());
-        new GovernanceWatcherFactory(repository,
-                eventBusContext, param.getInstanceMetaData() instanceof 
JDBCInstanceMetaData ? param.getDatabaseConfigs().keySet() : 
Collections.emptyList()).watchListeners();
+        new GovernanceWatcherFactory(repository, 
eventBusContext).watchListeners();
+        watchDatabaseMetaDataListener(param, 
contextManager.getPersistServiceFacade().getMetaDataPersistService(), 
eventBusContext);
         if (null != param.getLabels()) {
             
contextManager.getComputeNodeInstanceContext().getInstance().getLabels().addAll(param.getLabels());
         }
@@ -95,6 +98,15 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
         new ClusterEventSubscriberRegistry(contextManager, 
repository).register();
     }
     
+    private void watchDatabaseMetaDataListener(final 
ContextManagerBuilderParameter param, final MetaDataPersistService 
metaDataPersistService, final EventBusContext eventBusContext) {
+        getDatabaseNames(param, metaDataPersistService).forEach(each -> new 
MetaDataWatchListenerManager((ClusterPersistRepository) 
metaDataPersistService.getRepository())
+                .addListener(DatabaseMetaDataNode.getDatabaseNamePath(each), 
new MetaDataChangedListener(eventBusContext)));
+    }
+    
+    private Collection<String> getDatabaseNames(final 
ContextManagerBuilderParameter param, final MetaDataPersistService 
metaDataPersistService) {
+        return param.getInstanceMetaData() instanceof JDBCInstanceMetaData ? 
param.getDatabaseConfigs().keySet() : 
metaDataPersistService.getDatabaseMetaDataService().loadAllDatabaseNames();
+    }
+    
     @Override
     public String getType() {
         return "Cluster";
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
similarity index 81%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
rename to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
index 0bca081f199..6cbc251e5ac 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
@@ -15,10 +15,12 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.listener;
 
 import com.google.common.base.Preconditions;
+import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
 import 
org.apache.shardingsphere.metadata.persist.node.metadata.DataSourceMetaDataNode;
 import 
org.apache.shardingsphere.metadata.persist.node.metadata.TableMetaDataNode;
@@ -35,57 +37,40 @@ import 
org.apache.shardingsphere.mode.event.schema.table.CreateOrAlterTableEvent
 import org.apache.shardingsphere.mode.event.schema.table.DropTableEvent;
 import org.apache.shardingsphere.mode.event.schema.view.CreateOrAlterViewEvent;
 import org.apache.shardingsphere.mode.event.schema.view.DropViewEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
 import 
org.apache.shardingsphere.mode.metadata.builder.RuleConfigurationEventBuilder;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
- * Meta data changed watcher.
+ * Meta data changed listener.
  */
-public final class MetaDataChangedWatcher implements 
GovernanceWatcher<GovernanceEvent> {
+@RequiredArgsConstructor
+public final class MetaDataChangedListener implements DataChangedEventListener 
{
     
-    private final RuleConfigurationEventBuilder ruleConfigurationEventBuilder 
= new RuleConfigurationEventBuilder();
+    private final EventBusContext eventBusContext;
     
-    @Override
-    public Collection<String> getWatchingKeys(final Collection<String> 
databaseNames) {
-        return databaseNames.isEmpty()
-                ? Collections.singleton(DatabaseMetaDataNode.getMetaDataNode())
-                : 
databaseNames.stream().map(DatabaseMetaDataNode::getDatabaseNamePath).collect(Collectors.toList());
-    }
+    private final RuleConfigurationEventBuilder ruleConfigurationEventBuilder 
= new RuleConfigurationEventBuilder();
     
     @Override
-    public Collection<Type> getWatchingTypes() {
-        return Arrays.asList(Type.ADDED, Type.UPDATED, Type.DELETED);
+    public void onChange(final DataChangedEvent event) {
+        createGovernanceEvent(event).ifPresent(eventBusContext::post);
     }
     
-    @Override
-    public Optional<GovernanceEvent> createGovernanceEvent(final 
DataChangedEvent event) {
+    private Optional<GovernanceEvent> createGovernanceEvent(final 
DataChangedEvent event) {
         String key = event.getKey();
-        Optional<String> databaseName = 
DatabaseMetaDataNode.getDatabaseName(key);
-        if (databaseName.isPresent()) {
-            return createDatabaseChangedEvent(databaseName.get(), event);
-        }
-        databaseName = DatabaseMetaDataNode.getDatabaseNameBySchemaNode(key);
+        Optional<String> databaseName = 
DatabaseMetaDataNode.getDatabaseNameBySchemaNode(key);
         Optional<String> schemaName = DatabaseMetaDataNode.getSchemaName(key);
         if (databaseName.isPresent() && schemaName.isPresent()) {
             return createSchemaChangedEvent(databaseName.get(), 
schemaName.get(), event);
         }
         schemaName = DatabaseMetaDataNode.getSchemaNameByTableNode(key);
-        if (databaseName.isPresent() && schemaName.isPresent() && 
(TableMetaDataNode.isTableActiveVersionNode(event.getKey()))
-                || TableMetaDataNode.isTableNode(event.getKey())) {
+        if (databaseName.isPresent() && schemaName.isPresent() && 
tableMetaDataChanged(event.getKey())) {
             return createTableChangedEvent(databaseName.get(), 
schemaName.get(), event);
         }
-        if (databaseName.isPresent() && schemaName.isPresent() && 
(ViewMetaDataNode.isViewActiveVersionNode(event.getKey())
-                || ViewMetaDataNode.isViewNode(event.getKey()))) {
+        if (databaseName.isPresent() && schemaName.isPresent() && 
viewMetaDataChanged(event.getKey())) {
             return createViewChangedEvent(databaseName.get(), 
schemaName.get(), event);
         }
         if (!databaseName.isPresent()) {
@@ -97,16 +82,6 @@ public final class MetaDataChangedWatcher implements 
GovernanceWatcher<Governanc
         return ruleConfigurationEventBuilder.build(databaseName.get(), event);
     }
     
-    private Optional<GovernanceEvent> createDatabaseChangedEvent(final String 
databaseName, final DataChangedEvent event) {
-        if (Type.ADDED == event.getType() || Type.UPDATED == event.getType()) {
-            return Optional.of(new DatabaseAddedEvent(databaseName));
-        }
-        if (Type.DELETED == event.getType()) {
-            return Optional.of(new DatabaseDeletedEvent(databaseName));
-        }
-        return Optional.empty();
-    }
-    
     private Optional<GovernanceEvent> createSchemaChangedEvent(final String 
databaseName, final String schemaName, final DataChangedEvent event) {
         if (Type.ADDED == event.getType() || Type.UPDATED == event.getType()) {
             return Optional.of(new SchemaAddedEvent(databaseName, schemaName));
@@ -117,6 +92,14 @@ public final class MetaDataChangedWatcher implements 
GovernanceWatcher<Governanc
         return Optional.empty();
     }
     
+    private boolean tableMetaDataChanged(final String key) {
+        return TableMetaDataNode.isTableActiveVersionNode(key) || 
TableMetaDataNode.isTableNode(key);
+    }
+    
+    private boolean viewMetaDataChanged(final String key) {
+        return ViewMetaDataNode.isViewActiveVersionNode(key) || 
ViewMetaDataNode.isViewNode(key);
+    }
+    
     private Optional<GovernanceEvent> createTableChangedEvent(final String 
databaseName, final String schemaName, final DataChangedEvent event) {
         if (Type.DELETED == event.getType() && 
TableMetaDataNode.isTableNode(event.getKey())) {
             Optional<String> tableName = 
TableMetaDataNode.getTableName(event.getKey());
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataWatchListenerManager.java
similarity index 51%
copy from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
copy to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataWatchListenerManager.java
index f1ce5f0c35c..b08299ef293 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataWatchListenerManager.java
@@ -15,43 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.listener;
 
-import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.mode.event.DataChangedEvent;
-import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
-
-import java.util.Collection;
-import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
 /**
- * Governance watcher.
- * 
- * @param <T> type of event
+ * Meta data watch listener manager.
  */
-@SingletonSPI
-public interface GovernanceWatcher<T> {
+@RequiredArgsConstructor
+public final class MetaDataWatchListenerManager {
     
-    /**
-     * Get watching keys.
-     *
-     * @param databaseNames database names
-     * @return watching keys
-     */
-    Collection<String> getWatchingKeys(Collection<String> databaseNames);
+    private final ClusterPersistRepository repository;
     
     /**
-     * Get watching types.
+     * Add listener.
      *
-     * @return watching types
+     * @param listenerKey listener key
+     * @param dataChangedEventListener data changed event listener
      */
-    Collection<Type> getWatchingTypes();
+    public void addListener(final String listenerKey, final DataChangedEventListener dataChangedEventListener) {
+        repository.watch(listenerKey, dataChangedEventListener);
+    }
     
     /**
-     * Create governance event.
-     * 
-     * @param event registry center data changed event
-     * @return governance event
+     * Remove listener.
+     *
+     * @param listenerKey listener key
      */
-    Optional<T> createGovernanceEvent(DataChangedEvent event);
+    public void removeListener(final String listenerKey) {
+        repository.removeDataListener(listenerKey);
+    }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/processor/CreateDatabaseListenerAssistedProcessor.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/processor/CreateDatabaseListenerAssistedProcessor.java
new file mode 100644
index 00000000000..731dc0980ed
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/processor/CreateDatabaseListenerAssistedProcessor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.processor;
+
+import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.processor.ListenerAssistedProcessor;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.CreateDatabaseListenerAssistedEvent;
+
+/**
+ * Create database listener assisted processor.
+ */
+public final class CreateDatabaseListenerAssistedProcessor implements ListenerAssistedProcessor<CreateDatabaseListenerAssistedEvent> {
+    
+    @Override
+    public String getListenerKey(final CreateDatabaseListenerAssistedEvent event) {
+        return DatabaseMetaDataNode.getDatabaseNamePath(event.getDatabaseName());
+    }
+    
+    @Override
+    public void processor(final ContextManager contextManager, final CreateDatabaseListenerAssistedEvent event) {
+        contextManager.getMetaDataContextManager().getResourceMetaDataManager().addDatabase(event.getDatabaseName());
+        contextManager.getPersistServiceFacade().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName());
+    }
+    
+    @Override
+    public String getType() {
+        return CreateDatabaseListenerAssistedEvent.class.getName();
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/processor/DropDatabaseListenerAssistedProcessor.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/processor/DropDatabaseListenerAssistedProcessor.java
new file mode 100644
index 00000000000..37d65b91ba8
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/processor/DropDatabaseListenerAssistedProcessor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.processor;
+
+import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.DropDatabaseListenerAssistedEvent;
+import org.apache.shardingsphere.mode.processor.ListenerAssistedProcessor;
+
+/**
+ * Drop database listener assisted processor.
+ */
+public final class DropDatabaseListenerAssistedProcessor implements ListenerAssistedProcessor<DropDatabaseListenerAssistedEvent> {
+    
+    @Override
+    public String getListenerKey(final DropDatabaseListenerAssistedEvent event) {
+        return DatabaseMetaDataNode.getDatabaseNamePath(event.getDatabaseName());
+    }
+    
+    @Override
+    public void processor(final ContextManager contextManager, final DropDatabaseListenerAssistedEvent event) {
+        contextManager.getMetaDataContextManager().getResourceMetaDataManager().dropDatabase(event.getDatabaseName());
+        contextManager.getPersistServiceFacade().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName());
+    }
+    
+    @Override
+    public String getType() {
+        return DropDatabaseListenerAssistedEvent.class.getName();
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/watch/ListenerAssistedChangedWatcher.java
similarity index 51%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/watch/ListenerAssistedChangedWatcher.java
index 05cd9c2198f..cd6b90bcca8 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/watch/ListenerAssistedChangedWatcher.java
@@ -15,51 +15,50 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.watcher;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.watch;
 
-import com.google.common.base.Strings;
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.event.DataChangedEvent;
 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
-import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;
-import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
-import org.apache.shardingsphere.mode.storage.yaml.YamlQualifiedDataSourceStatus;
-import org.apache.shardingsphere.mode.storage.yaml.YamlQualifiedDataSourceStatusSwapper;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.DropDatabaseListenerAssistedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.CreateDatabaseListenerAssistedEvent;
+import org.apache.shardingsphere.mode.service.enums.ListenerAssistedEnum;
+import org.apache.shardingsphere.mode.service.pojo.ListenerAssistedPOJO;
+import org.apache.shardingsphere.mode.path.ListenerAssistedNodePath;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
 
 /**
- * Qualified data source changed watcher.
+ * Listener assisted changed watcher.
  */
-public final class QualifiedDataSourceChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
+public class ListenerAssistedChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final Collection<String> databaseNames) {
-        return Collections.singleton(QualifiedDataSourceNode.getRootPath());
+    public Collection<String> getWatchingKeys() {
+        return Collections.singleton(ListenerAssistedNodePath.getRootNodePath());
     }
     
     @Override
-    public Collection<Type> getWatchingTypes() {
-        return Arrays.asList(Type.ADDED, Type.UPDATED);
+    public Collection<DataChangedEvent.Type> getWatchingTypes() {
+        return Collections.singletonList(Type.ADDED);
     }
     
     @Override
     public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
-        if (Strings.isNullOrEmpty(event.getValue())) {
+        Optional<String> databaseName = ListenerAssistedNodePath.getDatabaseName(event.getKey());
+        if (!databaseName.isPresent()) {
             return Optional.empty();
         }
-        Optional<QualifiedDataSource> qualifiedDataSource = QualifiedDataSourceNode.extractQualifiedDataSource(event.getKey());
-        if (qualifiedDataSource.isPresent()) {
-            QualifiedDataSourceState status = new YamlQualifiedDataSourceStatusSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(), YamlQualifiedDataSourceStatus.class));
-            return Optional.of(new StorageNodeChangedEvent(qualifiedDataSource.get(), status));
+        ListenerAssistedPOJO data = YamlEngine.unmarshal(event.getValue(), ListenerAssistedPOJO.class);
+        if (ListenerAssistedEnum.CREATE_DATABASE == data.getListenerAssistedEnum()) {
+            return Optional.of(new CreateDatabaseListenerAssistedEvent(databaseName.get()));
         }
-        return Optional.empty();
+        return ListenerAssistedEnum.DROP_DATABASE == data.getListenerAssistedEnum()
+                ? Optional.of(new DropDatabaseListenerAssistedEvent(databaseName.get()))
+                : Optional.empty();
     }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
index f1ce5f0c35c..efb25e49d86 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
@@ -35,10 +35,9 @@ public interface GovernanceWatcher<T> {
     /**
      * Get watching keys.
      *
-     * @param databaseNames database names
      * @return watching keys
      */
-    Collection<String> getWatchingKeys(Collection<String> databaseNames);
+    Collection<String> getWatchingKeys();
     
     /**
      * Get watching types.
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
index e12299b7afd..9e84cfa7b74 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
@@ -22,8 +22,6 @@ import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
-import java.util.Collection;
-
 /**
  * Governance watcher factory.
  */
@@ -34,8 +32,6 @@ public final class GovernanceWatcherFactory {
     
     private final EventBusContext eventBusContext;
     
-    private final Collection<String> databaseNames;
-    
     /**
      * Watch listeners.
      */
@@ -46,7 +42,7 @@ public final class GovernanceWatcherFactory {
     }
     
     private void watch(final GovernanceWatcher<?> listener) {
-        for (String each : listener.getWatchingKeys(databaseNames)) {
+        for (String each : listener.getWatchingKeys()) {
             watch(each, listener);
         }
     }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
index b45709733f4..69566a166fe 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
@@ -45,7 +45,7 @@ import java.util.Optional;
 public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final Collection<String> databaseNames) {
+    public Collection<String> getWatchingKeys() {
         return Collections.singleton(ShardingSphereDataNode.getShardingSphereDataNodePath());
     }
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/listener/CreateDatabaseListenerAssistedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/listener/CreateDatabaseListenerAssistedEvent.java
new file mode 100644
index 00000000000..05094eab177
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/listener/CreateDatabaseListenerAssistedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
+
+/**
+ * Create database listener assisted event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CreateDatabaseListenerAssistedEvent implements GovernanceEvent {
+    
+    private final String databaseName;
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/listener/DropDatabaseListenerAssistedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/listener/DropDatabaseListenerAssistedEvent.java
new file mode 100644
index 00000000000..e6d8bd94518
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/listener/DropDatabaseListenerAssistedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
+
+/**
+ * Drop database listener assisted event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class DropDatabaseListenerAssistedEvent implements GovernanceEvent {
+    
+    private final String databaseName;
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
index 716056ccbb3..66fb9eabd07 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
@@ -37,7 +37,7 @@ import java.util.Optional;
 public final class ClusterStateChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final Collection<String> databaseNames) {
+    public Collection<String> getWatchingKeys() {
         return Collections.singleton(ComputeNode.getClusterStateNodePath());
     }
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 0e5f0d56a28..143d148e874 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -54,7 +54,7 @@ import java.util.regex.Pattern;
 public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final Collection<String> databaseNames) {
+    public Collection<String> getWatchingKeys() {
         return Collections.singleton(ComputeNode.getComputeNodePath());
     }
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
index 05cd9c2198f..aeb4e05fd54 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
@@ -41,7 +41,7 @@ import java.util.Optional;
 public final class QualifiedDataSourceChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final Collection<String> databaseNames) {
+    public Collection<String> getWatchingKeys() {
         return Collections.singleton(QualifiedDataSourceNode.getRootPath());
     }
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/GlobalRuleChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/GlobalRuleChangedWatcher.java
index a101c50bc81..2784d8e4759 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/GlobalRuleChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/GlobalRuleChangedWatcher.java
@@ -36,7 +36,7 @@ import java.util.Optional;
 public final class GlobalRuleChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final Collection<String> databaseNames) {
+    public Collection<String> getWatchingKeys() {
         return Collections.singleton(GlobalNode.getGlobalRuleRootNode());
     }
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/PropertiesChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/PropertiesChangedWatcher.java
index 816d947427b..c83449c0b8c 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/PropertiesChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/PropertiesChangedWatcher.java
@@ -35,7 +35,7 @@ import java.util.Optional;
 public final class PropertiesChangedWatcher implements GovernanceWatcher<AlterPropertiesEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final Collection<String> databaseNames) {
+    public Collection<String> getWatchingKeys() {
         return Collections.singleton(GlobalNode.getPropsRootNode());
     }
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
index 5326fe989c4..ae15b0a2214 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
@@ -34,6 +34,7 @@ public final class ClusterEventSubscriberRegistry extends EventSubscriberRegistr
                 new ConfigurationChangedSubscriber(contextManager),
                 new ConfigurationChangedSubscriber(contextManager),
                 new ResourceMetaDataChangedSubscriber(contextManager),
+                new ListenerAssistedMetaDataChangedSubscriber(contextManager),
                 new StateChangedSubscriber(contextManager),
                 new DatabaseChangedSubscriber(contextManager),
                 new ProcessListChangedSubscriber(contextManager, repository),
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ListenerAssistedMetaDataChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ListenerAssistedMetaDataChangedSubscriber.java
new file mode 100644
index 00000000000..385edccde34
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ListenerAssistedMetaDataChangedSubscriber.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataWatchListenerManager;
+import org.apache.shardingsphere.mode.processor.ListenerAssistedProcessor;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.DropDatabaseListenerAssistedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.CreateDatabaseListenerAssistedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataChangedListener;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+import java.util.Optional;
+
+/**
+ * Listener assisted meta data changed subscriber.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+@RequiredArgsConstructor
+public final class ListenerAssistedMetaDataChangedSubscriber implements EventSubscriber {
+    
+    private final ContextManager contextManager;
+    
+    /**
+     * Renew to persist meta data.
+     *
+     * @param event database added event
+     */
+    @Subscribe
+    public synchronized void renew(final CreateDatabaseListenerAssistedEvent event) {
+        Optional<ListenerAssistedProcessor> processor = TypedSPILoader.findService(ListenerAssistedProcessor.class, event.getClass().getName());
+        if (!processor.isPresent()) {
+            return;
+        }
+        new MetaDataWatchListenerManager((ClusterPersistRepository) contextManager.getRepository())
+                .addListener(processor.get().getListenerKey(event), new MetaDataChangedListener(contextManager.getComputeNodeInstanceContext().getEventBusContext()));
+        processor.get().processor(contextManager, event);
+    }
+    
+    /**
+     * Renew to delete database.
+     *
+     * @param event database delete event
+     */
+    @Subscribe
+    public synchronized void renew(final DropDatabaseListenerAssistedEvent event) {
+        Optional<ListenerAssistedProcessor> processor = TypedSPILoader.findService(ListenerAssistedProcessor.class, event.getClass().getName());
+        if (!processor.isPresent()) {
+            return;
+        }
+        new MetaDataWatchListenerManager((ClusterPersistRepository) contextManager.getRepository()).removeListener(processor.get().getListenerKey(event));
+        processor.get().processor(contextManager, event);
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
index 2f6d58befb6..cb3bf443818 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
@@ -27,8 +27,6 @@ import org.apache.shardingsphere.mode.event.schema.table.DropTableEvent;
 import org.apache.shardingsphere.mode.event.schema.view.CreateOrAlterViewEvent;
 import org.apache.shardingsphere.mode.event.schema.view.DropViewEvent;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
 
@@ -43,26 +41,6 @@ public final class ResourceMetaDataChangedSubscriber 
implements EventSubscriber
     
     private final ContextManager contextManager;
     
-    /**
-     * Renew to persist meta data.
-     *
-     * @param event database added event
-     */
-    @Subscribe
-    public synchronized void renew(final DatabaseAddedEvent event) {
-        
contextManager.getMetaDataContextManager().getResourceMetaDataManager().addDatabase(event.getDatabaseName());
-    }
-    
-    /**
-     * Renew to delete database.
-     *
-     * @param event database delete event
-     */
-    @Subscribe
-    public synchronized void renew(final DatabaseDeletedEvent event) {
-        
contextManager.getMetaDataContextManager().getResourceMetaDataManager().dropDatabase(event.getDatabaseName());
-    }
-    
     /**
      * Renew to added schema.
      *
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterMetaDataManagerPersistService.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterMetaDataManagerPersistService.java
index d95b48e3682..d056679d555 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterMetaDataManagerPersistService.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterMetaDataManagerPersistService.java
@@ -29,7 +29,9 @@ import 
org.apache.shardingsphere.infra.metadata.version.MetaDataVersion;
 import 
org.apache.shardingsphere.metadata.persist.service.config.database.DataSourceUnitPersistService;
 import 
org.apache.shardingsphere.metadata.persist.service.database.DatabaseMetaDataPersistService;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.service.enums.ListenerAssistedEnum;
 import 
org.apache.shardingsphere.mode.service.persist.MetaDataManagerPersistService;
+import org.apache.shardingsphere.mode.service.pojo.ListenerAssistedPOJO;
 import org.apache.shardingsphere.single.api.config.SingleRuleConfiguration;
 
 import java.util.Collection;
@@ -49,11 +51,14 @@ public final class ClusterMetaDataManagerPersistService 
implements MetaDataManag
     @Override
     public void createDatabase(final String databaseName) {
         
contextManager.getPersistServiceFacade().getMetaDataPersistService().getDatabaseMetaDataService().addDatabase(databaseName);
+        
contextManager.getPersistServiceFacade().getListenerAssistedPersistService().persistDatabaseNameListenerAssisted(new
 ListenerAssistedPOJO(databaseName, ListenerAssistedEnum.CREATE_DATABASE));
     }
     
     @Override
     public void dropDatabase(final String databaseName) {
-        
contextManager.getPersistServiceFacade().getMetaDataPersistService().getDatabaseMetaDataService().dropDatabase(databaseName);
+        
contextManager.getPersistServiceFacade().getListenerAssistedPersistService()
+                .persistDatabaseNameListenerAssisted(new 
ListenerAssistedPOJO(contextManager.getDatabase(databaseName).getName(), 
ListenerAssistedEnum.DROP_DATABASE));
+        
contextManager.getPersistServiceFacade().getMetaDataPersistService().getDatabaseMetaDataService().dropDatabase(contextManager.getDatabase(databaseName).getName());
     }
     
     @Override
diff --git 
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
 
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
index ef200deff89..510d523bfbd 100644
--- 
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
+++ 
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
@@ -19,6 +19,6 @@ 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.stora
 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher.ComputeNodeStateChangedWatcher
 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.watcher.ClusterStateChangedWatcher
 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.ShardingSphereDataChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher.MetaDataChangedWatcher
 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.watcher.GlobalRuleChangedWatcher
 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.watcher.PropertiesChangedWatcher
+org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.watch.ListenerAssistedChangedWatcher
diff --git 
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
 
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.processor.ListenerAssistedProcessor
similarity index 50%
copy from 
mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
copy to 
mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.processor.ListenerAssistedProcessor
index ef200deff89..f08ee5bc21e 100644
--- 
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
+++ 
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.processor.ListenerAssistedProcessor
@@ -15,10 +15,5 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.watcher.QualifiedDataSourceChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher.ComputeNodeStateChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.watcher.ClusterStateChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.ShardingSphereDataChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher.MetaDataChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.watcher.GlobalRuleChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.watcher.PropertiesChangedWatcher
+org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.processor.CreateDatabaseListenerAssistedProcessor
+org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.processor.DropDatabaseListenerAssistedProcessor
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
index 8b3f8c65898..1d6f47c6d58 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
@@ -75,6 +75,10 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
     public void watch(final String key, final DataChangedEventListener 
listener) {
     }
     
+    @Override
+    public void removeDataListener(final String key) {
+    }
+    
     @Override
     public void close() {
     }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
index 0df0ca5219d..94c518c1f0e 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
@@ -86,6 +86,10 @@ public final class 
ProcessListClusterPersistRepositoryFixture implements Cluster
     public void watch(final String key, final DataChangedEventListener 
listener) {
     }
     
+    @Override
+    public void removeDataListener(final String key) {
+    }
+    
     @Override
     public void close() {
         REGISTRY_DATA.clear();
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/CacheEvictedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/CacheEvictedSubscriberTest.java
index 42dec7ec740..935feb4df42 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/CacheEvictedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/CacheEvictedSubscriberTest.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
 
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import 
org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCache;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.DropDatabaseListenerAssistedEvent;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
@@ -33,7 +33,7 @@ class CacheEvictedSubscriberTest {
         EventBusContext eventBusContext = new EventBusContext();
         eventBusContext.register(new CacheEvictedSubscriber());
         OrderedServicesCache.cacheServices(getClass(), 
Collections.emptyList(), Collections.emptyMap());
-        eventBusContext.post(new DatabaseDeletedEvent("db"));
+        eventBusContext.post(new DropDatabaseListenerAssistedEvent("db"));
         assertFalse(OrderedServicesCache.findCachedServices(getClass(), 
Collections.emptyList()).isPresent());
     }
 }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
index b003d9e79ca..70683f00457 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
@@ -20,8 +20,6 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import 
org.apache.shardingsphere.infra.datasource.pool.props.creator.DataSourcePoolPropertiesCreator;
-import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import 
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -33,13 +31,10 @@ import 
org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
 import 
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
 import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
-import org.apache.shardingsphere.test.fixture.jdbc.MockedDataSource;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -51,12 +46,9 @@ import org.mockito.quality.Strictness;
 
 import java.sql.SQLException;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
@@ -102,29 +94,6 @@ class ResourceMetaDataChangedSubscriberTest {
         return Collections.singletonMap("db", database);
     }
     
-    @Test
-    void assertRenewForDatabaseAdded() {
-        
when(persistService.getDataSourceUnitService().load("db_added")).thenReturn(createDataSourcePoolPropertiesMap());
-        
when(persistService.getDatabaseRulePersistService().load("db_added")).thenReturn(Collections.emptyList());
-        subscriber.renew(new DatabaseAddedEvent("db_added"));
-        
assertNotNull(contextManager.getMetaDataContexts().getMetaData().getDatabase("db_added").getResourceMetaData().getStorageUnits());
-    }
-    
-    private Map<String, DataSourcePoolProperties> 
createDataSourcePoolPropertiesMap() {
-        MockedDataSource dataSource = new MockedDataSource();
-        Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(3, 
1F);
-        result.put("primary_ds", 
DataSourcePoolPropertiesCreator.create(dataSource));
-        result.put("replica_ds_0", 
DataSourcePoolPropertiesCreator.create(dataSource));
-        result.put("replica_ds_1", 
DataSourcePoolPropertiesCreator.create(dataSource));
-        return result;
-    }
-    
-    @Test
-    void assertRenewForDatabaseDeleted() {
-        subscriber.renew(new DatabaseDeletedEvent("db"));
-        
assertNull(contextManager.getMetaDataContexts().getMetaData().getDatabase("db"));
-    }
-    
     @Test
     void assertRenewForSchemaAdded() {
         subscriber.renew(new SchemaAddedEvent("db", "foo_schema"));
diff --git 
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
 
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
index 4a9e5fec657..aeabd5828a2 100644
--- 
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
+++ 
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
@@ -58,4 +58,11 @@ public interface ClusterPersistRepository extends 
PersistRepository {
      * @param listener data changed event listener
      */
     void watch(String key, DataChangedEventListener listener);
+    
+    /**
+     * Remove the data changed event listener registered on the given key.
+     *
+     * @param key key whose listener is to be removed
+     */
+    void removeDataListener(String key);
 }
diff --git 
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
 
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
index 999457f7324..004f53e2123 100644
--- 
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
+++ 
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
@@ -168,6 +168,11 @@ public final class EtcdRepository implements 
ClusterPersistRepository {
                 
WatchOption.newBuilder().withRange(OptionsUtil.prefixEndOf(prefix)).build(), 
listener);
     }
     
+    @Override
+    public void removeDataListener(final String key) {
+        // TODO Remove the watch listener registered for the key; currently a no-op for etcd.
+    }
+    
     private Type getEventChangedType(final WatchEvent event) {
         if (1 == event.getKeyValue().getVersion()) {
             return Type.ADDED;
diff --git 
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
 
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
index 1cfae70b242..babaecc789b 100644
--- 
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
+++ 
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
@@ -50,6 +50,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -62,6 +63,8 @@ public final class ZookeeperRepository implements 
ClusterPersistRepository {
     
     private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();
     
+    private final Map<String, CuratorCacheListener> dataListeners = new 
ConcurrentHashMap<>();
+    
     private final Builder builder = CuratorFrameworkFactory.builder();
     
     private CuratorFramework client;
@@ -236,6 +239,9 @@ public final class ZookeeperRepository implements 
ClusterPersistRepository {
     
     @Override
     public void watch(final String key, final DataChangedEventListener 
listener) {
+        if (null != dataListeners.get(key)) {
+            return;
+        }
         CuratorCache cache = caches.get(key);
         if (null == cache) {
             cache = CuratorCache.build(client, key);
@@ -253,6 +259,16 @@ public final class ZookeeperRepository implements 
ClusterPersistRepository {
                 .build();
         cache.listenable().addListener(curatorCacheListener);
         cache.start();
+        dataListeners.computeIfAbsent(key, curator -> curatorCacheListener);
+    }
+    
+    @Override
+    public void removeDataListener(final String key) {
+        CuratorCacheListener cacheListener = dataListeners.remove(key);
+        if (null == cacheListener) {
+            return;
+        }
+        Optional.ofNullable(caches.get(key)).ifPresent(optional -> 
optional.listenable().removeListener(cacheListener));
     }
     
     @Override
diff --git 
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
 
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
index 935e5cbb657..3df9e6cbd75 100644
--- 
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
+++ 
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
@@ -85,6 +85,10 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
     public void watch(final String key, final DataChangedEventListener 
listener) {
     }
     
+    @Override
+    public void removeDataListener(final String key) {
+    }
+    
     @Override
     public void close() {
         REGISTRY_DATA.clear();

Reply via email to