This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 96467c308aa Add GlobalMetaDataChangedListener (#32873)
96467c308aa is described below
commit 96467c308aaf22c8b7c45d6156ec46b0683218ab
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 15 10:31:23 2024 +0800
Add GlobalMetaDataChangedListener (#32873)
---
.../dispatch/ListenerAssistedSubscriber.java | 4 +-
.../listener/DataChangedEventListenerRegistry.java | 22 +++------
...r.java => DatabaseMetaDataChangedListener.java} | 4 +-
.../listener/GlobalMetaDataChangedListener.java | 42 +++++++++++++++++
.../DataChangedEventListenerRegistryTest.java | 55 ++++++++++++++++++++++
5 files changed, 108 insertions(+), 19 deletions(-)
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
index d9f2bb1e9a4..9c0d364ed89 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
@@ -27,7 +27,7 @@ import
org.apache.shardingsphere.mode.event.dispatch.assisted.DropDatabaseListen
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.listener.DataChangedEventListenerManager;
-import
org.apache.shardingsphere.mode.manager.cluster.listener.MetaDataChangedListener;
+import
org.apache.shardingsphere.mode.manager.cluster.listener.DatabaseMetaDataChangedListener;
import
org.apache.shardingsphere.mode.manager.cluster.lock.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -55,7 +55,7 @@ public final class ListenerAssistedSubscriber implements
EventSubscriber {
@Subscribe
public synchronized void renew(final CreateDatabaseListenerAssistedEvent
event) {
listenerManager.addListener(DatabaseMetaDataNode.getDatabaseNamePath(event.getDatabaseName()),
- new
MetaDataChangedListener(contextManager.getComputeNodeInstanceContext().getEventBusContext()));
+ new
DatabaseMetaDataChangedListener(contextManager.getComputeNodeInstanceContext().getEventBusContext()));
contextManager.getMetaDataContextManager().getSchemaMetaDataManager().addDatabase(event.getDatabaseName());
contextManager.getPersistServiceFacade().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName());
refreshShardingSphereStatisticsData();
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistry.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistry.java
index a6f5eea8fe6..c781869fa30 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistry.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistry.java
@@ -20,8 +20,8 @@ package
org.apache.shardingsphere.mode.manager.cluster.listener;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
-import
org.apache.shardingsphere.mode.manager.cluster.event.builder.DispatchEventBuilder;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.event.builder.DispatchEventBuilder;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import java.util.Collection;
@@ -47,23 +47,15 @@ public final class DataChangedEventListenerRegistry {
* Register data changed event listeners.
*/
public void register() {
- databaseNames.forEach(this::register);
-
ShardingSphereServiceLoader.getServiceInstances(DispatchEventBuilder.class).forEach(this::register);
- }
-
- private void register(final String databaseName) {
-
listenerManager.addListener(DatabaseMetaDataNode.getDatabaseNamePath(databaseName),
new MetaDataChangedListener(eventBusContext));
+ databaseNames.forEach(this::registerDatabaseListeners);
+
ShardingSphereServiceLoader.getServiceInstances(DispatchEventBuilder.class).forEach(this::registerGlobalListeners);
}
- private void register(final DispatchEventBuilder<?> builder) {
- builder.getSubscribedKeys().forEach(each -> register(each, builder));
+ private void registerDatabaseListeners(final String databaseName) {
+
listenerManager.addListener(DatabaseMetaDataNode.getDatabaseNamePath(databaseName),
new DatabaseMetaDataChangedListener(eventBusContext));
}
- private void register(final String subscribedKey, final
DispatchEventBuilder<?> builder) {
- listenerManager.addListener(subscribedKey, dataChangedEvent -> {
- if
(builder.getSubscribedTypes().contains(dataChangedEvent.getType())) {
-
builder.build(dataChangedEvent).ifPresent(eventBusContext::post);
- }
- });
+ private void registerGlobalListeners(final DispatchEventBuilder<?>
builder) {
+ builder.getSubscribedKeys().forEach(each ->
listenerManager.addListener(each, new
GlobalMetaDataChangedListener(eventBusContext, builder)));
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/MetaDataChangedListener.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DatabaseMetaDataChangedListener.java
similarity index 98%
rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/MetaDataChangedListener.java
rename to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DatabaseMetaDataChangedListener.java
index 55dcd8fa00e..fc57df2b405 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/MetaDataChangedListener.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/DatabaseMetaDataChangedListener.java
@@ -45,10 +45,10 @@ import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
import java.util.Optional;
/**
- * Meta data changed listener.
+ * Database meta data changed listener.
*/
@RequiredArgsConstructor
-public final class MetaDataChangedListener implements DataChangedEventListener
{
+public final class DatabaseMetaDataChangedListener implements
DataChangedEventListener {
private final EventBusContext eventBusContext;
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/GlobalMetaDataChangedListener.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/GlobalMetaDataChangedListener.java
new file mode 100644
index 00000000000..33a5047601f
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/listener/GlobalMetaDataChangedListener.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.listener;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.mode.event.DataChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.event.builder.DispatchEventBuilder;
+import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+
+/**
+ * Global meta data changed listener.
+ *
+ * On each data changed event, checks whether the event type is one of the
+ * types returned by the builder's getSubscribedTypes(). If it is, the event is
+ * handed to the builder, and the build result, when present, is posted to the
+ * event bus context. Events of any other type are ignored.
+ */
+@RequiredArgsConstructor
+public final class GlobalMetaDataChangedListener implements
DataChangedEventListener {
+
+ private final EventBusContext eventBusContext;
+
+ private final DispatchEventBuilder<?> builder;
+
+ @Override
+ public void onChange(final DataChangedEvent event) {
+ if (builder.getSubscribedTypes().contains(event.getType())) { // ignore event types the builder has not subscribed to
+ builder.build(event).ifPresent(eventBusContext::post);
+ }
+ }
+}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistryTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistryTest.java
new file mode 100644
index 00000000000..51276c0c31f
--- /dev/null
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/listener/DataChangedEventListenerRegistryTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.listener;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class DataChangedEventListenerRegistryTest {
+
+ @Test
+ void assertRegister() {
+ ContextManager contextManager = mock(ContextManager.class,
RETURNS_DEEP_STUBS);
+
when(contextManager.getPersistServiceFacade().getRepository()).thenReturn(mock(ClusterPersistRepository.class));
+ DataChangedEventListenerManager listenerManager =
mock(DataChangedEventListenerManager.class);
+ DataChangedEventListenerRegistry registry = new
DataChangedEventListenerRegistry(contextManager,
Collections.singleton("foo_db"));
+ setDataChangedEventListenerManager(registry, listenerManager); // replace the registry's listenerManager field with the mock so calls made by register() can be verified
+ registry.register();
+ verify(listenerManager).addListener(eq("/metadata/foo_db"), any()); // checks only the listener added under the "foo_db" path; the listener instance itself is not inspected
+ }
+
+ @SneakyThrows(ReflectiveOperationException.class)
+ private void setDataChangedEventListenerManager(final
DataChangedEventListenerRegistry registry, final
DataChangedEventListenerManager listenerManager) {
+ Field field =
DataChangedEventListenerRegistry.class.getDeclaredField("listenerManager");
+ field.setAccessible(true);
+ field.set(registry, listenerManager);
+ field.setAccessible(false); // turn the accessibility override off again after the reflective write
+ }
+}