This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e5723894331 Refactor statistics collect job (#31989)
e5723894331 is described below

commit e57238943314697e28f80087a261fd030aa35472
Author: jiangML <[email protected]>
AuthorDate: Fri Jul 5 19:28:02 2024 +0800

    Refactor statistics collect job (#31989)
---
 .../BroadcastInstanceBroadcastRoutingEngine.java   |   2 +-
 .../shardingsphere/infra/lock/GlobalLockNames.java |   4 +-
 .../statistics/collect/StatisticsCollectJob.java   | 130 ++-------------------
 .../collect/StatisticsCollectJobWorker.java        |  24 +++-
 .../ShardingSphereStatisticsRefreshEngine.java     |  90 +++++++++-----
 .../mode/fixture}/StatisticsCollectorFixture.java  |   2 +-
 .../ShardingSphereStatisticsRefreshEngineTest.java |  13 ++-
 ...ics.collector.ShardingSphereStatisticsCollector |   2 +-
 .../dispatch/ListenerAssistedSubscriber.java       |  13 +++
 .../ResourceMetaDataChangedSubscriber.java         |  18 +++
 10 files changed, 136 insertions(+), 162 deletions(-)

diff --git 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
index ef3768deb64..c88487679da 100644
--- 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
+++ 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
@@ -31,7 +31,7 @@ import java.util.Collections;
  * Broadcast routing engine for database instance.
  */
 @RequiredArgsConstructor
-public class BroadcastInstanceBroadcastRoutingEngine implements 
BroadcastRouteEngine {
+public final class BroadcastInstanceBroadcastRoutingEngine implements 
BroadcastRouteEngine {
     
     private final ResourceMetaData resourceMetaData;
     
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
index 927c98f6d84..24e7da36050 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
@@ -28,7 +28,9 @@ public enum GlobalLockNames {
     
     PREPARE("prepare_%s"),
     
-    GLOBAL_LOCK("global_clock");
+    GLOBAL_LOCK("global_clock"),
+    
+    STATISTICS("statistics");
     
     private final String lockName;
 }
diff --git 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
index d272f55041c..4526debbb91 100644
--- 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
+++ 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
@@ -21,28 +21,12 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import 
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereDatabaseData;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
-import 
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import 
org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
+import org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import 
org.apache.shardingsphere.mode.persist.pojo.AlteredShardingSphereSchemaData;
-
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import 
org.apache.shardingsphere.mode.manager.cluster.lock.GlobalLockPersistService;
+import 
org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
 
 /**
  * Statistics collect job.
@@ -55,107 +39,9 @@ public final class StatisticsCollectJob implements 
SimpleJob {
     
     @Override
     public void execute(final ShardingContext shardingContext) {
-        try {
-            if 
(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps().getValue(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED))
 {
-                ShardingSphereStatistics statistics = 
contextManager.getMetaDataContexts().getStatistics();
-                ShardingSphereMetaData metaData = 
contextManager.getMetaDataContexts().getMetaData();
-                ShardingSphereStatistics changedStatistics = new 
ShardingSphereStatistics();
-                statistics.getDatabaseData().forEach((key, value) -> {
-                    if (metaData.containsDatabase(key)) {
-                        collectForDatabase(key, value, 
metaData.getDatabases(), changedStatistics);
-                    }
-                });
-                compareUpdateAndSendEvent(statistics, changedStatistics, 
metaData.getDatabases());
-            }
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            log.error("Collect data error", ex);
-        }
-    }
-    
-    private void collectForDatabase(final String databaseName, final 
ShardingSphereDatabaseData databaseData,
-                                    final Map<String, ShardingSphereDatabase> 
databases, final ShardingSphereStatistics statistics) {
-        databaseData.getSchemaData().forEach((key, value) -> {
-            if (databases.get(databaseName.toLowerCase()).containsSchema(key)) 
{
-                collectForSchema(databaseName, key, value, databases, 
statistics);
-            }
-        });
-    }
-    
-    private void collectForSchema(final String databaseName, final String 
schemaName, final ShardingSphereSchemaData schemaData,
-                                  final Map<String, ShardingSphereDatabase> 
databases, final ShardingSphereStatistics statistics) {
-        schemaData.getTableData().forEach((key, value) -> {
-            if 
(databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(key))
 {
-                collectForTable(databaseName, schemaName, 
databases.get(databaseName).getSchema(schemaName).getTable(key), databases, 
statistics);
-            }
-        });
-    }
-    
-    private void collectForTable(final String databaseName, final String 
schemaName, final ShardingSphereTable table,
-                                 final Map<String, ShardingSphereDatabase> 
databases, final ShardingSphereStatistics statistics) {
-        Optional<ShardingSphereStatisticsCollector> dataCollector = 
TypedSPILoader.findService(ShardingSphereStatisticsCollector.class, 
table.getName());
-        if (!dataCollector.isPresent()) {
-            return;
-        }
-        Optional<ShardingSphereTableData> tableData = Optional.empty();
-        try {
-            tableData = dataCollector.get().collect(databaseName, table, 
databases, 
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData());
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            log.error(String.format("Collect %s.%s.%s data failed", 
databaseName, schemaName, table.getName()), ex);
-        }
-        tableData.ifPresent(optional -> 
statistics.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key -> 
new ShardingSphereDatabaseData())
-                .getSchemaData().computeIfAbsent(schemaName, key -> new 
ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(), 
optional));
-    }
-    
-    private void compareUpdateAndSendEvent(final ShardingSphereStatistics 
statistics, final ShardingSphereStatistics changedStatistics,
-                                           final Map<String, 
ShardingSphereDatabase> databases) {
-        changedStatistics.getDatabaseData().forEach((key, value) -> 
compareUpdateAndSendEventForDatabase(key, 
statistics.getDatabaseData().get(key), value, statistics,
-                databases.get(key.toLowerCase())));
-    }
-    
-    private void compareUpdateAndSendEventForDatabase(final String 
databaseName, final ShardingSphereDatabaseData databaseData, final 
ShardingSphereDatabaseData changedDatabaseData,
-                                                      final 
ShardingSphereStatistics statistics, final ShardingSphereDatabase database) {
-        changedDatabaseData.getSchemaData().forEach((key, value) -> 
compareUpdateAndSendEventForSchema(databaseName, key, 
databaseData.getSchemaData().get(key), value, statistics,
-                database.getSchema(key)));
-    }
-    
-    private void compareUpdateAndSendEventForSchema(final String databaseName, 
final String schemaName, final ShardingSphereSchemaData schemaData,
-                                                    final 
ShardingSphereSchemaData changedSchemaData, final ShardingSphereStatistics 
statistics, final ShardingSphereSchema schema) {
-        changedSchemaData.getTableData().forEach((key, value) -> 
compareUpdateAndSendEventForTable(databaseName, schemaName, 
schemaData.getTableData().get(key), value, statistics,
-                schema.getTable(key)));
-    }
-    
-    private void compareUpdateAndSendEventForTable(final String databaseName, 
final String schemaName, final ShardingSphereTableData tableData,
-                                                   final 
ShardingSphereTableData changedTableData, final ShardingSphereStatistics 
statistics, final ShardingSphereTable table) {
-        if (tableData.equals(changedTableData)) {
-            return;
-        }
-        
statistics.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(),
 changedTableData);
-        AlteredShardingSphereSchemaData schemaDataAlteredPOJO = 
getShardingSphereSchemaDataAlteredPOJO(databaseName, schemaName, tableData, 
changedTableData, table);
-        
contextManager.getPersistServiceFacade().persist(schemaDataAlteredPOJO);
-    }
-    
-    private AlteredShardingSphereSchemaData 
getShardingSphereSchemaDataAlteredPOJO(final String databaseName, final String 
schemaName, final ShardingSphereTableData tableData,
-                                                                               
    final ShardingSphereTableData changedTableData, final ShardingSphereTable 
table) {
-        AlteredShardingSphereSchemaData result = new 
AlteredShardingSphereSchemaData(databaseName, schemaName, tableData.getName());
-        Map<String, ShardingSphereRowData> tableDataMap = 
tableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
 Function.identity()));
-        Map<String, ShardingSphereRowData> changedTableDataMap = 
changedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
 Function.identity()));
-        YamlShardingSphereRowDataSwapper swapper = new 
YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumnValues()));
-        for (Entry<String, ShardingSphereRowData> entry : 
changedTableDataMap.entrySet()) {
-            if (!tableDataMap.containsKey(entry.getKey())) {
-                
result.getAddedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
-            } else if 
(!tableDataMap.get(entry.getKey()).equals(entry.getValue())) {
-                
result.getUpdatedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
-            }
-        }
-        for (Entry<String, ShardingSphereRowData> entry : 
tableDataMap.entrySet()) {
-            if (!changedTableDataMap.containsKey(entry.getKey())) {
-                
result.getDeletedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
-            }
+        PersistRepository repository = 
contextManager.getPersistServiceFacade().getRepository();
+        if (repository instanceof ClusterPersistRepository) {
+            new ShardingSphereStatisticsRefreshEngine(contextManager, new 
GlobalLockContext(new GlobalLockPersistService((ClusterPersistRepository) 
repository))).refresh();
         }
-        return result;
     }
 }
diff --git 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
index ef12120d4e7..0c8448d4282 100644
--- 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
+++ 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
@@ -30,6 +30,7 @@ import 
org.apache.shardingsphere.metadata.persist.node.ShardingSphereDataNode;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 
+import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -69,11 +70,32 @@ public final class StatisticsCollectJobWorker {
     private static CoordinatorRegistryCenter createRegistryCenter(final 
ModeConfiguration modeConfig) {
         ClusterPersistRepositoryConfiguration repositoryConfig = 
(ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
         String namespace = String.join("/", repositoryConfig.getNamespace(), 
ShardingSphereDataNode.getJobPath());
-        CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(new 
ZookeeperConfiguration(repositoryConfig.getServerLists(), namespace));
+        CoordinatorRegistryCenter result = new 
ZookeeperRegistryCenter(getZookeeperConfiguration(repositoryConfig, namespace));
         result.init();
         return result;
     }
     
+    private static ZookeeperConfiguration getZookeeperConfiguration(final 
ClusterPersistRepositoryConfiguration repositoryConfig, final String namespace) 
{
+        // TODO Merge registry center code in ElasticJob and ShardingSphere 
mode; Use SPI to load impl
+        ZookeeperConfiguration result = new 
ZookeeperConfiguration(repositoryConfig.getServerLists(), namespace);
+        Properties props = repositoryConfig.getProps();
+        int retryIntervalMilliseconds = 
props.containsKey("retryIntervalMilliseconds") ? (int) 
props.get("retryIntervalMilliseconds") : 500;
+        int maxRetries = props.containsKey("maxRetries") ? (int) 
props.get("maxRetries") : 3;
+        result.setBaseSleepTimeMilliseconds(retryIntervalMilliseconds);
+        result.setMaxRetries(maxRetries);
+        result.setMaxSleepTimeMilliseconds(retryIntervalMilliseconds * 
maxRetries);
+        int timeToLiveSeconds = props.containsKey("timeToLiveSeconds") ? (int) 
props.get("timeToLiveSeconds") : 60;
+        if (0 != timeToLiveSeconds) {
+            result.setSessionTimeoutMilliseconds(timeToLiveSeconds * 1000);
+        }
+        int operationTimeoutMilliseconds = 
props.containsKey("operationTimeoutMilliseconds") ? (int) 
props.get("operationTimeoutMilliseconds") : 500;
+        if (0 != operationTimeoutMilliseconds) {
+            
result.setConnectionTimeoutMilliseconds(operationTimeoutMilliseconds);
+        }
+        result.setDigest(props.getProperty("digest"));
+        return result;
+    }
+    
     private static JobConfiguration createJobConfiguration() {
         return JobConfiguration.newBuilder(JOB_NAME, 
1).cron(CRON_EXPRESSION).overwrite(true).build();
     }
diff --git 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java
similarity index 73%
copy from 
kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
copy to 
mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java
index d272f55041c..0e853cfab52 100644
--- 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.schedule.core.job.statistics.collect;
+package org.apache.shardingsphere.mode.metadata.refresher;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 import 
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
+import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.infra.lock.GlobalLockNames;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
@@ -34,6 +34,8 @@ import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableDa
 import 
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import 
org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
+import org.apache.shardingsphere.mode.lock.GlobalLockContext;
+import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.persist.pojo.AlteredShardingSphereSchemaData;
 
@@ -41,55 +43,81 @@ import java.util.ArrayList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
- * Statistics collect job.
+ * ShardingSphere statistics refresh engine.
  */
 @RequiredArgsConstructor
 @Slf4j
-public final class StatisticsCollectJob implements SimpleJob {
+public final class ShardingSphereStatisticsRefreshEngine {
+    
+    private static final ExecutorService EXECUTOR_SERVICE = 
Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("statistics-collect-%d"));
     
     private final ContextManager contextManager;
     
-    @Override
-    public void execute(final ShardingContext shardingContext) {
+    private final GlobalLockContext globalLockContext;
+    
+    /**
+     * Async refresh.
+     */
+    public void asyncRefresh() {
+        EXECUTOR_SERVICE.execute(this::refresh);
+    }
+    
+    /**
+     * Refresh.
+     */
+    public void refresh() {
         try {
             if 
(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps().getValue(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED))
 {
+                collectAndRefresh();
+            }
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            log.error("Collect data error", ex);
+        }
+    }
+    
+    private void collectAndRefresh() {
+        GlobalLockDefinition lockDefinition = new 
GlobalLockDefinition(GlobalLockNames.STATISTICS.getLockName());
+        if (globalLockContext.tryLock(lockDefinition, 5000L)) {
+            try {
                 ShardingSphereStatistics statistics = 
contextManager.getMetaDataContexts().getStatistics();
                 ShardingSphereMetaData metaData = 
contextManager.getMetaDataContexts().getMetaData();
                 ShardingSphereStatistics changedStatistics = new 
ShardingSphereStatistics();
-                statistics.getDatabaseData().forEach((key, value) -> {
-                    if (metaData.containsDatabase(key)) {
-                        collectForDatabase(key, value, 
metaData.getDatabases(), changedStatistics);
+                for (Entry<String, ShardingSphereDatabaseData> entry : 
statistics.getDatabaseData().entrySet()) {
+                    if (metaData.containsDatabase(entry.getKey())) {
+                        collectForDatabase(entry.getKey(), entry.getValue(), 
metaData.getDatabases(), changedStatistics);
                     }
-                });
+                }
                 compareUpdateAndSendEvent(statistics, changedStatistics, 
metaData.getDatabases());
+            } finally {
+                globalLockContext.unlock(lockDefinition);
             }
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            log.error("Collect data error", ex);
         }
     }
     
     private void collectForDatabase(final String databaseName, final 
ShardingSphereDatabaseData databaseData,
                                     final Map<String, ShardingSphereDatabase> 
databases, final ShardingSphereStatistics statistics) {
-        databaseData.getSchemaData().forEach((key, value) -> {
-            if (databases.get(databaseName.toLowerCase()).containsSchema(key)) 
{
-                collectForSchema(databaseName, key, value, databases, 
statistics);
+        for (Entry<String, ShardingSphereSchemaData> entry : 
databaseData.getSchemaData().entrySet()) {
+            if 
(databases.get(databaseName.toLowerCase()).containsSchema(entry.getKey())) {
+                collectForSchema(databaseName, entry.getKey(), 
entry.getValue(), databases, statistics);
             }
-        });
+        }
     }
     
     private void collectForSchema(final String databaseName, final String 
schemaName, final ShardingSphereSchemaData schemaData,
                                   final Map<String, ShardingSphereDatabase> 
databases, final ShardingSphereStatistics statistics) {
-        schemaData.getTableData().forEach((key, value) -> {
-            if 
(databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(key))
 {
-                collectForTable(databaseName, schemaName, 
databases.get(databaseName).getSchema(schemaName).getTable(key), databases, 
statistics);
+        for (Entry<String, ShardingSphereTableData> entry : 
schemaData.getTableData().entrySet()) {
+            if 
(databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(entry.getKey()))
 {
+                collectForTable(databaseName, schemaName, 
databases.get(databaseName).getSchema(schemaName).getTable(entry.getKey()), 
databases, statistics);
             }
-        });
+        }
     }
     
     private void collectForTable(final String databaseName, final String 
schemaName, final ShardingSphereTable table,
@@ -106,26 +134,26 @@ public final class StatisticsCollectJob implements 
SimpleJob {
             // CHECKSTYLE:ON
             log.error(String.format("Collect %s.%s.%s data failed", 
databaseName, schemaName, table.getName()), ex);
         }
-        tableData.ifPresent(optional -> 
statistics.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key -> 
new ShardingSphereDatabaseData())
-                .getSchemaData().computeIfAbsent(schemaName, key -> new 
ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(), 
optional));
+        tableData.ifPresent(optional -> 
statistics.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key -> 
new ShardingSphereDatabaseData()).getSchemaData()
+                .computeIfAbsent(schemaName, key -> new 
ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(), 
optional));
     }
     
     private void compareUpdateAndSendEvent(final ShardingSphereStatistics 
statistics, final ShardingSphereStatistics changedStatistics,
                                            final Map<String, 
ShardingSphereDatabase> databases) {
-        changedStatistics.getDatabaseData().forEach((key, value) -> 
compareUpdateAndSendEventForDatabase(key, 
statistics.getDatabaseData().get(key), value, statistics,
-                databases.get(key.toLowerCase())));
+        changedStatistics.getDatabaseData().forEach(
+                (key, value) -> compareUpdateAndSendEventForDatabase(key, 
statistics.getDatabaseData().get(key), value, statistics, 
databases.get(key.toLowerCase())));
     }
     
     private void compareUpdateAndSendEventForDatabase(final String 
databaseName, final ShardingSphereDatabaseData databaseData, final 
ShardingSphereDatabaseData changedDatabaseData,
                                                       final 
ShardingSphereStatistics statistics, final ShardingSphereDatabase database) {
-        changedDatabaseData.getSchemaData().forEach((key, value) -> 
compareUpdateAndSendEventForSchema(databaseName, key, 
databaseData.getSchemaData().get(key), value, statistics,
-                database.getSchema(key)));
+        changedDatabaseData.getSchemaData().forEach(
+                (key, value) -> 
compareUpdateAndSendEventForSchema(databaseName, key, 
databaseData.getSchemaData().get(key), value, statistics, 
database.getSchema(key)));
     }
     
     private void compareUpdateAndSendEventForSchema(final String databaseName, 
final String schemaName, final ShardingSphereSchemaData schemaData,
                                                     final 
ShardingSphereSchemaData changedSchemaData, final ShardingSphereStatistics 
statistics, final ShardingSphereSchema schema) {
-        changedSchemaData.getTableData().forEach((key, value) -> 
compareUpdateAndSendEventForTable(databaseName, schemaName, 
schemaData.getTableData().get(key), value, statistics,
-                schema.getTable(key)));
+        changedSchemaData.getTableData().forEach(
+                (key, value) -> 
compareUpdateAndSendEventForTable(databaseName, schemaName, 
schemaData.getTableData().get(key), value, statistics, schema.getTable(key)));
     }
     
     private void compareUpdateAndSendEventForTable(final String databaseName, 
final String schemaName, final ShardingSphereTableData tableData,
diff --git 
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/StatisticsCollectorFixture.java
similarity index 96%
rename from 
kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java
rename to 
mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/StatisticsCollectorFixture.java
index d2c81f0b2d8..83257c6a5dc 100644
--- 
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/StatisticsCollectorFixture.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.schedule.core.job.statistics.collect;
+package org.apache.shardingsphere.mode.fixture;
 
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
diff --git 
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngineTest.java
similarity index 90%
rename from 
kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
rename to 
mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngineTest.java
index c80faebce8d..c0dd1af7bad 100644
--- 
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngineTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.schedule.core.job.statistics.collect;
+package org.apache.shardingsphere.mode.metadata.refresher;
 
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import 
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationProperties;
@@ -29,6 +29,8 @@ import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereDatabas
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
+import org.apache.shardingsphere.mode.lock.GlobalLockContext;
+import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.persist.pojo.AlteredShardingSphereSchemaData;
 import org.apache.shardingsphere.test.util.PropertiesBuilder;
@@ -42,15 +44,16 @@ import java.util.LinkedList;
 import java.util.Properties;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-class StatisticsCollectJobTest {
+class ShardingSphereStatisticsRefreshEngineTest {
     
     @Test
-    void assertCollect() {
+    void assertRefresh() {
         ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
         ShardingSphereStatistics statistics = mockStatistics();
         
when(contextManager.getMetaDataContexts().getStatistics()).thenReturn(statistics);
@@ -59,7 +62,9 @@ class StatisticsCollectJobTest {
         
when(contextManager.getMetaDataContexts().getMetaData().getProps()).thenReturn(new
 ConfigurationProperties(new Properties()));
         
when(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps()).thenReturn(new
 TemporaryConfigurationProperties(
                 PropertiesBuilder.build(new 
Property(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED.getKey(),
 Boolean.TRUE.toString()))));
-        new StatisticsCollectJob(contextManager).execute(null);
+        GlobalLockContext globalLockContext = mock(GlobalLockContext.class);
+        when(globalLockContext.tryLock(any(GlobalLockDefinition.class), 
anyLong())).thenReturn(true);
+        new ShardingSphereStatisticsRefreshEngine(contextManager, 
globalLockContext).refresh();
         
verify(contextManager.getPersistServiceFacade()).persist(any(AlteredShardingSphereSchemaData.class));
     }
     
diff --git 
a/kernel/schedule/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector
 
b/mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector
similarity index 89%
rename from 
kernel/schedule/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector
rename to 
mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector
index 190b550cc3a..1c364ce3e81 100644
--- 
a/kernel/schedule/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector
+++ 
b/mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.schedule.core.job.statistics.collect.StatisticsCollectorFixture
+org.apache.shardingsphere.mode.fixture.StatisticsCollectorFixture
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
index df7ef650b93..59d4be84fba 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
@@ -23,10 +23,14 @@ import 
org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
 import 
org.apache.shardingsphere.mode.event.dispatch.assisted.CreateDatabaseListenerAssistedEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.assisted.DropDatabaseListenerAssistedEvent;
+import org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.listener.DataChangedEventListenerManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.listener.MetaDataChangedListener;
+import 
org.apache.shardingsphere.mode.manager.cluster.lock.GlobalLockPersistService;
+import 
org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
 
 /**
  * Listener assisted subscriber.
@@ -54,6 +58,7 @@ public final class ListenerAssistedSubscriber implements 
EventSubscriber {
                 new 
MetaDataChangedListener(contextManager.getComputeNodeInstanceContext().getEventBusContext()));
         
contextManager.getMetaDataContextManager().getResourceMetaDataManager().addDatabase(event.getDatabaseName());
         
contextManager.getPersistServiceFacade().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName());
+        refreshShardingSphereStatisticsData();
     }
     
     /**
@@ -66,5 +71,13 @@ public final class ListenerAssistedSubscriber implements 
EventSubscriber {
         
listenerManager.removeListener(DatabaseMetaDataNode.getDatabaseNamePath(event.getDatabaseName()));
         
contextManager.getMetaDataContextManager().getResourceMetaDataManager().dropDatabase(event.getDatabaseName());
         
contextManager.getPersistServiceFacade().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName());
+        refreshShardingSphereStatisticsData();
+    }
+    
+    private void refreshShardingSphereStatisticsData() {
+        PersistRepository repository = 
contextManager.getPersistServiceFacade().getRepository();
+        if (repository instanceof ClusterPersistRepository) {
+            new ShardingSphereStatisticsRefreshEngine(contextManager, new 
GlobalLockContext(new GlobalLockPersistService((ClusterPersistRepository) 
repository))).asyncRefresh();
+        }
     }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
index 17c92504be8..4516026b532 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
@@ -26,9 +26,14 @@ import 
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.Creat
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.DropTableEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.CreateOrAlterViewEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.DropViewEvent;
+import org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.lock.GlobalLockPersistService;
+import 
org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
 
 import java.util.Map;
 
@@ -49,6 +54,7 @@ public final class ResourceMetaDataChangedSubscriber 
implements EventSubscriber
     @Subscribe
     public synchronized void renew(final SchemaAddedEvent event) {
         
contextManager.getMetaDataContextManager().getResourceMetaDataManager().addSchema(event.getDatabaseName(),
 event.getSchemaName());
+        refreshShardingSphereStatisticsData();
     }
     
     /**
@@ -59,6 +65,7 @@ public final class ResourceMetaDataChangedSubscriber 
implements EventSubscriber
     @Subscribe
     public synchronized void renew(final SchemaDeletedEvent event) {
         
contextManager.getMetaDataContextManager().getResourceMetaDataManager().dropSchema(event.getDatabaseName(),
 event.getSchemaName());
+        refreshShardingSphereStatisticsData();
     }
     
     /**
@@ -76,6 +83,7 @@ public final class ResourceMetaDataChangedSubscriber 
implements EventSubscriber
                 
.getTableMetaDataPersistService().load(event.getDatabaseName(), 
event.getSchemaName(), event.getTableName());
         
contextManager.getMetaDataContextManager().getResourceMetaDataManager().alterSchema(event.getDatabaseName(),
 event.getSchemaName(),
                 tables.values().iterator().next(), null);
+        refreshShardingSphereStatisticsData();
     }
     
     /**
@@ -86,6 +94,7 @@ public final class ResourceMetaDataChangedSubscriber 
implements EventSubscriber
     @Subscribe
     public synchronized void renew(final DropTableEvent event) {
         
contextManager.getMetaDataContextManager().getResourceMetaDataManager().alterSchema(event.getDatabaseName(),
 event.getSchemaName(), event.getTableName(), null);
+        refreshShardingSphereStatisticsData();
     }
     
     /**
@@ -103,6 +112,7 @@ public final class ResourceMetaDataChangedSubscriber 
implements EventSubscriber
                 .getViewMetaDataPersistService().load(event.getDatabaseName(), 
event.getSchemaName(), event.getViewName());
         
contextManager.getMetaDataContextManager().getResourceMetaDataManager().alterSchema(event.getDatabaseName(),
 event.getSchemaName(),
                 null, views.values().iterator().next());
+        refreshShardingSphereStatisticsData();
     }
     
     /**
@@ -113,5 +123,13 @@ public final class ResourceMetaDataChangedSubscriber 
implements EventSubscriber
     @Subscribe
     public synchronized void renew(final DropViewEvent event) {
         
contextManager.getMetaDataContextManager().getResourceMetaDataManager().alterSchema(event.getDatabaseName(),
 event.getSchemaName(), null, event.getViewName());
+        refreshShardingSphereStatisticsData();
+    }
+    
+    private void refreshShardingSphereStatisticsData() {
+        PersistRepository repository = 
contextManager.getPersistServiceFacade().getRepository();
+        if (repository instanceof ClusterPersistRepository) {
+            new ShardingSphereStatisticsRefreshEngine(contextManager, new 
GlobalLockContext(new GlobalLockPersistService((ClusterPersistRepository) 
repository))).asyncRefresh();
+        }
     }
 }


Reply via email to