This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 050b7aa15f8 Add meta data listener assisted node path to dynamic watch
create or drop database (#31527)
050b7aa15f8 is described below
commit 050b7aa15f8bc70d29475e0b0a5d6c84ef337bb1
Author: zhaojinchao <[email protected]>
AuthorDate: Thu Jun 6 16:36:29 2024 +0800
Add meta data listener assisted node path to dynamic watch create or drop
database (#31527)
* Add meta data listener assisted node path to dynamic watch create or drop
database
* Fix spotless
* Fix bug
* Fix checkstyle
* Fix drop database problem
* Refactor listener type
* Fix spotless
* Refactor persist
* Fix ci
---
.../mode/path/ListenerAssistedNodePath.java | 65 +++++++++++++++++++
.../mode/path/ListenerAssistedNodePathTest.java | 47 ++++++++++++++
.../mode/processor/ListenerAssistedProcessor.java} | 41 +++++-------
.../mode/service/PersistServiceFacade.java | 4 ++
.../mode/service/enums/ListenerAssistedEnum.java | 24 +++++++
.../persist/ListenerAssistedPersistService.java | 51 +++++++++++++++
.../mode/service/pojo/ListenerAssistedPOJO.java | 39 ++++++++++++
.../cluster/ClusterContextManagerBuilder.java | 18 +++++-
.../MetaDataChangedListener.java} | 63 +++++++------------
.../MetaDataWatchListenerManager.java} | 47 ++++++--------
.../CreateDatabaseListenerAssistedProcessor.java | 45 +++++++++++++
.../DropDatabaseListenerAssistedProcessor.java | 45 +++++++++++++
.../watch/ListenerAssistedChangedWatcher.java} | 41 ++++++------
.../coordinator/registry/GovernanceWatcher.java | 3 +-
.../registry/GovernanceWatcherFactory.java | 6 +-
.../data/ShardingSphereDataChangedWatcher.java | 2 +-
.../CreateDatabaseListenerAssistedEvent.java | 32 ++++++++++
.../DropDatabaseListenerAssistedEvent.java | 32 ++++++++++
.../watcher/ClusterStateChangedWatcher.java | 2 +-
.../watcher/ComputeNodeStateChangedWatcher.java | 2 +-
.../watcher/QualifiedDataSourceChangedWatcher.java | 2 +-
.../registry/watcher/GlobalRuleChangedWatcher.java | 2 +-
.../registry/watcher/PropertiesChangedWatcher.java | 2 +-
.../subscriber/ClusterEventSubscriberRegistry.java | 1 +
.../ListenerAssistedMetaDataChangedSubscriber.java | 73 ++++++++++++++++++++++
.../ResourceMetaDataChangedSubscriber.java | 22 -------
.../ClusterMetaDataManagerPersistService.java | 7 ++-
....cluster.coordinator.registry.GovernanceWatcher | 2 +-
...phere.mode.processor.ListenerAssistedProcessor} | 9 +--
.../fixture/ClusterPersistRepositoryFixture.java | 4 ++
...ProcessListClusterPersistRepositoryFixture.java | 4 ++
.../subscriber/CacheEvictedSubscriberTest.java | 4 +-
.../ResourceMetaDataChangedSubscriberTest.java | 31 ---------
.../cluster/ClusterPersistRepository.java | 7 +++
.../repository/cluster/etcd/EtcdRepository.java | 5 ++
.../cluster/zookeeper/ZookeeperRepository.java | 16 +++++
.../fixture/ClusterPersistRepositoryFixture.java | 4 ++
37 files changed, 611 insertions(+), 193 deletions(-)
diff --git
a/mode/api/src/main/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePath.java
b/mode/api/src/main/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePath.java
new file mode 100644
index 00000000000..b32bdf7af90
--- /dev/null
+++
b/mode/api/src/main/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePath.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.path;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Listener assisted node path.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ListenerAssistedNodePath {
+
+ private static final String ROOT_NODE = "listener_assisted";
+
+ /**
+ * Get root node path.
+ *
+ * @return root node path
+ */
+ public static String getRootNodePath() {
+ return String.join("/", "", ROOT_NODE);
+ }
+
+ /**
+ * Get database name by node path.
+ *
+ * @param nodePath node path
+ * @return database name
+ */
+ public static Optional<String> getDatabaseName(final String nodePath) {
+ Pattern pattern = Pattern.compile(getRootNodePath() + "/(\\w+)$",
Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(nodePath);
+ return matcher.find() ? Optional.of(matcher.group(1)) :
Optional.empty();
+ }
+
+ /**
+ * Get database name node path.
+ *
+ * @param databaseName database name
+ * @return database name node path
+ */
+ public static String getDatabaseNameNodePath(final String databaseName) {
+ return String.join("/", "", ROOT_NODE, databaseName);
+ }
+}
diff --git
a/mode/api/src/test/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePathTest.java
b/mode/api/src/test/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePathTest.java
new file mode 100644
index 00000000000..e21cc563544
--- /dev/null
+++
b/mode/api/src/test/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePathTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.path;
+
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ListenerAssistedNodePathTest {
+
+ @Test
+ void assertRooPath() {
+ assertThat(ListenerAssistedNodePath.getRootNodePath(),
is("/listener_assisted"));
+ }
+
+ @Test
+ void assertGetDatabaseName() {
+ Optional<String> actual =
ListenerAssistedNodePath.getDatabaseName("/listener_assisted/foo_db");
+ assertTrue(actual.isPresent());
+ assertThat(actual.get(), Matchers.is("foo_db"));
+ }
+
+ @Test
+ void assertGetDatabaseNameNodePath() {
+ assertThat(ListenerAssistedNodePath.getDatabaseNameNodePath("foo_db"),
is("/listener_assisted/foo_db"));
+ }
+}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/processor/ListenerAssistedProcessor.java
similarity index 53%
copy from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
copy to
mode/core/src/main/java/org/apache/shardingsphere/mode/processor/ListenerAssistedProcessor.java
index f1ce5f0c35c..c45d36d668d 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/processor/ListenerAssistedProcessor.java
@@ -15,43 +15,34 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry;
+package org.apache.shardingsphere.mode.processor;
+import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.mode.event.DataChangedEvent;
-import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
-
-import java.util.Collection;
-import java.util.Optional;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
+import org.apache.shardingsphere.mode.manager.ContextManager;
/**
- * Governance watcher.
- *
- * @param <T> type of event
+ * Listener assisted processor.
+ *
+ * @param <T> class type of implemented governance event
*/
@SingletonSPI
-public interface GovernanceWatcher<T> {
+public interface ListenerAssistedProcessor<T extends GovernanceEvent> extends
TypedSPI {
/**
- * Get watching keys.
+ * Get listener key.
*
- * @param databaseNames database names
- * @return watching keys
+ * @param event event
+ * @return listener key
*/
- Collection<String> getWatchingKeys(Collection<String> databaseNames);
+ String getListenerKey(T event);
/**
- * Get watching types.
+ * Process listener assisted event.
*
- * @return watching types
- */
- Collection<Type> getWatchingTypes();
-
- /**
- * Create governance event.
- *
- * @param event registry center data changed event
- * @return governance event
+ * @param contextManager context manager
+ * @param event event
*/
- Optional<T> createGovernanceEvent(DataChangedEvent event);
+ void processor(ContextManager contextManager, T event);
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
index 17ba7def416..a6af914a4d8 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.mode.service.persist.MetaDataManagerPersistServ
import org.apache.shardingsphere.mode.service.persist.PersistServiceBuilder;
import org.apache.shardingsphere.mode.service.persist.ProcessPersistService;
import
org.apache.shardingsphere.mode.service.persist.QualifiedDataSourceStatePersistService;
+import
org.apache.shardingsphere.mode.service.persist.ListenerAssistedPersistService;
import
org.apache.shardingsphere.mode.service.pojo.ShardingSphereSchemaDataAlteredPOJO;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.mode.state.StatePersistService;
@@ -47,6 +48,8 @@ public final class PersistServiceFacade {
private final ProcessPersistService processPersistService;
+ private final ListenerAssistedPersistService
listenerAssistedPersistService;
+
private final QualifiedDataSourceStatePersistService
qualifiedDataSourceStatePersistService;
public PersistServiceFacade(final PersistRepository repository, final
ModeConfiguration modeConfiguration, final ContextManager contextManager) {
@@ -57,6 +60,7 @@ public final class PersistServiceFacade {
PersistServiceBuilder persistServiceBuilder =
TypedSPILoader.getService(PersistServiceBuilder.class,
modeConfiguration.getType());
metaDataManagerPersistService =
persistServiceBuilder.buildMetaDataManagerPersistService(contextManager);
processPersistService =
persistServiceBuilder.buildProcessPersistService(repository);
+ listenerAssistedPersistService = new
ListenerAssistedPersistService(repository);
}
/**
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/enums/ListenerAssistedEnum.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/enums/ListenerAssistedEnum.java
new file mode 100644
index 00000000000..26e79a520b3
--- /dev/null
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/enums/ListenerAssistedEnum.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.service.enums;
+
+public enum ListenerAssistedEnum {
+ CREATE_DATABASE,
+
+ DROP_DATABASE
+}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/persist/ListenerAssistedPersistService.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/persist/ListenerAssistedPersistService.java
new file mode 100644
index 00000000000..1919d59c622
--- /dev/null
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/persist/ListenerAssistedPersistService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.service.persist;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.path.ListenerAssistedNodePath;
+import org.apache.shardingsphere.mode.service.pojo.ListenerAssistedPOJO;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
+
+/**
+ * Listener assisted persist service.
+ */
+@RequiredArgsConstructor
+public final class ListenerAssistedPersistService {
+
+ private final PersistRepository repository;
+
+ /**
+ * Persist database name listener assisted.
+ *
+ * @param listenerAssistedPOJO listener assisted pojo
+ */
+ public void persistDatabaseNameListenerAssisted(final ListenerAssistedPOJO
listenerAssistedPOJO) {
+
repository.persist(ListenerAssistedNodePath.getDatabaseNameNodePath(listenerAssistedPOJO.getDatabaseName()),
YamlEngine.marshal(listenerAssistedPOJO));
+ }
+
+ /**
+ * Delete database name listener assisted.
+ *
+ * @param databaseName database name
+ */
+ public void deleteDatabaseNameListenerAssisted(final String databaseName) {
+
repository.delete(ListenerAssistedNodePath.getDatabaseNameNodePath(databaseName));
+ }
+}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/pojo/ListenerAssistedPOJO.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/pojo/ListenerAssistedPOJO.java
new file mode 100644
index 00000000000..935a2721de6
--- /dev/null
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/pojo/ListenerAssistedPOJO.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.service.pojo;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import org.apache.shardingsphere.mode.service.enums.ListenerAssistedEnum;
+
+/**
+ * Listener assisted pojo.
+ */
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter
+@Setter
+public final class ListenerAssistedPOJO implements YamlConfiguration {
+
+ private String databaseName;
+
+ private ListenerAssistedEnum listenerAssistedEnum;
+}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 5ce99e6dd3a..7d7de23b622 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -26,12 +26,15 @@ import
org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaDa
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
+import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataWatchListenerManager;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataChangedListener;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ClusterEventSubscriberRegistry;
@@ -43,7 +46,7 @@ import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
import
org.apache.shardingsphere.mode.service.persist.QualifiedDataSourceStatePersistService;
import java.sql.SQLException;
-import java.util.Collections;
+import java.util.Collection;
/**
* Cluster context manager builder.
@@ -86,8 +89,8 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
private void registerOnline(final EventBusContext eventBusContext, final
ComputeNodeInstanceContext computeNodeInstanceContext,
final ClusterPersistRepository repository,
final ContextManagerBuilderParameter param, final ContextManager
contextManager) {
contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance());
- new GovernanceWatcherFactory(repository,
- eventBusContext, param.getInstanceMetaData() instanceof
JDBCInstanceMetaData ? param.getDatabaseConfigs().keySet() :
Collections.emptyList()).watchListeners();
+ new GovernanceWatcherFactory(repository,
eventBusContext).watchListeners();
+ watchDatabaseMetaDataListener(param,
contextManager.getPersistServiceFacade().getMetaDataPersistService(),
eventBusContext);
if (null != param.getLabels()) {
contextManager.getComputeNodeInstanceContext().getInstance().getLabels().addAll(param.getLabels());
}
@@ -95,6 +98,15 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
new ClusterEventSubscriberRegistry(contextManager,
repository).register();
}
+ private void watchDatabaseMetaDataListener(final
ContextManagerBuilderParameter param, final MetaDataPersistService
metaDataPersistService, final EventBusContext eventBusContext) {
+ getDatabaseNames(param, metaDataPersistService).forEach(each -> new
MetaDataWatchListenerManager((ClusterPersistRepository)
metaDataPersistService.getRepository())
+ .addListener(DatabaseMetaDataNode.getDatabaseNamePath(each),
new MetaDataChangedListener(eventBusContext)));
+ }
+
+ private Collection<String> getDatabaseNames(final
ContextManagerBuilderParameter param, final MetaDataPersistService
metaDataPersistService) {
+ return param.getInstanceMetaData() instanceof JDBCInstanceMetaData ?
param.getDatabaseConfigs().keySet() :
metaDataPersistService.getDatabaseMetaDataService().loadAllDatabaseNames();
+ }
+
@Override
public String getType() {
return "Cluster";
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
similarity index 81%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
rename to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
index 0bca081f199..6cbc251e5ac 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
@@ -15,10 +15,12 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.listener;
import com.google.common.base.Preconditions;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
import
org.apache.shardingsphere.metadata.persist.node.metadata.DataSourceMetaDataNode;
import
org.apache.shardingsphere.metadata.persist.node.metadata.TableMetaDataNode;
@@ -35,57 +37,40 @@ import
org.apache.shardingsphere.mode.event.schema.table.CreateOrAlterTableEvent
import org.apache.shardingsphere.mode.event.schema.table.DropTableEvent;
import org.apache.shardingsphere.mode.event.schema.view.CreateOrAlterViewEvent;
import org.apache.shardingsphere.mode.event.schema.view.DropViewEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
import
org.apache.shardingsphere.mode.metadata.builder.RuleConfigurationEventBuilder;
+import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
- * Meta data changed watcher.
+ * Meta data changed listener.
*/
-public final class MetaDataChangedWatcher implements
GovernanceWatcher<GovernanceEvent> {
+@RequiredArgsConstructor
+public final class MetaDataChangedListener implements DataChangedEventListener
{
- private final RuleConfigurationEventBuilder ruleConfigurationEventBuilder
= new RuleConfigurationEventBuilder();
+ private final EventBusContext eventBusContext;
- @Override
- public Collection<String> getWatchingKeys(final Collection<String>
databaseNames) {
- return databaseNames.isEmpty()
- ? Collections.singleton(DatabaseMetaDataNode.getMetaDataNode())
- :
databaseNames.stream().map(DatabaseMetaDataNode::getDatabaseNamePath).collect(Collectors.toList());
- }
+ private final RuleConfigurationEventBuilder ruleConfigurationEventBuilder
= new RuleConfigurationEventBuilder();
@Override
- public Collection<Type> getWatchingTypes() {
- return Arrays.asList(Type.ADDED, Type.UPDATED, Type.DELETED);
+ public void onChange(final DataChangedEvent event) {
+ createGovernanceEvent(event).ifPresent(eventBusContext::post);
}
- @Override
- public Optional<GovernanceEvent> createGovernanceEvent(final
DataChangedEvent event) {
+ private Optional<GovernanceEvent> createGovernanceEvent(final
DataChangedEvent event) {
String key = event.getKey();
- Optional<String> databaseName =
DatabaseMetaDataNode.getDatabaseName(key);
- if (databaseName.isPresent()) {
- return createDatabaseChangedEvent(databaseName.get(), event);
- }
- databaseName = DatabaseMetaDataNode.getDatabaseNameBySchemaNode(key);
+ Optional<String> databaseName =
DatabaseMetaDataNode.getDatabaseNameBySchemaNode(key);
Optional<String> schemaName = DatabaseMetaDataNode.getSchemaName(key);
if (databaseName.isPresent() && schemaName.isPresent()) {
return createSchemaChangedEvent(databaseName.get(),
schemaName.get(), event);
}
schemaName = DatabaseMetaDataNode.getSchemaNameByTableNode(key);
- if (databaseName.isPresent() && schemaName.isPresent() &&
(TableMetaDataNode.isTableActiveVersionNode(event.getKey()))
- || TableMetaDataNode.isTableNode(event.getKey())) {
+ if (databaseName.isPresent() && schemaName.isPresent() &&
tableMetaDataChanged(event.getKey())) {
return createTableChangedEvent(databaseName.get(),
schemaName.get(), event);
}
- if (databaseName.isPresent() && schemaName.isPresent() &&
(ViewMetaDataNode.isViewActiveVersionNode(event.getKey())
- || ViewMetaDataNode.isViewNode(event.getKey()))) {
+ if (databaseName.isPresent() && schemaName.isPresent() &&
viewMetaDataChanged(event.getKey())) {
return createViewChangedEvent(databaseName.get(),
schemaName.get(), event);
}
if (!databaseName.isPresent()) {
@@ -97,16 +82,6 @@ public final class MetaDataChangedWatcher implements
GovernanceWatcher<Governanc
return ruleConfigurationEventBuilder.build(databaseName.get(), event);
}
- private Optional<GovernanceEvent> createDatabaseChangedEvent(final String
databaseName, final DataChangedEvent event) {
- if (Type.ADDED == event.getType() || Type.UPDATED == event.getType()) {
- return Optional.of(new DatabaseAddedEvent(databaseName));
- }
- if (Type.DELETED == event.getType()) {
- return Optional.of(new DatabaseDeletedEvent(databaseName));
- }
- return Optional.empty();
- }
-
private Optional<GovernanceEvent> createSchemaChangedEvent(final String
databaseName, final String schemaName, final DataChangedEvent event) {
if (Type.ADDED == event.getType() || Type.UPDATED == event.getType()) {
return Optional.of(new SchemaAddedEvent(databaseName, schemaName));
@@ -117,6 +92,14 @@ public final class MetaDataChangedWatcher implements
GovernanceWatcher<Governanc
return Optional.empty();
}
+ private boolean tableMetaDataChanged(final String key) {
+ return TableMetaDataNode.isTableActiveVersionNode(key) ||
TableMetaDataNode.isTableNode(key);
+ }
+
+ private boolean viewMetaDataChanged(final String key) {
+ return ViewMetaDataNode.isViewActiveVersionNode(key) ||
ViewMetaDataNode.isViewNode(key);
+ }
+
private Optional<GovernanceEvent> createTableChangedEvent(final String
databaseName, final String schemaName, final DataChangedEvent event) {
if (Type.DELETED == event.getType() &&
TableMetaDataNode.isTableNode(event.getKey())) {
Optional<String> tableName =
TableMetaDataNode.getTableName(event.getKey());
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataWatchListenerManager.java
similarity index 51%
copy from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
copy to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataWatchListenerManager.java
index f1ce5f0c35c..b08299ef293 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataWatchListenerManager.java
@@ -15,43 +15,36 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.listener;
-import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.mode.event.DataChangedEvent;
-import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
-
-import java.util.Collection;
-import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
/**
- * Governance watcher.
- *
- * @param <T> type of event
+ * Meta data watch listener manager.
*/
-@SingletonSPI
-public interface GovernanceWatcher<T> {
+@RequiredArgsConstructor
+public final class MetaDataWatchListenerManager {
- /**
- * Get watching keys.
- *
- * @param databaseNames database names
- * @return watching keys
- */
- Collection<String> getWatchingKeys(Collection<String> databaseNames);
+ private final ClusterPersistRepository repository;
/**
- * Get watching types.
+ * Add listener.
*
- * @return watching types
+ * @param listenerKey listener key
+ * @param dataChangedEventListener data changed event listener
*/
- Collection<Type> getWatchingTypes();
+ public void addListener(final String listenerKey, final
DataChangedEventListener dataChangedEventListener) {
+ repository.watch(listenerKey, dataChangedEventListener);
+ }
/**
- * Create governance event.
- *
- * @param event registry center data changed event
- * @return governance event
+ * Remove listener.
+ *
+ * @param listenerKey listener key
*/
- Optional<T> createGovernanceEvent(DataChangedEvent event);
+ public void removeListener(final String listenerKey) {
+ repository.removeDataListener(listenerKey);
+ }
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/processor/CreateDatabaseListenerAssistedProcessor.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/processor/CreateDatabaseListenerAssistedProcessor.java
new file mode 100644
index 00000000000..731dc0980ed
--- /dev/null
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/processor/CreateDatabaseListenerAssistedProcessor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.processor;
+
+import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.processor.ListenerAssistedProcessor;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.CreateDatabaseListenerAssistedEvent;
+
+/**
+ * Create database listener assisted processor.
+ */
+public final class CreateDatabaseListenerAssistedProcessor implements
ListenerAssistedProcessor<CreateDatabaseListenerAssistedEvent> {
+
+ @Override
+ public String getListenerKey(final CreateDatabaseListenerAssistedEvent
event) {
+ return
DatabaseMetaDataNode.getDatabaseNamePath(event.getDatabaseName());
+ }
+
+ @Override
+ public void processor(final ContextManager contextManager, final
CreateDatabaseListenerAssistedEvent event) {
+
contextManager.getMetaDataContextManager().getResourceMetaDataManager().addDatabase(event.getDatabaseName());
+
contextManager.getPersistServiceFacade().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName());
+ }
+
+ @Override
+ public String getType() {
+ return CreateDatabaseListenerAssistedEvent.class.getName();
+ }
+}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/processor/DropDatabaseListenerAssistedProcessor.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/processor/DropDatabaseListenerAssistedProcessor.java
new file mode 100644
index 00000000000..37d65b91ba8
--- /dev/null
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/processor/DropDatabaseListenerAssistedProcessor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.processor;
+
+import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.DropDatabaseListenerAssistedEvent;
+import org.apache.shardingsphere.mode.processor.ListenerAssistedProcessor;
+
+/**
+ * Drop database listener assisted processor.
+ */
+public final class DropDatabaseListenerAssistedProcessor implements
ListenerAssistedProcessor<DropDatabaseListenerAssistedEvent> {
+
+ @Override
+ public String getListenerKey(final DropDatabaseListenerAssistedEvent
event) {
+ return
DatabaseMetaDataNode.getDatabaseNamePath(event.getDatabaseName());
+ }
+
+ @Override
+ public void processor(final ContextManager contextManager, final
DropDatabaseListenerAssistedEvent event) {
+
contextManager.getMetaDataContextManager().getResourceMetaDataManager().dropDatabase(event.getDatabaseName());
+
contextManager.getPersistServiceFacade().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName());
+ }
+
+ @Override
+ public String getType() {
+ return DropDatabaseListenerAssistedEvent.class.getName();
+ }
+}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/watch/ListenerAssistedChangedWatcher.java
similarity index 51%
copy from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
copy to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/watch/ListenerAssistedChangedWatcher.java
index 05cd9c2198f..cd6b90bcca8 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/watch/ListenerAssistedChangedWatcher.java
@@ -15,51 +15,50 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.watcher;
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.watch;
-import com.google.common.base.Strings;
-import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
-import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;
-import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
-import
org.apache.shardingsphere.mode.storage.yaml.YamlQualifiedDataSourceStatus;
-import
org.apache.shardingsphere.mode.storage.yaml.YamlQualifiedDataSourceStatusSwapper;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.DropDatabaseListenerAssistedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.CreateDatabaseListenerAssistedEvent;
+import org.apache.shardingsphere.mode.service.enums.ListenerAssistedEnum;
+import org.apache.shardingsphere.mode.service.pojo.ListenerAssistedPOJO;
+import org.apache.shardingsphere.mode.path.ListenerAssistedNodePath;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
/**
- * Qualified data source changed watcher.
+ * Listener assisted changed watcher.
*/
-public final class QualifiedDataSourceChangedWatcher implements
GovernanceWatcher<GovernanceEvent> {
+public class ListenerAssistedChangedWatcher implements
GovernanceWatcher<GovernanceEvent> {
@Override
- public Collection<String> getWatchingKeys(final Collection<String>
databaseNames) {
- return Collections.singleton(QualifiedDataSourceNode.getRootPath());
+ public Collection<String> getWatchingKeys() {
+ return
Collections.singleton(ListenerAssistedNodePath.getRootNodePath());
}
@Override
- public Collection<Type> getWatchingTypes() {
- return Arrays.asList(Type.ADDED, Type.UPDATED);
+ public Collection<DataChangedEvent.Type> getWatchingTypes() {
+ return Collections.singletonList(Type.ADDED);
}
@Override
public Optional<GovernanceEvent> createGovernanceEvent(final
DataChangedEvent event) {
- if (Strings.isNullOrEmpty(event.getValue())) {
+ Optional<String> databaseName =
ListenerAssistedNodePath.getDatabaseName(event.getKey());
+ if (!databaseName.isPresent()) {
return Optional.empty();
}
- Optional<QualifiedDataSource> qualifiedDataSource =
QualifiedDataSourceNode.extractQualifiedDataSource(event.getKey());
- if (qualifiedDataSource.isPresent()) {
- QualifiedDataSourceState status = new
YamlQualifiedDataSourceStatusSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(),
YamlQualifiedDataSourceStatus.class));
- return Optional.of(new
StorageNodeChangedEvent(qualifiedDataSource.get(), status));
+ ListenerAssistedPOJO data = YamlEngine.unmarshal(event.getValue(),
ListenerAssistedPOJO.class);
+ if (ListenerAssistedEnum.CREATE_DATABASE ==
data.getListenerAssistedEnum()) {
+ return Optional.of(new
CreateDatabaseListenerAssistedEvent(databaseName.get()));
}
- return Optional.empty();
+ return ListenerAssistedEnum.DROP_DATABASE ==
data.getListenerAssistedEnum()
+ ? Optional.of(new
DropDatabaseListenerAssistedEvent(databaseName.get()))
+ : Optional.empty();
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
index f1ce5f0c35c..efb25e49d86 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
@@ -35,10 +35,9 @@ public interface GovernanceWatcher<T> {
/**
* Get watching keys.
*
- * @param databaseNames database names
* @return watching keys
*/
- Collection<String> getWatchingKeys(Collection<String> databaseNames);
+ Collection<String> getWatchingKeys();
/**
* Get watching types.
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
index e12299b7afd..9e84cfa7b74 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
@@ -22,8 +22,6 @@ import
org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import java.util.Collection;
-
/**
* Governance watcher factory.
*/
@@ -34,8 +32,6 @@ public final class GovernanceWatcherFactory {
private final EventBusContext eventBusContext;
- private final Collection<String> databaseNames;
-
/**
* Watch listeners.
*/
@@ -46,7 +42,7 @@ public final class GovernanceWatcherFactory {
}
private void watch(final GovernanceWatcher<?> listener) {
- for (String each : listener.getWatchingKeys(databaseNames)) {
+ for (String each : listener.getWatchingKeys()) {
watch(each, listener);
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
index b45709733f4..69566a166fe 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
@@ -45,7 +45,7 @@ import java.util.Optional;
public final class ShardingSphereDataChangedWatcher implements
GovernanceWatcher<GovernanceEvent> {
@Override
- public Collection<String> getWatchingKeys(final Collection<String>
databaseNames) {
+ public Collection<String> getWatchingKeys() {
return
Collections.singleton(ShardingSphereDataNode.getShardingSphereDataNodePath());
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/listener/CreateDatabaseListenerAssistedEvent.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/listener/CreateDatabaseListenerAssistedEvent.java
new file mode 100644
index 00000000000..05094eab177
--- /dev/null
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/listener/CreateDatabaseListenerAssistedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
+
+/**
+ * Create database listener assisted event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CreateDatabaseListenerAssistedEvent implements
GovernanceEvent {
+
+ private final String databaseName;
+}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/listener/DropDatabaseListenerAssistedEvent.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/listener/DropDatabaseListenerAssistedEvent.java
new file mode 100644
index 00000000000..e6d8bd94518
--- /dev/null
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/listener/DropDatabaseListenerAssistedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
+
+/**
+ * Drop database listener assisted event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class DropDatabaseListenerAssistedEvent implements
GovernanceEvent {
+
+ private final String databaseName;
+}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
index 716056ccbb3..66fb9eabd07 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
@@ -37,7 +37,7 @@ import java.util.Optional;
public final class ClusterStateChangedWatcher implements
GovernanceWatcher<GovernanceEvent> {
@Override
- public Collection<String> getWatchingKeys(final Collection<String>
databaseNames) {
+ public Collection<String> getWatchingKeys() {
return Collections.singleton(ComputeNode.getClusterStateNodePath());
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 0e5f0d56a28..143d148e874 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -54,7 +54,7 @@ import java.util.regex.Pattern;
public final class ComputeNodeStateChangedWatcher implements
GovernanceWatcher<GovernanceEvent> {
@Override
- public Collection<String> getWatchingKeys(final Collection<String>
databaseNames) {
+ public Collection<String> getWatchingKeys() {
return Collections.singleton(ComputeNode.getComputeNodePath());
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
index 05cd9c2198f..aeb4e05fd54 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
@@ -41,7 +41,7 @@ import java.util.Optional;
public final class QualifiedDataSourceChangedWatcher implements
GovernanceWatcher<GovernanceEvent> {
@Override
- public Collection<String> getWatchingKeys(final Collection<String>
databaseNames) {
+ public Collection<String> getWatchingKeys() {
return Collections.singleton(QualifiedDataSourceNode.getRootPath());
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/GlobalRuleChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/GlobalRuleChangedWatcher.java
index a101c50bc81..2784d8e4759 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/GlobalRuleChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/GlobalRuleChangedWatcher.java
@@ -36,7 +36,7 @@ import java.util.Optional;
public final class GlobalRuleChangedWatcher implements
GovernanceWatcher<GovernanceEvent> {
@Override
- public Collection<String> getWatchingKeys(final Collection<String>
databaseNames) {
+ public Collection<String> getWatchingKeys() {
return Collections.singleton(GlobalNode.getGlobalRuleRootNode());
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/PropertiesChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/PropertiesChangedWatcher.java
index 816d947427b..c83449c0b8c 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/PropertiesChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/watcher/PropertiesChangedWatcher.java
@@ -35,7 +35,7 @@ import java.util.Optional;
public final class PropertiesChangedWatcher implements
GovernanceWatcher<AlterPropertiesEvent> {
@Override
- public Collection<String> getWatchingKeys(final Collection<String>
databaseNames) {
+ public Collection<String> getWatchingKeys() {
return Collections.singleton(GlobalNode.getPropsRootNode());
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
index 5326fe989c4..ae15b0a2214 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
@@ -34,6 +34,7 @@ public final class ClusterEventSubscriberRegistry extends
EventSubscriberRegistr
new ConfigurationChangedSubscriber(contextManager),
new ConfigurationChangedSubscriber(contextManager),
new ResourceMetaDataChangedSubscriber(contextManager),
+ new ListenerAssistedMetaDataChangedSubscriber(contextManager),
new StateChangedSubscriber(contextManager),
new DatabaseChangedSubscriber(contextManager),
new ProcessListChangedSubscriber(contextManager, repository),
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ListenerAssistedMetaDataChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ListenerAssistedMetaDataChangedSubscriber.java
new file mode 100644
index 00000000000..385edccde34
--- /dev/null
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ListenerAssistedMetaDataChangedSubscriber.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataWatchListenerManager;
+import org.apache.shardingsphere.mode.processor.ListenerAssistedProcessor;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.DropDatabaseListenerAssistedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.CreateDatabaseListenerAssistedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataChangedListener;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+import java.util.Optional;
+
+/**
+ * Listener assisted meta data changed subscriber.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+@RequiredArgsConstructor
+public final class ListenerAssistedMetaDataChangedSubscriber implements
EventSubscriber {
+
+ private final ContextManager contextManager;
+
+ /**
+ * Add meta data changed listener on the listener key of the event, then process the create database event.
+ *
+ * @param event create database listener assisted event
+ */
+ @Subscribe
+ public synchronized void renew(final CreateDatabaseListenerAssistedEvent
event) {
+ Optional<ListenerAssistedProcessor> processor =
TypedSPILoader.findService(ListenerAssistedProcessor.class,
event.getClass().getName());
+ if (!processor.isPresent()) {
+ return;
+ }
+ new MetaDataWatchListenerManager((ClusterPersistRepository)
contextManager.getRepository())
+ .addListener(processor.get().getListenerKey(event), new
MetaDataChangedListener(contextManager.getComputeNodeInstanceContext().getEventBusContext()));
+ processor.get().processor(contextManager, event);
+ }
+
+ /**
+ * Remove listener on the listener key of the event, then process the drop database event.
+ *
+ * @param event drop database listener assisted event
+ */
+ @Subscribe
+ public synchronized void renew(final DropDatabaseListenerAssistedEvent
event) {
+ Optional<ListenerAssistedProcessor> processor =
TypedSPILoader.findService(ListenerAssistedProcessor.class,
event.getClass().getName());
+ if (!processor.isPresent()) {
+ return;
+ }
+ new MetaDataWatchListenerManager((ClusterPersistRepository)
contextManager.getRepository()).removeListener(processor.get().getListenerKey(event));
+ processor.get().processor(contextManager, event);
+ }
+}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
index 2f6d58befb6..cb3bf443818 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
@@ -27,8 +27,6 @@ import
org.apache.shardingsphere.mode.event.schema.table.DropTableEvent;
import org.apache.shardingsphere.mode.event.schema.view.CreateOrAlterViewEvent;
import org.apache.shardingsphere.mode.event.schema.view.DropViewEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
@@ -43,26 +41,6 @@ public final class ResourceMetaDataChangedSubscriber
implements EventSubscriber
private final ContextManager contextManager;
- /**
- * Renew to persist meta data.
- *
- * @param event database added event
- */
- @Subscribe
- public synchronized void renew(final DatabaseAddedEvent event) {
-
contextManager.getMetaDataContextManager().getResourceMetaDataManager().addDatabase(event.getDatabaseName());
- }
-
- /**
- * Renew to delete database.
- *
- * @param event database delete event
- */
- @Subscribe
- public synchronized void renew(final DatabaseDeletedEvent event) {
-
contextManager.getMetaDataContextManager().getResourceMetaDataManager().dropDatabase(event.getDatabaseName());
- }
-
/**
* Renew to added schema.
*
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterMetaDataManagerPersistService.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterMetaDataManagerPersistService.java
index d95b48e3682..d056679d555 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterMetaDataManagerPersistService.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterMetaDataManagerPersistService.java
@@ -29,7 +29,9 @@ import
org.apache.shardingsphere.infra.metadata.version.MetaDataVersion;
import
org.apache.shardingsphere.metadata.persist.service.config.database.DataSourceUnitPersistService;
import
org.apache.shardingsphere.metadata.persist.service.database.DatabaseMetaDataPersistService;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.service.enums.ListenerAssistedEnum;
import
org.apache.shardingsphere.mode.service.persist.MetaDataManagerPersistService;
+import org.apache.shardingsphere.mode.service.pojo.ListenerAssistedPOJO;
import org.apache.shardingsphere.single.api.config.SingleRuleConfiguration;
import java.util.Collection;
@@ -49,11 +51,14 @@ public final class ClusterMetaDataManagerPersistService
implements MetaDataManag
@Override
public void createDatabase(final String databaseName) {
contextManager.getPersistServiceFacade().getMetaDataPersistService().getDatabaseMetaDataService().addDatabase(databaseName);
+
contextManager.getPersistServiceFacade().getListenerAssistedPersistService().persistDatabaseNameListenerAssisted(new
ListenerAssistedPOJO(databaseName, ListenerAssistedEnum.CREATE_DATABASE));
}
@Override
public void dropDatabase(final String databaseName) {
-
contextManager.getPersistServiceFacade().getMetaDataPersistService().getDatabaseMetaDataService().dropDatabase(databaseName);
+
contextManager.getPersistServiceFacade().getListenerAssistedPersistService()
+ .persistDatabaseNameListenerAssisted(new
ListenerAssistedPOJO(contextManager.getDatabase(databaseName).getName(),
ListenerAssistedEnum.DROP_DATABASE));
+
contextManager.getPersistServiceFacade().getMetaDataPersistService().getDatabaseMetaDataService().dropDatabase(contextManager.getDatabase(databaseName).getName());
}
@Override
diff --git
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
index ef200deff89..510d523bfbd 100644
---
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
+++
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
@@ -19,6 +19,6 @@
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.stora
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher.ComputeNodeStateChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.watcher.ClusterStateChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.ShardingSphereDataChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher.MetaDataChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.watcher.GlobalRuleChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.watcher.PropertiesChangedWatcher
+org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.watch.ListenerAssistedChangedWatcher
diff --git
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.processor.ListenerAssistedProcessor
similarity index 50%
copy from
mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
copy to
mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.processor.ListenerAssistedProcessor
index ef200deff89..f08ee5bc21e 100644
---
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
+++
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.processor.ListenerAssistedProcessor
@@ -15,10 +15,5 @@
# limitations under the License.
#
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.watcher.QualifiedDataSourceChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher.ComputeNodeStateChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.watcher.ClusterStateChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.ShardingSphereDataChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher.MetaDataChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.watcher.GlobalRuleChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.watcher.PropertiesChangedWatcher
+org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.processor.CreateDatabaseListenerAssistedProcessor
+org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.processor.DropDatabaseListenerAssistedProcessor
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
index 8b3f8c65898..1d6f47c6d58 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
@@ -75,6 +75,10 @@ public final class ClusterPersistRepositoryFixture
implements ClusterPersistRepo
public void watch(final String key, final DataChangedEventListener
listener) {
}
+ @Override
+ public void removeDataListener(final String key) {
+ }
+
@Override
public void close() {
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
index 0df0ca5219d..94c518c1f0e 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
@@ -86,6 +86,10 @@ public final class
ProcessListClusterPersistRepositoryFixture implements Cluster
public void watch(final String key, final DataChangedEventListener
listener) {
}
+ @Override
+ public void removeDataListener(final String key) {
+ }
+
@Override
public void close() {
REGISTRY_DATA.clear();
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/CacheEvictedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/CacheEvictedSubscriberTest.java
index 42dec7ec740..935feb4df42 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/CacheEvictedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/CacheEvictedSubscriberTest.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import
org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCache;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.listener.DropDatabaseListenerAssistedEvent;
import org.junit.jupiter.api.Test;
import java.util.Collections;
@@ -33,7 +33,7 @@ class CacheEvictedSubscriberTest {
EventBusContext eventBusContext = new EventBusContext();
eventBusContext.register(new CacheEvictedSubscriber());
OrderedServicesCache.cacheServices(getClass(),
Collections.emptyList(), Collections.emptyMap());
- eventBusContext.post(new DatabaseDeletedEvent("db"));
+ eventBusContext.post(new DropDatabaseListenerAssistedEvent("db"));
assertFalse(OrderedServicesCache.findCachedServices(getClass(),
Collections.emptyList()).isPresent());
}
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
index b003d9e79ca..70683f00457 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
@@ -20,8 +20,6 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import
org.apache.shardingsphere.infra.datasource.pool.props.creator.DataSourcePoolPropertiesCreator;
-import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -33,13 +31,10 @@ import
org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
-import org.apache.shardingsphere.test.fixture.jdbc.MockedDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -51,12 +46,9 @@ import org.mockito.quality.Strictness;
import java.sql.SQLException;
import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
@@ -102,29 +94,6 @@ class ResourceMetaDataChangedSubscriberTest {
return Collections.singletonMap("db", database);
}
- @Test
- void assertRenewForDatabaseAdded() {
-
when(persistService.getDataSourceUnitService().load("db_added")).thenReturn(createDataSourcePoolPropertiesMap());
-
when(persistService.getDatabaseRulePersistService().load("db_added")).thenReturn(Collections.emptyList());
- subscriber.renew(new DatabaseAddedEvent("db_added"));
-
assertNotNull(contextManager.getMetaDataContexts().getMetaData().getDatabase("db_added").getResourceMetaData().getStorageUnits());
- }
-
- private Map<String, DataSourcePoolProperties>
createDataSourcePoolPropertiesMap() {
- MockedDataSource dataSource = new MockedDataSource();
- Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(3,
1F);
- result.put("primary_ds",
DataSourcePoolPropertiesCreator.create(dataSource));
- result.put("replica_ds_0",
DataSourcePoolPropertiesCreator.create(dataSource));
- result.put("replica_ds_1",
DataSourcePoolPropertiesCreator.create(dataSource));
- return result;
- }
-
- @Test
- void assertRenewForDatabaseDeleted() {
- subscriber.renew(new DatabaseDeletedEvent("db"));
-
assertNull(contextManager.getMetaDataContexts().getMetaData().getDatabase("db"));
- }
-
@Test
void assertRenewForSchemaAdded() {
subscriber.renew(new SchemaAddedEvent("db", "foo_schema"));
diff --git
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
index 4a9e5fec657..aeabd5828a2 100644
---
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
+++
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
@@ -58,4 +58,11 @@ public interface ClusterPersistRepository extends
PersistRepository {
* @param listener data changed event listener
*/
void watch(String key, DataChangedEventListener listener);
+
+ /**
+ * Remove data listener by key.
+ *
+ * @param key key whose data listener is to be removed
+ */
+ void removeDataListener(String key);
}
diff --git
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
index 999457f7324..004f53e2123 100644
---
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
+++
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
@@ -168,6 +168,11 @@ public final class EtcdRepository implements
ClusterPersistRepository {
WatchOption.newBuilder().withRange(OptionsUtil.prefixEndOf(prefix)).build(),
listener);
}
+ @Override
+ public void removeDataListener(final String key) {
+ // TODO implement listener removal for etcd; this method is currently a no-op
+ }
+
private Type getEventChangedType(final WatchEvent event) {
if (1 == event.getKeyValue().getVersion()) {
return Type.ADDED;
diff --git
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
index 1cfae70b242..babaecc789b 100644
---
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
+++
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
@@ -50,6 +50,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Optional;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -62,6 +63,8 @@ public final class ZookeeperRepository implements
ClusterPersistRepository {
private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();
+ private final Map<String, CuratorCacheListener> dataListeners = new
ConcurrentHashMap<>();
+
private final Builder builder = CuratorFrameworkFactory.builder();
private CuratorFramework client;
@@ -236,6 +239,9 @@ public final class ZookeeperRepository implements
ClusterPersistRepository {
@Override
public void watch(final String key, final DataChangedEventListener
listener) {
+ if (null != dataListeners.get(key)) {
+ return;
+ }
CuratorCache cache = caches.get(key);
if (null == cache) {
cache = CuratorCache.build(client, key);
@@ -253,6 +259,16 @@ public final class ZookeeperRepository implements
ClusterPersistRepository {
.build();
cache.listenable().addListener(curatorCacheListener);
cache.start();
+ dataListeners.computeIfAbsent(key, curator -> curatorCacheListener);
+ }
+
+ @Override
+ public void removeDataListener(final String key) {
+ CuratorCacheListener cacheListener = dataListeners.remove(key);
+ if (null == cacheListener) {
+ return;
+ }
+ Optional.ofNullable(caches.get(key)).ifPresent(optional ->
optional.listenable().removeListener(cacheListener));
}
@Override
diff --git
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
index 935e5cbb657..3df9e6cbd75 100644
---
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
+++
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
@@ -85,6 +85,10 @@ public final class ClusterPersistRepositoryFixture
implements ClusterPersistRepo
public void watch(final String key, final DataChangedEventListener
listener) {
}
+ @Override
+ public void removeDataListener(final String key) {
+ }
+
@Override
public void close() {
REGISTRY_DATA.clear();