This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 96467c308aa Add GlobalMetaDataChangedListener (#32873)
96467c308aa is described below

commit 96467c308aaf22c8b7c45d6156ec46b0683218ab
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 15 10:31:23 2024 +0800

    Add GlobalMetaDataChangedListener (#32873)
---
 .../dispatch/ListenerAssistedSubscriber.java       |  4 +-
 .../listener/DataChangedEventListenerRegistry.java | 22 +++------
 ...r.java => DatabaseMetaDataChangedListener.java} |  4 +-
 .../listener/GlobalMetaDataChangedListener.java    | 42 +++++++++++++++++
 .../DataChangedEventListenerRegistryTest.java      | 55 ++++++++++++++++++++++
 5 files changed, 108 insertions(+), 19 deletions(-)

diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
index d9f2bb1e9a4..9c0d364ed89 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
@@ -27,7 +27,7 @@ import 
org.apache.shardingsphere.mode.event.dispatch.assisted.DropDatabaseListen
 import org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.listener.DataChangedEventListenerManager;
-import 
org.apache.shardingsphere.mode.manager.cluster.listener.MetaDataChangedListener;
+import 
org.apache.shardingsphere.mode.manager.cluster.listener.DatabaseMetaDataChangedListener;
 import 
org.apache.shardingsphere.mode.manager.cluster.lock.GlobalLockPersistService;
 import 
org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -55,7 +55,7 @@ public final class ListenerAssistedSubscriber implements 
EventSubscriber {
     @Subscribe
     public synchronized void renew(final CreateDatabaseListenerAssistedEvent 
event) {
         
listenerManager.addListener(DatabaseMetaDataNode.getDatabaseNamePath(event.getDatabaseName()),
-                new 
MetaDataChangedListener(contextManager.getComputeNodeInstanceContext().getEventBusContext()));
+                new 
DatabaseMetaDataChangedListener(contextManager.getComputeNodeInstanceContext().getEventBusContext()));
         
contextManager.getMetaDataContextManager().getSchemaMetaDataManager().addDatabase(event.getDatabaseName());
         
contextManager.getPersistServiceFacade().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName());
         refreshShardingSphereStatisticsData();
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistry.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistry.java
index a6f5eea8fe6..c781869fa30 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistry.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistry.java
@@ -20,8 +20,8 @@ package 
org.apache.shardingsphere.mode.manager.cluster.listener;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
-import 
org.apache.shardingsphere.mode.manager.cluster.event.builder.DispatchEventBuilder;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.builder.DispatchEventBuilder;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
 import java.util.Collection;
@@ -47,23 +47,15 @@ public final class DataChangedEventListenerRegistry {
      * Register data changed event listeners.
      */
     public void register() {
-        databaseNames.forEach(this::register);
-        
ShardingSphereServiceLoader.getServiceInstances(DispatchEventBuilder.class).forEach(this::register);
-    }
-    
-    private void register(final String databaseName) {
-        
listenerManager.addListener(DatabaseMetaDataNode.getDatabaseNamePath(databaseName),
 new MetaDataChangedListener(eventBusContext));
+        databaseNames.forEach(this::registerDatabaseListeners);
+        
ShardingSphereServiceLoader.getServiceInstances(DispatchEventBuilder.class).forEach(this::registerGlobalListeners);
     }
     
-    private void register(final DispatchEventBuilder<?> builder) {
-        builder.getSubscribedKeys().forEach(each -> register(each, builder));
+    private void registerDatabaseListeners(final String databaseName) {
+        
listenerManager.addListener(DatabaseMetaDataNode.getDatabaseNamePath(databaseName),
 new DatabaseMetaDataChangedListener(eventBusContext));
     }
     
-    private void register(final String subscribedKey, final 
DispatchEventBuilder<?> builder) {
-        listenerManager.addListener(subscribedKey, dataChangedEvent -> {
-            if 
(builder.getSubscribedTypes().contains(dataChangedEvent.getType())) {
-                
builder.build(dataChangedEvent).ifPresent(eventBusContext::post);
-            }
-        });
+    private void registerGlobalListeners(final DispatchEventBuilder<?> 
builder) {
+        builder.getSubscribedKeys().forEach(each -> 
listenerManager.addListener(each, new 
GlobalMetaDataChangedListener(eventBusContext, builder)));
     }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/MetaDataChangedListener.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DatabaseMetaDataChangedListener.java
similarity index 98%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/MetaDataChangedListener.java
rename to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DatabaseMetaDataChangedListener.java
index 55dcd8fa00e..fc57df2b405 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/MetaDataChangedListener.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DatabaseMetaDataChangedListener.java
@@ -45,10 +45,10 @@ import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
 import java.util.Optional;
 
 /**
- * Meta data changed listener.
+ * Database meta data changed listener.
  */
 @RequiredArgsConstructor
-public final class MetaDataChangedListener implements DataChangedEventListener 
{
+public final class DatabaseMetaDataChangedListener implements 
DataChangedEventListener {
     
     private final EventBusContext eventBusContext;
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/GlobalMetaDataChangedListener.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/GlobalMetaDataChangedListener.java
new file mode 100644
index 00000000000..33a5047601f
--- /dev/null
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/GlobalMetaDataChangedListener.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.listener;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.mode.event.DataChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.builder.DispatchEventBuilder;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+
+/**
+ * Global meta data changed listener.
+ */
+@RequiredArgsConstructor
+public final class GlobalMetaDataChangedListener implements 
DataChangedEventListener {
+    
+    private final EventBusContext eventBusContext;
+    
+    private final DispatchEventBuilder<?> builder;
+    
+    @Override
+    public void onChange(final DataChangedEvent event) {
+        if (builder.getSubscribedTypes().contains(event.getType())) {
+            builder.build(event).ifPresent(eventBusContext::post);
+        }
+    }
+}
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistryTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistryTest.java
new file mode 100644
index 00000000000..51276c0c31f
--- /dev/null
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistryTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.listener;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class DataChangedEventListenerRegistryTest {
+    
+    @Test
+    void assertRegister() {
+        ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
+        
when(contextManager.getPersistServiceFacade().getRepository()).thenReturn(mock(ClusterPersistRepository.class));
+        DataChangedEventListenerManager listenerManager = 
mock(DataChangedEventListenerManager.class);
+        DataChangedEventListenerRegistry registry = new 
DataChangedEventListenerRegistry(contextManager, 
Collections.singleton("foo_db"));
+        setDataChangedEventListenerManager(registry, listenerManager);
+        registry.register();
+        verify(listenerManager).addListener(eq("/metadata/foo_db"), any());
+    }
+    
+    @SneakyThrows(ReflectiveOperationException.class)
+    private void setDataChangedEventListenerManager(final 
DataChangedEventListenerRegistry registry, final 
DataChangedEventListenerManager listenerManager) {
+        Field field = 
DataChangedEventListenerRegistry.class.getDeclaredField("listenerManager");
+        field.setAccessible(true);
+        field.set(registry, listenerManager);
+        field.setAccessible(false);
+    }
+}

Reply via email to