This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c1e8edbb5cc Use PersistServiceFacade instead of ShardingSphereSchemaDataAlteredEvent and remove ShardingSphereSchemaDataRegistrySubscriber (#31407)
c1e8edbb5cc is described below

commit c1e8edbb5cc8692eb1be588b429fd9b440e64eab
Author: Haoran Meng <[email protected]>
AuthorDate: Sun May 26 22:53:52 2024 +0800

    Use PersistServiceFacade instead of ShardingSphereSchemaDataAlteredEvent and remove ShardingSphereSchemaDataRegistrySubscriber (#31407)
    
    * Use PersistServiceFacade instead of ShardingSphereSchemaDataAlteredEvent and remove ShardingSphereSchemaDataRegistrySubscriber
    
    * Use PersistServiceFacade instead of ShardingSphereSchemaDataAlteredEvent and remove ShardingSphereSchemaDataRegistrySubscriber
---
 .../statistics/collect/StatisticsCollectJob.java   | 12 +++---
 .../collect/StatisticsCollectJobTest.java          |  4 +-
 .../mode/service/PersistServiceFacade.java         | 17 ++++++++
 .../pojo/ShardingSphereSchemaDataAlteredPOJO.java} |  7 ++-
 .../cluster/ClusterContextManagerBuilder.java      |  2 -
 ...ShardingSphereSchemaDataRegistrySubscriber.java | 50 ----------------------
 6 files changed, 29 insertions(+), 63 deletions(-)

diff --git 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
index effb25b6da1..b32f57030d5 100644
--- 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
+++ 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
@@ -35,7 +35,7 @@ import 
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSph
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import 
org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
+import 
org.apache.shardingsphere.mode.service.pojo.ShardingSphereSchemaDataAlteredPOJO;
 
 import java.util.ArrayList;
 import java.util.Map;
@@ -134,13 +134,13 @@ public final class StatisticsCollectJob implements 
SimpleJob {
             return;
         }
         
statistics.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(),
 changedTableData);
-        ShardingSphereSchemaDataAlteredEvent event = 
getShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, tableData, 
changedTableData, table);
-        
contextManager.getComputeNodeInstanceContext().getEventBusContext().post(event);
+        ShardingSphereSchemaDataAlteredPOJO schemaDataAlteredPOJO = 
getShardingSphereSchemaDataAlteredPOJO(databaseName, schemaName, tableData, 
changedTableData, table);
+        
contextManager.getPersistServiceFacade().persist(schemaDataAlteredPOJO);
     }
     
-    private ShardingSphereSchemaDataAlteredEvent 
getShardingSphereSchemaDataAlteredEvent(final String databaseName, final String 
schemaName, final ShardingSphereTableData tableData,
-                                                                               
          final ShardingSphereTableData changedTableData, final 
ShardingSphereTable table) {
-        ShardingSphereSchemaDataAlteredEvent result = new 
ShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, 
tableData.getName());
+    private ShardingSphereSchemaDataAlteredPOJO 
getShardingSphereSchemaDataAlteredPOJO(final String databaseName, final String 
schemaName, final ShardingSphereTableData tableData,
+                                                                               
        final ShardingSphereTableData changedTableData, final 
ShardingSphereTable table) {
+        ShardingSphereSchemaDataAlteredPOJO result = new 
ShardingSphereSchemaDataAlteredPOJO(databaseName, schemaName, 
tableData.getName());
         Map<String, ShardingSphereRowData> tableDataMap = 
tableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
 Function.identity()));
         Map<String, ShardingSphereRowData> changedTableDataMap = 
changedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
 Function.identity()));
         YamlShardingSphereRowDataSwapper swapper = new 
YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumnValues()));
diff --git 
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
 
b/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
index c4a9d026d32..5a89037ae27 100644
--- 
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
+++ 
b/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
@@ -30,6 +30,7 @@ import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaD
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.service.pojo.ShardingSphereSchemaDataAlteredPOJO;
 import org.apache.shardingsphere.test.util.PropertiesBuilder;
 import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
 import org.junit.jupiter.api.Test;
@@ -40,6 +41,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Properties;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -58,7 +60,7 @@ class StatisticsCollectJobTest {
         
when(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps()).thenReturn(new
 TemporaryConfigurationProperties(
                 PropertiesBuilder.build(new 
Property(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED.getKey(),
 Boolean.TRUE.toString()))));
         new StatisticsCollectJob(contextManager).execute(null);
-        verify(contextManager).getComputeNodeInstanceContext();
+        
verify(contextManager.getPersistServiceFacade()).persist(any(ShardingSphereSchemaDataAlteredPOJO.class));
     }
     
     private ShardingSphereStatistics mockStatistics() {
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
index 1f06cccb0e7..86153b94280 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.service.pojo.ShardingSphereSchemaDataAlteredPOJO;
 import org.apache.shardingsphere.mode.spi.PersistRepository;
 import org.apache.shardingsphere.mode.state.StatePersistService;
 
@@ -45,4 +46,20 @@ public final class PersistServiceFacade {
         statePersistService = new StatePersistService(repository);
         metaDataManagerPersistService = 
TypedSPILoader.getService(MetaDataManagerPersistServiceBuilder.class, 
modeConfiguration.getType()).build(contextManager);
     }
+    
+    /**
+     * Update when sharding sphere schema data altered.
+     *
+     * @param schemaDataAlteredPOJO sharding sphere schema data
+     */
+    public void persist(final ShardingSphereSchemaDataAlteredPOJO 
schemaDataAlteredPOJO) {
+        String databaseName = schemaDataAlteredPOJO.getDatabaseName();
+        String schemaName = schemaDataAlteredPOJO.getSchemaName();
+        
metaDataPersistService.getShardingSphereDataPersistService().getTableRowDataPersistService().persist(databaseName,
 schemaName, schemaDataAlteredPOJO.getTableName(),
+                schemaDataAlteredPOJO.getAddedRows());
+        
metaDataPersistService.getShardingSphereDataPersistService().getTableRowDataPersistService().persist(databaseName,
 schemaName, schemaDataAlteredPOJO.getTableName(),
+                schemaDataAlteredPOJO.getUpdatedRows());
+        
metaDataPersistService.getShardingSphereDataPersistService().getTableRowDataPersistService().delete(databaseName,
 schemaName, schemaDataAlteredPOJO.getTableName(),
+                schemaDataAlteredPOJO.getDeletedRows());
+    }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereSchemaDataAlteredEvent.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/pojo/ShardingSphereSchemaDataAlteredPOJO.java
similarity index 84%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereSchemaDataAlteredEvent.java
rename to 
mode/core/src/main/java/org/apache/shardingsphere/mode/service/pojo/ShardingSphereSchemaDataAlteredPOJO.java
index aa9ec928581..1762beda0a7 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereSchemaDataAlteredEvent.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/pojo/ShardingSphereSchemaDataAlteredPOJO.java
@@ -15,22 +15,21 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
+package org.apache.shardingsphere.mode.service.pojo;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
 import 
org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
 
 import java.util.Collection;
 import java.util.LinkedList;
 
 /**
- * Schema altered event.
+ * Schema altered pojo.
  */
 @RequiredArgsConstructor
 @Getter
-public final class ShardingSphereSchemaDataAlteredEvent implements 
GovernanceEvent {
+public final class ShardingSphereSchemaDataAlteredPOJO {
     
     private final String databaseName;
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index a4d13f555ce..72f4874996c 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -34,7 +34,6 @@ import 
org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.ShardingSphereSchemaDataRegistrySubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
@@ -99,7 +98,6 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
     private void createSubscribers(final EventBusContext eventBusContext, 
final ClusterPersistRepository repository) {
         eventBusContext.register(new 
QualifiedDataSourceStatusSubscriber(repository));
         eventBusContext.register(new ClusterProcessSubscriber(repository, 
eventBusContext));
-        eventBusContext.register(new 
ShardingSphereSchemaDataRegistrySubscriber(repository));
     }
     
     private void registerOnline(final EventBusContext eventBusContext, final 
ComputeNodeInstanceContext computeNodeInstanceContext,
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
deleted file mode 100644
index c7843a645ed..00000000000
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber;
-
-import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import 
org.apache.shardingsphere.metadata.persist.data.ShardingSphereDataPersistService;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
-import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-
-/**
- * ShardingSphere schema data registry subscriber.
- */
-public final class ShardingSphereSchemaDataRegistrySubscriber implements 
EventSubscriber {
-    
-    private final ShardingSphereDataPersistService persistService;
-    
-    public ShardingSphereSchemaDataRegistrySubscriber(final 
ClusterPersistRepository repository) {
-        persistService = new ShardingSphereDataPersistService(repository);
-    }
-    
-    /**
-     * Update when ShardingSphere schema data altered.
-     *
-     * @param event schema altered event
-     */
-    @Subscribe
-    public void update(final ShardingSphereSchemaDataAlteredEvent event) {
-        String databaseName = event.getDatabaseName();
-        String schemaName = event.getSchemaName();
-        persistService.getTableRowDataPersistService().persist(databaseName, 
schemaName, event.getTableName(), event.getAddedRows());
-        persistService.getTableRowDataPersistService().persist(databaseName, 
schemaName, event.getTableName(), event.getUpdatedRows());
-        persistService.getTableRowDataPersistService().delete(databaseName, 
schemaName, event.getTableName(), event.getDeletedRows());
-    }
-}

Reply via email to