This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cdd25eebd69 Refactor meta data collect (#30727)
cdd25eebd69 is described below

commit cdd25eebd69fda66769efa3a1d72ff01d504b215
Author: jiangML <[email protected]>
AuthorDate: Tue Apr 2 14:59:18 2024 +0800

    Refactor meta data collect (#30727)
    
    * Refactor meta data collect
    
    * Refactor ShardingSphereSchemaDataRegistrySubscriber
    
    * Add comment for StatisticsCollectJob class
    
    * Fix checkstyle error
    
    * Update proxy-meta-data-collector-enabled variable
    
    * Refactor ShardingSphereStatisticsBuilder
    
    * Optimize PostgreSQLShardingSphereStatisticsBuilder
    
    * Optimize MySQLShardingSphereStatisticsBuilder
    
    * Fix test error
    
    * Fix e2e test error
---
 .../data/ShardingStatisticsTableCollector.java     |   4 +-
 .../TemporaryConfigurationPropertyKey.java         |   2 +-
 .../shardingsphere/infra/lock/GlobalLockNames.java |   2 -
 .../MySQLShardingSphereStatisticsBuilder.java      |  50 +++---
 .../PostgreSQLShardingSphereStatisticsBuilder.java |  62 ++++----
 .../ShardingSphereStatisticsCollector.java         |   4 +-
 .../collector/tables/PgClassTableCollector.java    |   3 +-
 .../tables/PgNamespaceTableCollector.java          |   4 +-
 ...tgreSQLShardingSphereStatisticsBuilderTest.java |   4 +-
 .../persist/node/ShardingSphereDataNode.java       |  15 +-
 .../persist/node/ShardingSphereDataNodeTest.java   |  42 +++---
 ...ticsCollectContextManagerLifecycleListener.java |   8 +-
 .../statistics/collect/StatisticsCollectJob.java   | 161 ++++++++++++++++++++
 .../collect/StatisticsCollectJobWorker.java        |  41 ++++-
 .../collect/StatisticsCollectScheduler.java        | 168 ---------------------
 ...ulerTest.java => StatisticsCollectJobTest.java} |  11 +-
 .../collect/StatisticsCollectorFixture.java        |   4 +-
 .../cluster/coordinator/RegistryCenter.java        |   2 +-
 ...ShardingSphereSchemaDataRegistrySubscriber.java |  21 +--
 .../e2e/cases/assertion/IntegrationTestCase.java   |   3 +
 .../test/e2e/engine/type/dql/GeneralDQLE2EIT.java  |   9 ++
 .../cases/dql/dql-integration-select-order-by.xml  |   2 +-
 22 files changed, 341 insertions(+), 281 deletions(-)

diff --git 
a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
 
b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
index 802f0129b20..73a62d5bb33 100644
--- 
a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
+++ 
b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
@@ -48,7 +49,8 @@ public final class ShardingStatisticsTableCollector 
implements ShardingSphereSta
     private static final String SHARDING_TABLE_STATISTICS = 
"sharding_table_statistics";
     
     @Override
-    public Optional<ShardingSphereTableData> collect(final String 
databaseName, final ShardingSphereTable table, final Map<String, 
ShardingSphereDatabase> databases) throws SQLException {
+    public Optional<ShardingSphereTableData> collect(final String 
databaseName, final ShardingSphereTable table, final Map<String, 
ShardingSphereDatabase> databases,
+                                                     final RuleMetaData 
globalRuleMetaData) throws SQLException {
         ShardingSphereTableData result = new 
ShardingSphereTableData(SHARDING_TABLE_STATISTICS);
         DatabaseType protocolType = 
databases.values().iterator().next().getProtocolType();
         DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(protocolType).getDialectDatabaseMetaData();
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/temporary/TemporaryConfigurationPropertyKey.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/temporary/TemporaryConfigurationPropertyKey.java
index b8e17608a33..fdd63ddaf33 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/temporary/TemporaryConfigurationPropertyKey.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/temporary/TemporaryConfigurationPropertyKey.java
@@ -35,7 +35,7 @@ public enum TemporaryConfigurationPropertyKey implements 
TypedPropertyKey {
     /**
      * Proxy meta data collector enabled.
      */
-    PROXY_META_DATA_COLLECTOR_ENABLED("proxy-meta-data-collector-enabled", 
String.valueOf(Boolean.FALSE), boolean.class, true),
+    PROXY_META_DATA_COLLECTOR_ENABLED("proxy-meta-data-collector-enabled", 
String.valueOf(Boolean.FALSE), boolean.class, false),
     
     /**
      * System schema metadata enabled.
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
index 356bdacbc31..927c98f6d84 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
@@ -24,8 +24,6 @@ import lombok.RequiredArgsConstructor;
 @Getter
 public enum GlobalLockNames {
     
-    STATISTICS("statistics_%s_%s_%s"),
-    
     CLUSTER_LOCK("cluster_lock"),
     
     PREPARE("prepare_%s"),
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/MySQLShardingSphereStatisticsBuilder.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/MySQLShardingSphereStatisticsBuilder.java
index 76265484569..51c9ebb256c 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/MySQLShardingSphereStatisticsBuilder.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/MySQLShardingSphereStatisticsBuilder.java
@@ -19,16 +19,15 @@ package 
org.apache.shardingsphere.infra.metadata.statistics.builder.dialect;
 
 import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereDatabaseData;
+import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
+import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.builder.ShardingSphereStatisticsBuilder;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
 
-import java.util.Optional;
 import java.util.Collections;
 import java.util.Map.Entry;
 
@@ -42,27 +41,42 @@ public final class MySQLShardingSphereStatisticsBuilder 
implements ShardingSpher
     
     private static final String CLUSTER_INFORMATION = "cluster_information";
     
+    private static final String SHARDING_TABLE_STATISTICS = 
"sharding_table_statistics";
+    
     @Override
     public ShardingSphereStatistics build(final ShardingSphereMetaData 
metaData) {
         ShardingSphereStatistics result = new ShardingSphereStatistics();
-        Optional<ShardingSphereSchema> shardingSphereSchema = 
Optional.ofNullable(metaData.getDatabase(SHARDING_SPHERE)).map(database -> 
database.getSchema(SHARDING_SPHERE));
-        if (!shardingSphereSchema.isPresent()) {
-            return result;
-        }
-        ShardingSphereSchemaData schemaData = new ShardingSphereSchemaData();
-        for (Entry<String, ShardingSphereTable> entry : 
shardingSphereSchema.get().getTables().entrySet()) {
-            ShardingSphereTableData tableData = new 
ShardingSphereTableData(entry.getValue().getName());
-            if (CLUSTER_INFORMATION.equals(entry.getKey())) {
-                tableData.getRows().add(new 
ShardingSphereRowData(Collections.singletonList(ShardingSphereVersion.VERSION)));
+        for (Entry<String, ShardingSphereDatabase> entry : 
metaData.getDatabases().entrySet()) {
+            ShardingSphereDatabaseData databaseData = new 
ShardingSphereDatabaseData();
+            initSchemas(entry.getValue(), databaseData);
+            if (!databaseData.getSchemaData().isEmpty()) {
+                result.putDatabase(entry.getKey(), databaseData);
             }
-            schemaData.getTableData().put(entry.getKey(), tableData);
         }
-        ShardingSphereDatabaseData databaseData = new 
ShardingSphereDatabaseData();
-        databaseData.getSchemaData().put(SHARDING_SPHERE, schemaData);
-        result.getDatabaseData().put(SHARDING_SPHERE, databaseData);
         return result;
     }
     
+    private void initSchemas(final ShardingSphereDatabase database, final 
ShardingSphereDatabaseData databaseData) {
+        for (Entry<String, ShardingSphereSchema> entry : 
database.getSchemas().entrySet()) {
+            if (SHARDING_SPHERE.equals(entry.getKey())) {
+                ShardingSphereSchemaData schemaData = new 
ShardingSphereSchemaData();
+                initClusterInformationTable(schemaData);
+                initShardingTableStatisticsTable(schemaData);
+                databaseData.putSchema(SHARDING_SPHERE, schemaData);
+            }
+        }
+    }
+    
+    private void initClusterInformationTable(final ShardingSphereSchemaData 
schemaData) {
+        ShardingSphereTableData tableData = new 
ShardingSphereTableData(CLUSTER_INFORMATION);
+        tableData.getRows().add(new 
ShardingSphereRowData(Collections.singletonList(ShardingSphereVersion.VERSION)));
+        schemaData.putTable(CLUSTER_INFORMATION, tableData);
+    }
+    
+    private void initShardingTableStatisticsTable(final 
ShardingSphereSchemaData schemaData) {
+        schemaData.putTable(SHARDING_TABLE_STATISTICS, new 
ShardingSphereTableData(SHARDING_TABLE_STATISTICS));
+    }
+    
     @Override
     public String getDatabaseType() {
         return "MySQL";
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/PostgreSQLShardingSphereStatisticsBuilder.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/PostgreSQLShardingSphereStatisticsBuilder.java
index a4a38d3c0fd..94085419010 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/PostgreSQLShardingSphereStatisticsBuilder.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/PostgreSQLShardingSphereStatisticsBuilder.java
@@ -18,8 +18,6 @@
 package org.apache.shardingsphere.infra.metadata.statistics.builder.dialect;
 
 import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
-import 
org.apache.shardingsphere.infra.database.core.metadata.database.system.SystemDatabase;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
@@ -30,7 +28,6 @@ import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaD
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.builder.ShardingSphereStatisticsBuilder;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -45,51 +42,62 @@ import java.util.Map.Entry;
 
 public final class PostgreSQLShardingSphereStatisticsBuilder implements 
ShardingSphereStatisticsBuilder {
     
-    private static final Map<String, Collection<String>> 
COLLECTED_SCHEMA_TABLES = new LinkedHashMap<>();
+    private static final String SHARDING_SPHERE = "shardingsphere";
     
-    private static final Map<String, Collection<String>> 
INIT_DATA_SCHEMA_TABLES = new LinkedHashMap<>();
+    private static final String CLUSTER_INFORMATION = "cluster_information";
+    
+    private static final String SHARDING_TABLE_STATISTICS = 
"sharding_table_statistics";
     
-    private final SystemDatabase systemDatabase = new 
SystemDatabase(TypedSPILoader.getService(DatabaseType.class, "PostgreSQL"));
+    private static final Map<String, Collection<String>> 
INIT_DATA_SCHEMA_TABLES = new LinkedHashMap<>();
     
     static {
-        COLLECTED_SCHEMA_TABLES.put("shardingsphere", 
Collections.singletonList("sharding_table_statistics"));
-        COLLECTED_SCHEMA_TABLES.put("pg_catalog", Arrays.asList("pg_class", 
"pg_namespace"));
-        INIT_DATA_SCHEMA_TABLES.put("shardingsphere", 
Collections.singletonList("cluster_information"));
+        INIT_DATA_SCHEMA_TABLES.put("pg_catalog", Arrays.asList("pg_class", 
"pg_namespace"));
     }
     
     @Override
     public ShardingSphereStatistics build(final ShardingSphereMetaData 
metaData) {
         ShardingSphereStatistics result = new ShardingSphereStatistics();
         for (Entry<String, ShardingSphereDatabase> entry : 
metaData.getDatabases().entrySet()) {
-            if 
(systemDatabase.getSystemDatabaseSchemaMap().containsKey(entry.getKey())) {
-                continue;
-            }
             ShardingSphereDatabaseData databaseData = new 
ShardingSphereDatabaseData();
-            appendSchemaData(entry.getValue(), databaseData);
-            result.getDatabaseData().put(entry.getKey(), databaseData);
+            initSchemas(entry.getValue(), databaseData);
+            if (!databaseData.getSchemaData().isEmpty()) {
+                result.putDatabase(entry.getKey(), databaseData);
+            }
         }
         return result;
     }
     
-    private void appendSchemaData(final ShardingSphereDatabase database, final 
ShardingSphereDatabaseData databaseData) {
+    private void initSchemas(final ShardingSphereDatabase database, final 
ShardingSphereDatabaseData databaseData) {
         for (Entry<String, ShardingSphereSchema> entry : 
database.getSchemas().entrySet()) {
-            if (COLLECTED_SCHEMA_TABLES.containsKey(entry.getKey()) || 
INIT_DATA_SCHEMA_TABLES.containsKey(entry.getKey())) {
+            if (SHARDING_SPHERE.equals(entry.getKey())) {
+                ShardingSphereSchemaData schemaData = new 
ShardingSphereSchemaData();
+                initClusterInformationTable(schemaData);
+                initShardingTableStatisticsTable(schemaData);
+                databaseData.putSchema(SHARDING_SPHERE, schemaData);
+            }
+            if (INIT_DATA_SCHEMA_TABLES.containsKey(entry.getKey())) {
                 ShardingSphereSchemaData schemaData = new 
ShardingSphereSchemaData();
-                appendTableData(entry, schemaData);
-                databaseData.getSchemaData().put(entry.getKey(), schemaData);
+                initTables(entry.getValue(), 
INIT_DATA_SCHEMA_TABLES.get(entry.getKey()), schemaData);
+                databaseData.putSchema(entry.getKey(), schemaData);
             }
         }
     }
     
-    private void appendTableData(final Entry<String, ShardingSphereSchema> 
schemaEntry, final ShardingSphereSchemaData schemaData) {
-        for (Entry<String, ShardingSphereTable> entry : 
schemaEntry.getValue().getTables().entrySet()) {
-            ShardingSphereTableData tableData = new 
ShardingSphereTableData(entry.getValue().getName());
-            if (null != COLLECTED_SCHEMA_TABLES.get(schemaEntry.getKey()) && 
COLLECTED_SCHEMA_TABLES.get(schemaEntry.getKey()).contains(entry.getKey())) {
-                schemaData.getTableData().put(entry.getKey(), tableData);
-            }
-            if (null != INIT_DATA_SCHEMA_TABLES.get(schemaEntry.getKey()) && 
INIT_DATA_SCHEMA_TABLES.get(schemaEntry.getKey()).contains(entry.getKey())) {
-                tableData.getRows().add(new 
ShardingSphereRowData(Collections.singletonList(ShardingSphereVersion.VERSION)));
-                schemaData.getTableData().put(entry.getKey(), tableData);
+    private void initClusterInformationTable(final ShardingSphereSchemaData 
schemaData) {
+        ShardingSphereTableData tableData = new 
ShardingSphereTableData(CLUSTER_INFORMATION);
+        tableData.getRows().add(new 
ShardingSphereRowData(Collections.singletonList(ShardingSphereVersion.VERSION)));
+        schemaData.putTable(CLUSTER_INFORMATION, tableData);
+    }
+    
+    private void initShardingTableStatisticsTable(final 
ShardingSphereSchemaData schemaData) {
+        schemaData.putTable(SHARDING_TABLE_STATISTICS, new 
ShardingSphereTableData(SHARDING_TABLE_STATISTICS));
+    }
+    
+    private void initTables(final ShardingSphereSchema schema, final 
Collection<String> tables, final ShardingSphereSchemaData schemaData) {
+        for (Entry<String, ShardingSphereTable> entry : 
schema.getTables().entrySet()) {
+            if (tables.contains(entry.getValue().getName())) {
+                ShardingSphereTableData tableData = new 
ShardingSphereTableData(entry.getValue().getName());
+                schemaData.putTable(entry.getKey(), tableData);
             }
         }
     }
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/ShardingSphereStatisticsCollector.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/ShardingSphereStatisticsCollector.java
index 2e4a1f9dda9..c60fdd5ef62 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/ShardingSphereStatisticsCollector.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/ShardingSphereStatisticsCollector.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.infra.metadata.statistics.collector;
 
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
@@ -39,8 +40,9 @@ public interface ShardingSphereStatisticsCollector extends 
TypedSPI {
      * @param databaseName database name
      * @param table table
      * @param databases databases
+     * @param globalRuleMetaData global rule meta data
      * @return ShardingSphere table data
      * @throws SQLException SQL exception
      */
-    Optional<ShardingSphereTableData> collect(String databaseName, 
ShardingSphereTable table, Map<String, ShardingSphereDatabase> databases) 
throws SQLException;
+    Optional<ShardingSphereTableData> collect(String databaseName, 
ShardingSphereTable table, Map<String, ShardingSphereDatabase> databases, 
RuleMetaData globalRuleMetaData) throws SQLException;
 }
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgClassTableCollector.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgClassTableCollector.java
index 2fad7707705..3f1ff70e775 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgClassTableCollector.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgClassTableCollector.java
@@ -49,7 +49,8 @@ public final class PgClassTableCollector implements 
ShardingSphereStatisticsColl
             + "AND relname NOT LIKE 'matviewmap\\_%' AND relname NOT LIKE 
'mlog\\_%' AND pg_catalog.pg_table_is_visible(oid);";
     
     @Override
-    public Optional<ShardingSphereTableData> collect(final String 
databaseName, final ShardingSphereTable table, final Map<String, 
ShardingSphereDatabase> databases) throws SQLException {
+    public Optional<ShardingSphereTableData> collect(final String 
databaseName, final ShardingSphereTable table, final Map<String, 
ShardingSphereDatabase> databases,
+                                                     final RuleMetaData 
globalRuleMetaData) throws SQLException {
         Collection<ShardingSphereRowData> rows = 
ShardingSphereTableDataCollectorUtils.collectRowData(databases.get(databaseName),
                 table, 
Arrays.stream(COLUMN_NAMES.split(",")).map(String::trim).collect(Collectors.toList()),
 SELECT_SQL);
         Collection<ShardingSphereRowData> rowData = decorateTableName(rows, 
table, databases.get(databaseName).getRuleMetaData());
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgNamespaceTableCollector.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgNamespaceTableCollector.java
index ae28d64811a..9c59bc8f8d4 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgNamespaceTableCollector.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgNamespaceTableCollector.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.infra.metadata.statistics.collector.tables;
 
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
@@ -43,7 +44,8 @@ public final class PgNamespaceTableCollector implements 
ShardingSphereStatistics
     private static final String SELECT_SQL = "SELECT " + COLUMN_NAMES + " FROM 
pg_catalog.pg_namespace";
     
     @Override
-    public Optional<ShardingSphereTableData> collect(final String 
databaseName, final ShardingSphereTable table, final Map<String, 
ShardingSphereDatabase> databases) throws SQLException {
+    public Optional<ShardingSphereTableData> collect(final String 
databaseName, final ShardingSphereTable table, final Map<String, 
ShardingSphereDatabase> databases,
+                                                     final RuleMetaData 
globalRuleMetaData) throws SQLException {
         Collection<ShardingSphereRowData> rows = 
ShardingSphereTableDataCollectorUtils.collectRowData(databases.get(databaseName),
                 table, 
Arrays.stream(COLUMN_NAMES.split(",")).map(String::trim).collect(Collectors.toList()),
 SELECT_SQL);
         ShardingSphereTableData result = new 
ShardingSphereTableData(PG_NAMESPACE);
diff --git 
a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/statistics/builder/PostgreSQLShardingSphereStatisticsBuilderTest.java
 
b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/statistics/builder/PostgreSQLShardingSphereStatisticsBuilderTest.java
index e8376ecfba3..5b2432095f0 100644
--- 
a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/statistics/builder/PostgreSQLShardingSphereStatisticsBuilderTest.java
+++ 
b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/statistics/builder/PostgreSQLShardingSphereStatisticsBuilderTest.java
@@ -18,11 +18,11 @@
 package org.apache.shardingsphere.infra.metadata.statistics.builder;
 
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
-import 
org.apache.shardingsphere.infra.metadata.statistics.builder.dialect.PostgreSQLShardingSphereStatisticsBuilder;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
+import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
+import 
org.apache.shardingsphere.infra.metadata.statistics.builder.dialect.PostgreSQLShardingSphereStatisticsBuilder;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
diff --git 
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNode.java
 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNode.java
index 0414d3c472b..73d659533af 100644
--- 
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNode.java
+++ 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNode.java
@@ -32,17 +32,21 @@ public final class ShardingSphereDataNode {
     
     private static final String ROOT_NODE = "statistics";
     
+    private static final String DATABASES_NODE = "databases";
+    
     private static final String SCHEMAS_NODE = "schemas";
     
     private static final String TABLES_NODE = "tables";
     
+    private static final String JOB_NODE = "job";
+    
     /**
      * Get ShardingSphere data node path.
      *
      * @return meta data node path
      */
     public static String getShardingSphereDataNodePath() {
-        return String.join("/", "", ROOT_NODE);
+        return String.join("/", "", ROOT_NODE, DATABASES_NODE);
     }
     
     /**
@@ -195,4 +199,13 @@ public final class ShardingSphereDataNode {
         Matcher matcher = pattern.matcher(rowPath);
         return matcher.find() ? Optional.of(matcher.group(4)) : 
Optional.empty();
     }
+    
+    /**
+     * Get job path.
+     *
+     * @return job path
+     */
+    public static String getJobPath() {
+        return String.join("/", ROOT_NODE, JOB_NODE);
+    }
 }
diff --git 
a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNodeTest.java
 
b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNodeTest.java
index d371b077cd1..50cc068f06d 100644
--- 
a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNodeTest.java
+++ 
b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNodeTest.java
@@ -28,106 +28,106 @@ class ShardingSphereDataNodeTest {
     
     @Test
     void assertGetShardingSphereDataNodePath() {
-        assertThat(ShardingSphereDataNode.getShardingSphereDataNodePath(), 
is("/statistics"));
+        assertThat(ShardingSphereDataNode.getShardingSphereDataNodePath(), 
is("/statistics/databases"));
     }
     
     @Test
     void assertGetDatabaseNamePath() {
-        assertThat(ShardingSphereDataNode.getDatabaseNamePath("db_path"), 
is("/statistics/db_path"));
+        assertThat(ShardingSphereDataNode.getDatabaseNamePath("db_path"), 
is("/statistics/databases/db_path"));
     }
     
     @Test
     void assertGetTablesPath() {
-        assertThat(ShardingSphereDataNode.getTablesPath("db_name", 
"db_schema"), is("/statistics/db_name/schemas/db_schema/tables"));
+        assertThat(ShardingSphereDataNode.getTablesPath("db_name", 
"db_schema"), is("/statistics/databases/db_name/schemas/db_schema/tables"));
     }
     
     @Test
     void assertGetSchemaDataPath() {
-        assertThat(ShardingSphereDataNode.getSchemaDataPath("db_name", 
"db_schema"), is("/statistics/db_name/schemas/db_schema"));
+        assertThat(ShardingSphereDataNode.getSchemaDataPath("db_name", 
"db_schema"), is("/statistics/databases/db_name/schemas/db_schema"));
     }
     
     @Test
     void assertGetSchemasPath() {
-        assertThat(ShardingSphereDataNode.getSchemasPath("db_name"), 
is("/statistics/db_name/schemas"));
+        assertThat(ShardingSphereDataNode.getSchemasPath("db_name"), 
is("/statistics/databases/db_name/schemas"));
     }
     
     @Test
     void assertGetTablePath() {
-        assertThat(ShardingSphereDataNode.getTablePath("db_name", "db_schema", 
"tbl_name"), is("/statistics/db_name/schemas/db_schema/tables/tbl_name"));
+        assertThat(ShardingSphereDataNode.getTablePath("db_name", "db_schema", 
"tbl_name"), 
is("/statistics/databases/db_name/schemas/db_schema/tables/tbl_name"));
     }
     
     @Test
     void assertGetTableRowPath() {
-        assertThat(ShardingSphereDataNode.getTableRowPath("db_name", 
"db_schema", "tbl_name", "key"), 
is("/statistics/db_name/schemas/db_schema/tables/tbl_name/key"));
+        assertThat(ShardingSphereDataNode.getTableRowPath("db_name", 
"db_schema", "tbl_name", "key"), 
is("/statistics/databases/db_name/schemas/db_schema/tables/tbl_name/key"));
     }
     
     @Test
     void assertGetDatabaseNameHappyPath() {
-        
assertThat(ShardingSphereDataNode.getDatabaseName("/statistics/db_name"), 
is(Optional.of("db_name")));
+        
assertThat(ShardingSphereDataNode.getDatabaseName("/statistics/databases/db_name"),
 is(Optional.of("db_name")));
     }
     
     @Test
     void assertGetDatabaseNameDbNameNotFoundScenario() {
-        assertThat(ShardingSphereDataNode.getDatabaseName("/statistics"), 
is(Optional.empty()));
+        
assertThat(ShardingSphereDataNode.getDatabaseName("/statistics/databases"), 
is(Optional.empty()));
     }
     
     @Test
     void assertGetSchemaNameHappyPath() {
-        
assertThat(ShardingSphereDataNode.getSchemaName("/statistics/db_name/schemas/db_schema"),
 is(Optional.of("db_schema")));
+        
assertThat(ShardingSphereDataNode.getSchemaName("/statistics/databases/db_name/schemas/db_schema"),
 is(Optional.of("db_schema")));
     }
     
     @Test
     void assertGetSchemaNameSchemaNameNotFoundScenario() {
-        
assertThat(ShardingSphereDataNode.getSchemaName("/statistics/db_name"), 
is(Optional.empty()));
+        
assertThat(ShardingSphereDataNode.getSchemaName("/statistics/databases/db_name"),
 is(Optional.empty()));
     }
     
     @Test
     void assertGetDatabaseNameByDatabasePathHappyPath() {
-        
assertThat(ShardingSphereDataNode.getDatabaseNameByDatabasePath("/statistics/db_name"),
 is(Optional.of("db_name")));
+        
assertThat(ShardingSphereDataNode.getDatabaseNameByDatabasePath("/statistics/databases/db_name"),
 is(Optional.of("db_name")));
     }
     
     @Test
     void assertGetDatabaseNameByDatabasePathDbNameNotFoundScenario() {
-        
assertThat(ShardingSphereDataNode.getDatabaseNameByDatabasePath("/statistics"), 
is(Optional.empty()));
+        
assertThat(ShardingSphereDataNode.getDatabaseNameByDatabasePath("/statistics/databases"),
 is(Optional.empty()));
     }
     
     @Test
     void assertGetSchemaNameBySchemaPathHappyPath() {
-        
assertThat(ShardingSphereDataNode.getSchemaNameBySchemaPath("/statistics/db_name/schemas/db_schema"),
 is(Optional.of("db_schema")));
+        
assertThat(ShardingSphereDataNode.getSchemaNameBySchemaPath("/statistics/databases/db_name/schemas/db_schema"),
 is(Optional.of("db_schema")));
     }
     
     @Test
     void assertGetSchemaNameBySchemaPathSchemaNameNotFoundScenario() {
-        
assertThat(ShardingSphereDataNode.getSchemaNameBySchemaPath("/statistics/db_name"),
 is(Optional.empty()));
+        
assertThat(ShardingSphereDataNode.getSchemaNameBySchemaPath("/statistics//databasesdb_name"),
 is(Optional.empty()));
     }
     
     @Test
     void assertGetTableNameHappyPath() {
-        
assertThat(ShardingSphereDataNode.getTableName("/statistics/db_name/schemas/db_schema/tables/tbl_name"),
 is(Optional.of("tbl_name")));
+        
assertThat(ShardingSphereDataNode.getTableName("/statistics/databases/db_name/schemas/db_schema/tables/tbl_name"),
 is(Optional.of("tbl_name")));
     }
     
     @Test
     void assertGetTableNameTableNameNotFoundScenario() {
-        
assertThat(ShardingSphereDataNode.getTableName("/statistics/db_name/schemas/db_schema"),
 is(Optional.empty()));
+        
assertThat(ShardingSphereDataNode.getTableName("/statistics/databases/db_name/schemas/db_schema"),
 is(Optional.empty()));
     }
     
     @Test
     void assertGetTableNameByRowPathHappyPath() {
-        
assertThat(ShardingSphereDataNode.getTableNameByRowPath("/statistics/db_name/schemas/db_schema/tables/tbl_name"),
 is(Optional.of("tbl_name")));
+        
assertThat(ShardingSphereDataNode.getTableNameByRowPath("/statistics/databases/db_name/schemas/db_schema/tables/tbl_name"),
 is(Optional.of("tbl_name")));
     }
     
     @Test
     void assertGetTableNameByRowPathTableNameNotFoundScenario() {
-        
assertThat(ShardingSphereDataNode.getTableNameByRowPath("/statistics/db_name/schemas/db_schema"),
 is(Optional.empty()));
+        
assertThat(ShardingSphereDataNode.getTableNameByRowPath("/statistics/databases/db_name/schemas/db_schema"),
 is(Optional.empty()));
     }
     
     @Test
     void assertGetRowUniqueKeyHappyPath() {
-        
assertThat(ShardingSphereDataNode.getRowUniqueKey("/statistics/db_name/schemas/db_schema/tables/tbl_name/key"),
 is(Optional.of("key")));
+        
assertThat(ShardingSphereDataNode.getRowUniqueKey("/statistics/databases/db_name/schemas/db_schema/tables/tbl_name/key"),
 is(Optional.of("key")));
     }
     
     @Test
     void assertGetRowUniqueKeyUniqueKeyNotFoundScenario() {
-        
assertThat(ShardingSphereDataNode.getRowUniqueKey("/statistics/db_name/schemas/db_schema/tables/tbl_name"),
 is(Optional.empty()));
+        
assertThat(ShardingSphereDataNode.getRowUniqueKey("/statistics/databases/db_name/schemas/db_schema/tables/tbl_name"),
 is(Optional.empty()));
     }
 }
diff --git 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
index c0e2aa28628..470518a5f3f 100644
--- 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
+++ 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
@@ -28,13 +28,9 @@ public final class 
StatisticsCollectContextManagerLifecycleListener implements C
     
     @Override
     public void onInitialized(final String databaseName, final ContextManager 
contextManager) {
-        if (!contextManager.getInstanceContext().isCluster()) {
-            return;
+        if (contextManager.getInstanceContext().isCluster() && 
InstanceType.PROXY == 
contextManager.getInstanceContext().getInstance().getMetaData().getType()) {
+            StatisticsCollectJobWorker.initialize(contextManager);
         }
-        if (InstanceType.PROXY != 
contextManager.getInstanceContext().getInstance().getMetaData().getType()) {
-            return;
-        }
-        StatisticsCollectJobWorker.initialize(contextManager);
     }
     
     @Override
diff --git 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
new file mode 100644
index 00000000000..f2345cfcf62
--- /dev/null
+++ 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.schedule.core.job.statistics.collect;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import 
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
+import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereDatabaseData;
+import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
+import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
+import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
+import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import 
org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Statistics collect job.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class StatisticsCollectJob implements SimpleJob {
+    
+    private final ContextManager contextManager;
+    
+    @Override
+    public void execute(final ShardingContext shardingContext) {
+        try {
+            if 
(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps().getValue(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED))
 {
+                ShardingSphereStatistics statistics = 
contextManager.getMetaDataContexts().getStatistics();
+                ShardingSphereMetaData metaData = 
contextManager.getMetaDataContexts().getMetaData();
+                ShardingSphereStatistics changedStatistics = new 
ShardingSphereStatistics();
+                statistics.getDatabaseData().forEach((key, value) -> {
+                    if (metaData.containsDatabase(key)) {
+                        collectForDatabase(key, value, 
metaData.getDatabases(), changedStatistics);
+                    }
+                });
+                compareUpdateAndSendEvent(statistics, changedStatistics, 
metaData.getDatabases());
+            }
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            log.error("Collect data error", ex);
+        }
+    }
+    
+    private void collectForDatabase(final String databaseName, final 
ShardingSphereDatabaseData databaseData,
+                                    final Map<String, ShardingSphereDatabase> 
databases, final ShardingSphereStatistics statistics) {
+        databaseData.getSchemaData().forEach((key, value) -> {
+            if (databases.get(databaseName.toLowerCase()).containsSchema(key)) 
{
+                collectForSchema(databaseName, key, value, databases, 
statistics);
+            }
+        });
+    }
+    
+    private void collectForSchema(final String databaseName, final String 
schemaName, final ShardingSphereSchemaData schemaData,
+                                  final Map<String, ShardingSphereDatabase> 
databases, final ShardingSphereStatistics statistics) {
+        schemaData.getTableData().forEach((key, value) -> {
+            if 
(databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(key))
 {
+                collectForTable(databaseName, schemaName, 
databases.get(databaseName).getSchema(schemaName).getTable(key), databases, 
statistics);
+            }
+        });
+    }
+    
+    private void collectForTable(final String databaseName, final String 
schemaName, final ShardingSphereTable table,
+                                 final Map<String, ShardingSphereDatabase> 
databases, final ShardingSphereStatistics statistics) {
+        Optional<ShardingSphereStatisticsCollector> dataCollector = 
TypedSPILoader.findService(ShardingSphereStatisticsCollector.class, 
table.getName());
+        if (!dataCollector.isPresent()) {
+            return;
+        }
+        Optional<ShardingSphereTableData> tableData = Optional.empty();
+        try {
+            tableData = dataCollector.get().collect(databaseName, table, 
databases, 
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData());
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            log.error(String.format("Collect %s.%s.%s data failed", 
databaseName, schemaName, table.getName()), ex);
+        }
+        tableData.ifPresent(optional -> 
statistics.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key -> 
new ShardingSphereDatabaseData())
+                .getSchemaData().computeIfAbsent(schemaName, key -> new 
ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(), 
optional));
+    }
+    
+    private void compareUpdateAndSendEvent(final ShardingSphereStatistics 
statistics, final ShardingSphereStatistics changedStatistics,
+                                           final Map<String, 
ShardingSphereDatabase> databases) {
+        changedStatistics.getDatabaseData().forEach((key, value) -> 
compareUpdateAndSendEventForDatabase(key, 
statistics.getDatabaseData().get(key), value, statistics,
+                databases.get(key.toLowerCase())));
+    }
+    
+    private void compareUpdateAndSendEventForDatabase(final String 
databaseName, final ShardingSphereDatabaseData databaseData, final 
ShardingSphereDatabaseData changedDatabaseData,
+                                                      final 
ShardingSphereStatistics statistics, final ShardingSphereDatabase database) {
+        changedDatabaseData.getSchemaData().forEach((key, value) -> 
compareUpdateAndSendEventForSchema(databaseName, key, 
databaseData.getSchemaData().get(key), value, statistics,
+                database.getSchema(key)));
+    }
+    
+    private void compareUpdateAndSendEventForSchema(final String databaseName, 
final String schemaName, final ShardingSphereSchemaData schemaData,
+                                                    final 
ShardingSphereSchemaData changedSchemaData, final ShardingSphereStatistics 
statistics, final ShardingSphereSchema schema) {
+        changedSchemaData.getTableData().forEach((key, value) -> 
compareUpdateAndSendEventForTable(databaseName, schemaName, 
schemaData.getTableData().get(key), value, statistics,
+                schema.getTable(key)));
+    }
+    
+    private void compareUpdateAndSendEventForTable(final String databaseName, 
final String schemaName, final ShardingSphereTableData tableData,
+                                                   final 
ShardingSphereTableData changedTableData, final ShardingSphereStatistics 
statistics, final ShardingSphereTable table) {
+        if (tableData.equals(changedTableData)) {
+            return;
+        }
+        
statistics.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(),
 changedTableData);
+        ShardingSphereSchemaDataAlteredEvent event = 
getShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, tableData, 
changedTableData, table);
+        contextManager.getInstanceContext().getEventBusContext().post(event);
+    }
+    
+    private ShardingSphereSchemaDataAlteredEvent 
getShardingSphereSchemaDataAlteredEvent(final String databaseName, final String 
schemaName, final ShardingSphereTableData tableData,
+                                                                               
          final ShardingSphereTableData changedTableData, final 
ShardingSphereTable table) {
+        ShardingSphereSchemaDataAlteredEvent result = new 
ShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, 
tableData.getName());
+        Map<String, ShardingSphereRowData> tableDataMap = 
tableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
 Function.identity()));
+        Map<String, ShardingSphereRowData> changedTableDataMap = 
changedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
 Function.identity()));
+        YamlShardingSphereRowDataSwapper swapper = new 
YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumnValues()));
+        for (Entry<String, ShardingSphereRowData> entry : 
changedTableDataMap.entrySet()) {
+            if (!tableDataMap.containsKey(entry.getKey())) {
+                
result.getAddedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+            } else if 
(!tableDataMap.get(entry.getKey()).equals(entry.getValue())) {
+                
result.getUpdatedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+            }
+        }
+        for (Entry<String, ShardingSphereRowData> entry : 
tableDataMap.entrySet()) {
+            if (!changedTableDataMap.containsKey(entry.getKey())) {
+                
result.getDeletedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+            }
+        }
+        return result;
+    }
+}
diff --git 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
index 4c4e7ca8cc0..093b616b343 100644
--- 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
+++ 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
@@ -19,8 +19,15 @@ package 
org.apache.shardingsphere.schedule.core.job.statistics.collect;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import 
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.metadata.persist.node.ShardingSphereDataNode;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -30,6 +37,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class StatisticsCollectJobWorker {
     
+    private static final String JOB_NAME = "statistics-collect";
+    
+    private static final String CRON_EXPRESSION = "*/30 * * * * ?";
+    
     private static final AtomicBoolean WORKER_INITIALIZED = new 
AtomicBoolean(false);
     
     /**
@@ -39,15 +50,29 @@ public final class StatisticsCollectJobWorker {
      */
     public static void initialize(final ContextManager contextManager) {
         if (WORKER_INITIALIZED.compareAndSet(false, true)) {
-            boolean collectorEnabled = 
contextManager.getMetaDataContexts().getMetaData().getTemporaryProps().getValue(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED);
-            if (collectorEnabled) {
-                // TODO use ejob to schedule statistics collect
-                startScheduleThread(contextManager);
-            }
+            start(contextManager);
         }
     }
     
-    private static void startScheduleThread(final ContextManager 
contextManager) {
-        new StatisticsCollectScheduler(contextManager).start();
+    private static void start(final ContextManager contextManager) {
+        ModeConfiguration modeConfig = 
contextManager.getInstanceContext().getModeConfiguration();
+        if ("ZooKeeper".equals(modeConfig.getRepository().getType())) {
+            ScheduleJobBootstrap bootstrap = new 
ScheduleJobBootstrap(createRegistryCenter(modeConfig), new 
StatisticsCollectJob(contextManager), createJobConfiguration());
+            bootstrap.schedule();
+            return;
+        }
+        throw new IllegalArgumentException("Unsupported cluster type: " + 
modeConfig.getRepository().getType());
+    }
+    
+    private static CoordinatorRegistryCenter createRegistryCenter(final 
ModeConfiguration modeConfig) {
+        ClusterPersistRepositoryConfiguration repositoryConfig = 
(ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
+        String namespace = String.join("/", repositoryConfig.getNamespace(), 
ShardingSphereDataNode.getJobPath());
+        CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(new 
ZookeeperConfiguration(repositoryConfig.getServerLists(), namespace));
+        result.init();
+        return result;
+    }
+    
+    private static JobConfiguration createJobConfiguration() {
+        return JobConfiguration.newBuilder(JOB_NAME, 
1).cron(CRON_EXPRESSION).overwrite(true).build();
     }
 }
diff --git 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectScheduler.java
 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectScheduler.java
deleted file mode 100644
index 68509ea4365..00000000000
--- 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectScheduler.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.schedule.core.job.statistics.collect;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereDatabaseData;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
-import 
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import 
org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
-
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/**
- * Statistics collect scheduler.
- */
-@RequiredArgsConstructor
-@Slf4j
-public final class StatisticsCollectScheduler {
-    
-    private final ScheduledExecutorService statisticsCollector = 
Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("statistics-collect-%d"));
-    
-    private final ContextManager contextManager;
-    
-    /**
-     * Start.
-     */
-    public void start() {
-        statisticsCollector.scheduleWithFixedDelay(new 
StatisticsCollectRunnable(contextManager), 0, 30, TimeUnit.SECONDS);
-    }
-    
-    @RequiredArgsConstructor
-    protected static final class StatisticsCollectRunnable implements Runnable 
{
-        
-        private final ContextManager contextManager;
-        
-        @Override
-        public void run() {
-            ShardingSphereStatistics statistics = 
contextManager.getMetaDataContexts().getStatistics();
-            ShardingSphereMetaData metaData = 
contextManager.getMetaDataContexts().getMetaData();
-            ShardingSphereStatistics changedStatistics = new 
ShardingSphereStatistics();
-            statistics.getDatabaseData().forEach((key, value) -> {
-                if (metaData.containsDatabase(key)) {
-                    collectForDatabase(key, value, metaData.getDatabases(), 
changedStatistics);
-                }
-            });
-            compareUpdateAndSendEvent(statistics, changedStatistics, 
metaData.getDatabases());
-        }
-        
-        private void collectForDatabase(final String databaseName, final 
ShardingSphereDatabaseData databaseData,
-                                        final Map<String, 
ShardingSphereDatabase> databases, final ShardingSphereStatistics statistics) {
-            databaseData.getSchemaData().forEach((key, value) -> {
-                if 
(databases.get(databaseName.toLowerCase()).containsSchema(key)) {
-                    collectForSchema(databaseName, key, value, databases, 
statistics);
-                }
-            });
-        }
-        
-        private void collectForSchema(final String databaseName, final String 
schemaName, final ShardingSphereSchemaData schemaData,
-                                      final Map<String, 
ShardingSphereDatabase> databases, final ShardingSphereStatistics statistics) {
-            schemaData.getTableData().forEach((key, value) -> {
-                if 
(databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(key))
 {
-                    collectForTable(databaseName, schemaName, 
databases.get(databaseName).getSchema(schemaName).getTable(key), databases, 
statistics);
-                }
-            });
-        }
-        
-        private void collectForTable(final String databaseName, final String 
schemaName, final ShardingSphereTable table,
-                                     final Map<String, ShardingSphereDatabase> 
databases, final ShardingSphereStatistics statistics) {
-            Optional<ShardingSphereStatisticsCollector> dataCollector = 
TypedSPILoader.findService(ShardingSphereStatisticsCollector.class, 
table.getName());
-            if (!dataCollector.isPresent()) {
-                return;
-            }
-            Optional<ShardingSphereTableData> tableData = Optional.empty();
-            try {
-                tableData = dataCollector.get().collect(databaseName, table, 
databases);
-            } catch (final SQLException ex) {
-                log.error("Collect data failed!", ex);
-            }
-            tableData.ifPresent(optional -> 
statistics.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key -> 
new ShardingSphereDatabaseData())
-                    .getSchemaData().computeIfAbsent(schemaName, key -> new 
ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(), 
optional));
-        }
-        
-        private void compareUpdateAndSendEvent(final ShardingSphereStatistics 
statistics, final ShardingSphereStatistics changedStatistics,
-                                               final Map<String, 
ShardingSphereDatabase> databases) {
-            changedStatistics.getDatabaseData().forEach((key, value) -> 
compareUpdateAndSendEventForDatabase(key, 
statistics.getDatabaseData().get(key), value, statistics,
-                    databases.get(key.toLowerCase())));
-        }
-        
-        private void compareUpdateAndSendEventForDatabase(final String 
databaseName, final ShardingSphereDatabaseData databaseData, final 
ShardingSphereDatabaseData changedDatabaseData,
-                                                          final 
ShardingSphereStatistics statistics, final ShardingSphereDatabase database) {
-            changedDatabaseData.getSchemaData().forEach((key, value) -> 
compareUpdateAndSendEventForSchema(databaseName, key, 
databaseData.getSchemaData().get(key), value, statistics,
-                    database.getSchema(key)));
-        }
-        
-        private void compareUpdateAndSendEventForSchema(final String 
databaseName, final String schemaName, final ShardingSphereSchemaData 
schemaData,
-                                                        final 
ShardingSphereSchemaData changedSchemaData, final ShardingSphereStatistics 
statistics, final ShardingSphereSchema schema) {
-            changedSchemaData.getTableData().forEach((key, value) -> 
compareUpdateAndSendEventForTable(databaseName, schemaName, 
schemaData.getTableData().get(key), value, statistics,
-                    schema.getTable(key)));
-        }
-        
-        private void compareUpdateAndSendEventForTable(final String 
databaseName, final String schemaName, final ShardingSphereTableData tableData,
-                                                       final 
ShardingSphereTableData changedTableData, final ShardingSphereStatistics 
statistics, final ShardingSphereTable table) {
-            if (tableData.equals(changedTableData)) {
-                return;
-            }
-            
statistics.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(),
 changedTableData);
-            ShardingSphereSchemaDataAlteredEvent event = 
getShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, tableData, 
changedTableData, table);
-            
contextManager.getInstanceContext().getEventBusContext().post(event);
-        }
-        
-        private ShardingSphereSchemaDataAlteredEvent 
getShardingSphereSchemaDataAlteredEvent(final String databaseName, final String 
schemaName, final ShardingSphereTableData tableData,
-                                                                               
              final ShardingSphereTableData changedTableData, final 
ShardingSphereTable table) {
-            ShardingSphereSchemaDataAlteredEvent result = new 
ShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, 
tableData.getName());
-            Map<String, ShardingSphereRowData> tableDataMap = 
tableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
 Function.identity()));
-            Map<String, ShardingSphereRowData> changedTableDataMap = 
changedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
 Function.identity()));
-            YamlShardingSphereRowDataSwapper swapper = new 
YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumnValues()));
-            for (Entry<String, ShardingSphereRowData> entry : 
changedTableDataMap.entrySet()) {
-                if (!tableDataMap.containsKey(entry.getKey())) {
-                    
result.getAddedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
-                } else if 
(!tableDataMap.get(entry.getKey()).equals(entry.getValue())) {
-                    
result.getUpdatedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
-                }
-            }
-            for (Entry<String, ShardingSphereRowData> entry : 
tableDataMap.entrySet()) {
-                if (!changedTableDataMap.containsKey(entry.getKey())) {
-                    
result.getDeletedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
-                }
-            }
-            return result;
-        }
-    }
-}
diff --git 
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectSchedulerTest.java
 
b/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
similarity index 87%
rename from 
kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectSchedulerTest.java
rename to 
kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
index ff235fc24de..a0fbd8bd74f 100644
--- 
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectSchedulerTest.java
+++ 
b/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
@@ -18,6 +18,8 @@
 package org.apache.shardingsphere.schedule.core.job.statistics.collect;
 
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import 
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationProperties;
+import 
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
@@ -28,7 +30,8 @@ import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaD
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import 
org.apache.shardingsphere.schedule.core.job.statistics.collect.StatisticsCollectScheduler.StatisticsCollectRunnable;
+import org.apache.shardingsphere.test.util.PropertiesBuilder;
+import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
 import org.junit.jupiter.api.Test;
 
 import java.sql.Types;
@@ -42,7 +45,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-class StatisticsCollectSchedulerTest {
+class StatisticsCollectJobTest {
     
     @Test
     void assertCollect() {
@@ -52,7 +55,9 @@ class StatisticsCollectSchedulerTest {
         ShardingSphereMetaData metaData = mockMetaData();
         
when(contextManager.getMetaDataContexts().getMetaData()).thenReturn(metaData);
         
when(contextManager.getMetaDataContexts().getMetaData().getProps()).thenReturn(new
 ConfigurationProperties(new Properties()));
-        new StatisticsCollectRunnable(contextManager).run();
+        
when(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps()).thenReturn(new
 TemporaryConfigurationProperties(
+                PropertiesBuilder.build(new 
Property(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED.getKey(),
 Boolean.TRUE.toString()))));
+        new StatisticsCollectJob(contextManager).execute(null);
         verify(contextManager).getInstanceContext();
     }
     
diff --git 
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java
 
b/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java
index c9292d19ccc..d2c81f0b2d8 100644
--- 
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java
+++ 
b/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.schedule.core.job.statistics.collect;
 
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
@@ -34,7 +35,8 @@ import java.util.Optional;
 public final class StatisticsCollectorFixture implements 
ShardingSphereStatisticsCollector {
     
     @Override
-    public Optional<ShardingSphereTableData> collect(final String 
databaseName, final ShardingSphereTable table, final Map<String, 
ShardingSphereDatabase> databases) throws SQLException {
+    public Optional<ShardingSphereTableData> collect(final String 
databaseName, final ShardingSphereTable table, final Map<String, 
ShardingSphereDatabase> databases,
+                                                     final RuleMetaData 
globalRuleMetaData) throws SQLException {
         ShardingSphereTableData shardingSphereTableData = new 
ShardingSphereTableData("test_table");
         shardingSphereTableData.getRows().add(new 
ShardingSphereRowData(Arrays.asList("1", "2")));
         return Optional.of(shardingSphereTableData);
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 08f29095024..c77239724a2 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -100,7 +100,7 @@ public final class RegistryCenter {
         new ClusterStatusSubscriber(repository, eventBusContext);
         new StorageNodeStatusSubscriber(repository, eventBusContext);
         new ClusterProcessSubscriber(repository, eventBusContext);
-        new ShardingSphereSchemaDataRegistrySubscriber(repository, 
globalLockPersistService, eventBusContext);
+        new ShardingSphereSchemaDataRegistrySubscriber(repository, 
eventBusContext);
     }
     
     /**
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
index 102ae795959..7efc3f24008 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
@@ -18,11 +18,8 @@
 package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber;
 
 import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.lock.GlobalLockNames;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import 
org.apache.shardingsphere.metadata.persist.data.ShardingSphereDataPersistService;
-import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
@@ -34,11 +31,8 @@ public final class 
ShardingSphereSchemaDataRegistrySubscriber {
     
     private final ShardingSphereDataPersistService persistService;
     
-    private final GlobalLockPersistService lockPersistService;
-    
-    public ShardingSphereSchemaDataRegistrySubscriber(final 
ClusterPersistRepository repository, final GlobalLockPersistService 
globalLockPersistService, final EventBusContext eventBusContext) {
+    public ShardingSphereSchemaDataRegistrySubscriber(final 
ClusterPersistRepository repository, final EventBusContext eventBusContext) {
         persistService = new ShardingSphereDataPersistService(repository);
-        lockPersistService = globalLockPersistService;
         eventBusContext.register(this);
     }
     
@@ -51,15 +45,8 @@ public final class 
ShardingSphereSchemaDataRegistrySubscriber {
     public void update(final ShardingSphereSchemaDataAlteredEvent event) {
         String databaseName = event.getDatabaseName();
         String schemaName = event.getSchemaName();
-        GlobalLockDefinition lockDefinition = new 
GlobalLockDefinition(String.format(GlobalLockNames.STATISTICS.getLockName(), 
event.getDatabaseName(), event.getSchemaName(), event.getTableName()));
-        if (lockPersistService.tryLock(lockDefinition, 10_000)) {
-            try {
-                
persistService.getTableRowDataPersistService().persist(databaseName, 
schemaName, event.getTableName(), event.getAddedRows());
-                
persistService.getTableRowDataPersistService().persist(databaseName, 
schemaName, event.getTableName(), event.getUpdatedRows());
-                
persistService.getTableRowDataPersistService().delete(databaseName, schemaName, 
event.getTableName(), event.getDeletedRows());
-            } finally {
-                lockPersistService.unlock(lockDefinition);
-            }
-        }
+        persistService.getTableRowDataPersistService().persist(databaseName, 
schemaName, event.getTableName(), event.getAddedRows());
+        persistService.getTableRowDataPersistService().persist(databaseName, 
schemaName, event.getTableName(), event.getUpdatedRows());
+        persistService.getTableRowDataPersistService().delete(databaseName, 
schemaName, event.getTableName(), event.getDeletedRows());
     }
 }
diff --git 
a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/cases/assertion/IntegrationTestCase.java
 
b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/cases/assertion/IntegrationTestCase.java
index 2c2aedda48b..62e0365d2da 100644
--- 
a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/cases/assertion/IntegrationTestCase.java
+++ 
b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/cases/assertion/IntegrationTestCase.java
@@ -50,6 +50,9 @@ public final class IntegrationTestCase {
     @XmlAttribute(name = "adapters")
     private String adapters;
     
+    @XmlAttribute(name = "delay-assertion-seconds")
+    private Integer delayAssertionSeconds;
+    
     @XmlElement(name = "assertion")
     private Collection<IntegrationTestCaseAssertion> assertions = new 
LinkedList<>();
 }
diff --git 
a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/engine/type/dql/GeneralDQLE2EIT.java
 
b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/engine/type/dql/GeneralDQLE2EIT.java
index eace11a53b8..0f05d683f3a 100644
--- 
a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/engine/type/dql/GeneralDQLE2EIT.java
+++ 
b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/engine/type/dql/GeneralDQLE2EIT.java
@@ -28,6 +28,7 @@ import 
org.apache.shardingsphere.test.e2e.framework.param.model.AssertionTestPar
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 import javax.xml.bind.JAXBException;
 import java.io.IOException;
@@ -36,6 +37,8 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -52,6 +55,9 @@ class GeneralDQLE2EIT extends BaseDQLE2EIT {
         }
         SingleE2EContainerComposer containerComposer = new 
SingleE2EContainerComposer(testParam);
         init(testParam, containerComposer);
+        if (null != 
testParam.getTestCaseContext().getTestCase().getDelayAssertionSeconds()) {
+            
Awaitility.await().atMost(Duration.ofMinutes(5)).pollDelay(testParam.getTestCaseContext().getTestCase().getDelayAssertionSeconds(),
 TimeUnit.SECONDS).until(() -> true);
+        }
         if (isUseXMLAsExpectedDataset()) {
             assertExecuteQueryWithXmlExpected(testParam, containerComposer);
         } else {
@@ -123,6 +129,9 @@ class GeneralDQLE2EIT extends BaseDQLE2EIT {
         }
         SingleE2EContainerComposer containerComposer = new 
SingleE2EContainerComposer(testParam);
         init(testParam, containerComposer);
+        if (null != 
testParam.getTestCaseContext().getTestCase().getDelayAssertionSeconds()) {
+            
Awaitility.await().atMost(Duration.ofMinutes(5)).pollDelay(testParam.getTestCaseContext().getTestCase().getDelayAssertionSeconds(),
 TimeUnit.SECONDS).until(() -> true);
+        }
         if (isUseXMLAsExpectedDataset()) {
             assertExecuteWithXmlExpected(testParam, containerComposer);
         } else {
diff --git 
a/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-order-by.xml 
b/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-order-by.xml
index 709fd068fd0..f192d8bfa19 100644
--- 
a/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-order-by.xml
+++ 
b/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-order-by.xml
@@ -207,7 +207,7 @@
         <assertion expected-data-source-name="expected_dataset" />
     </test-case>
     
-    <test-case sql="select * from shardingsphere.sharding_table_statistics 
order by id;" db-types="MySQL,PostgreSQL,openGauss" scenario-types="db">
+    <test-case sql="select * from shardingsphere.sharding_table_statistics 
order by id;" db-types="MySQL,PostgreSQL,openGauss" scenario-types="db" 
delay-assertion-seconds="40">
         <assertion expected-data-file="select_sharding_table_statistics.xml" />
     </test-case>
 </integration-test-cases>

Reply via email to