This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cdd25eebd69 Refactor meta data collect (#30727)
cdd25eebd69 is described below
commit cdd25eebd69fda66769efa3a1d72ff01d504b215
Author: jiangML <[email protected]>
AuthorDate: Tue Apr 2 14:59:18 2024 +0800
Refactor meta data collect (#30727)
* Refactor meta data collect
* Refactor ShardingSphereSchemaDataRegistrySubscriber
* Add comment for StatisticsCollectJob class
* Fix checkstyle error
* Update proxy-meta-data-collector-enabled variable
* Refactor ShardingSphereStatisticsBuilder
* Optimize PostgreSQLShardingSphereStatisticsBuilder
* Optimize MySQLShardingSphereStatisticsBuilder
* Fix test error
* Fix e2e test error
---
.../data/ShardingStatisticsTableCollector.java | 4 +-
.../TemporaryConfigurationPropertyKey.java | 2 +-
.../shardingsphere/infra/lock/GlobalLockNames.java | 2 -
.../MySQLShardingSphereStatisticsBuilder.java | 50 +++---
.../PostgreSQLShardingSphereStatisticsBuilder.java | 62 ++++----
.../ShardingSphereStatisticsCollector.java | 4 +-
.../collector/tables/PgClassTableCollector.java | 3 +-
.../tables/PgNamespaceTableCollector.java | 4 +-
...tgreSQLShardingSphereStatisticsBuilderTest.java | 4 +-
.../persist/node/ShardingSphereDataNode.java | 15 +-
.../persist/node/ShardingSphereDataNodeTest.java | 42 +++---
...ticsCollectContextManagerLifecycleListener.java | 8 +-
.../statistics/collect/StatisticsCollectJob.java | 161 ++++++++++++++++++++
.../collect/StatisticsCollectJobWorker.java | 41 ++++-
.../collect/StatisticsCollectScheduler.java | 168 ---------------------
...ulerTest.java => StatisticsCollectJobTest.java} | 11 +-
.../collect/StatisticsCollectorFixture.java | 4 +-
.../cluster/coordinator/RegistryCenter.java | 2 +-
...ShardingSphereSchemaDataRegistrySubscriber.java | 21 +--
.../e2e/cases/assertion/IntegrationTestCase.java | 3 +
.../test/e2e/engine/type/dql/GeneralDQLE2EIT.java | 9 ++
.../cases/dql/dql-integration-select-order-by.xml | 2 +-
22 files changed, 341 insertions(+), 281 deletions(-)
diff --git
a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
index 802f0129b20..73a62d5bb33 100644
---
a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
+++
b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
@@ -48,7 +49,8 @@ public final class ShardingStatisticsTableCollector
implements ShardingSphereSta
private static final String SHARDING_TABLE_STATISTICS =
"sharding_table_statistics";
@Override
- public Optional<ShardingSphereTableData> collect(final String
databaseName, final ShardingSphereTable table, final Map<String,
ShardingSphereDatabase> databases) throws SQLException {
+ public Optional<ShardingSphereTableData> collect(final String
databaseName, final ShardingSphereTable table, final Map<String,
ShardingSphereDatabase> databases,
+ final RuleMetaData
globalRuleMetaData) throws SQLException {
ShardingSphereTableData result = new
ShardingSphereTableData(SHARDING_TABLE_STATISTICS);
DatabaseType protocolType =
databases.values().iterator().next().getProtocolType();
DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(protocolType).getDialectDatabaseMetaData();
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/temporary/TemporaryConfigurationPropertyKey.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/temporary/TemporaryConfigurationPropertyKey.java
index b8e17608a33..fdd63ddaf33 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/temporary/TemporaryConfigurationPropertyKey.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/temporary/TemporaryConfigurationPropertyKey.java
@@ -35,7 +35,7 @@ public enum TemporaryConfigurationPropertyKey implements
TypedPropertyKey {
/**
* Proxy meta data collector enabled.
*/
- PROXY_META_DATA_COLLECTOR_ENABLED("proxy-meta-data-collector-enabled",
String.valueOf(Boolean.FALSE), boolean.class, true),
+ PROXY_META_DATA_COLLECTOR_ENABLED("proxy-meta-data-collector-enabled",
String.valueOf(Boolean.FALSE), boolean.class, false),
/**
* System schema metadata enabled.
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
index 356bdacbc31..927c98f6d84 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
@@ -24,8 +24,6 @@ import lombok.RequiredArgsConstructor;
@Getter
public enum GlobalLockNames {
- STATISTICS("statistics_%s_%s_%s"),
-
CLUSTER_LOCK("cluster_lock"),
PREPARE("prepare_%s"),
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/MySQLShardingSphereStatisticsBuilder.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/MySQLShardingSphereStatisticsBuilder.java
index 76265484569..51c9ebb256c 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/MySQLShardingSphereStatisticsBuilder.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/MySQLShardingSphereStatisticsBuilder.java
@@ -19,16 +19,15 @@ package
org.apache.shardingsphere.infra.metadata.statistics.builder.dialect;
import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereDatabaseData;
+import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
+import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
import
org.apache.shardingsphere.infra.metadata.statistics.builder.ShardingSphereStatisticsBuilder;
-import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
-import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import java.util.Optional;
import java.util.Collections;
import java.util.Map.Entry;
@@ -42,27 +41,42 @@ public final class MySQLShardingSphereStatisticsBuilder
implements ShardingSpher
private static final String CLUSTER_INFORMATION = "cluster_information";
+ private static final String SHARDING_TABLE_STATISTICS =
"sharding_table_statistics";
+
@Override
public ShardingSphereStatistics build(final ShardingSphereMetaData
metaData) {
ShardingSphereStatistics result = new ShardingSphereStatistics();
- Optional<ShardingSphereSchema> shardingSphereSchema =
Optional.ofNullable(metaData.getDatabase(SHARDING_SPHERE)).map(database ->
database.getSchema(SHARDING_SPHERE));
- if (!shardingSphereSchema.isPresent()) {
- return result;
- }
- ShardingSphereSchemaData schemaData = new ShardingSphereSchemaData();
- for (Entry<String, ShardingSphereTable> entry :
shardingSphereSchema.get().getTables().entrySet()) {
- ShardingSphereTableData tableData = new
ShardingSphereTableData(entry.getValue().getName());
- if (CLUSTER_INFORMATION.equals(entry.getKey())) {
- tableData.getRows().add(new
ShardingSphereRowData(Collections.singletonList(ShardingSphereVersion.VERSION)));
+ for (Entry<String, ShardingSphereDatabase> entry :
metaData.getDatabases().entrySet()) {
+ ShardingSphereDatabaseData databaseData = new
ShardingSphereDatabaseData();
+ initSchemas(entry.getValue(), databaseData);
+ if (!databaseData.getSchemaData().isEmpty()) {
+ result.putDatabase(entry.getKey(), databaseData);
}
- schemaData.getTableData().put(entry.getKey(), tableData);
}
- ShardingSphereDatabaseData databaseData = new
ShardingSphereDatabaseData();
- databaseData.getSchemaData().put(SHARDING_SPHERE, schemaData);
- result.getDatabaseData().put(SHARDING_SPHERE, databaseData);
return result;
}
+ private void initSchemas(final ShardingSphereDatabase database, final
ShardingSphereDatabaseData databaseData) {
+ for (Entry<String, ShardingSphereSchema> entry :
database.getSchemas().entrySet()) {
+ if (SHARDING_SPHERE.equals(entry.getKey())) {
+ ShardingSphereSchemaData schemaData = new
ShardingSphereSchemaData();
+ initClusterInformationTable(schemaData);
+ initShardingTableStatisticsTable(schemaData);
+ databaseData.putSchema(SHARDING_SPHERE, schemaData);
+ }
+ }
+ }
+
+ private void initClusterInformationTable(final ShardingSphereSchemaData
schemaData) {
+ ShardingSphereTableData tableData = new
ShardingSphereTableData(CLUSTER_INFORMATION);
+ tableData.getRows().add(new
ShardingSphereRowData(Collections.singletonList(ShardingSphereVersion.VERSION)));
+ schemaData.putTable(CLUSTER_INFORMATION, tableData);
+ }
+
+ private void initShardingTableStatisticsTable(final
ShardingSphereSchemaData schemaData) {
+ schemaData.putTable(SHARDING_TABLE_STATISTICS, new
ShardingSphereTableData(SHARDING_TABLE_STATISTICS));
+ }
+
@Override
public String getDatabaseType() {
return "MySQL";
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/PostgreSQLShardingSphereStatisticsBuilder.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/PostgreSQLShardingSphereStatisticsBuilder.java
index a4a38d3c0fd..94085419010 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/PostgreSQLShardingSphereStatisticsBuilder.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/builder/dialect/PostgreSQLShardingSphereStatisticsBuilder.java
@@ -18,8 +18,6 @@
package org.apache.shardingsphere.infra.metadata.statistics.builder.dialect;
import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
-import
org.apache.shardingsphere.infra.database.core.metadata.database.system.SystemDatabase;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
@@ -30,7 +28,6 @@ import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaD
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
import
org.apache.shardingsphere.infra.metadata.statistics.builder.ShardingSphereStatisticsBuilder;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import java.util.Arrays;
import java.util.Collection;
@@ -45,51 +42,62 @@ import java.util.Map.Entry;
public final class PostgreSQLShardingSphereStatisticsBuilder implements
ShardingSphereStatisticsBuilder {
- private static final Map<String, Collection<String>>
COLLECTED_SCHEMA_TABLES = new LinkedHashMap<>();
+ private static final String SHARDING_SPHERE = "shardingsphere";
- private static final Map<String, Collection<String>>
INIT_DATA_SCHEMA_TABLES = new LinkedHashMap<>();
+ private static final String CLUSTER_INFORMATION = "cluster_information";
+
+ private static final String SHARDING_TABLE_STATISTICS =
"sharding_table_statistics";
- private final SystemDatabase systemDatabase = new
SystemDatabase(TypedSPILoader.getService(DatabaseType.class, "PostgreSQL"));
+ private static final Map<String, Collection<String>>
INIT_DATA_SCHEMA_TABLES = new LinkedHashMap<>();
static {
- COLLECTED_SCHEMA_TABLES.put("shardingsphere",
Collections.singletonList("sharding_table_statistics"));
- COLLECTED_SCHEMA_TABLES.put("pg_catalog", Arrays.asList("pg_class",
"pg_namespace"));
- INIT_DATA_SCHEMA_TABLES.put("shardingsphere",
Collections.singletonList("cluster_information"));
+ INIT_DATA_SCHEMA_TABLES.put("pg_catalog", Arrays.asList("pg_class",
"pg_namespace"));
}
@Override
public ShardingSphereStatistics build(final ShardingSphereMetaData
metaData) {
ShardingSphereStatistics result = new ShardingSphereStatistics();
for (Entry<String, ShardingSphereDatabase> entry :
metaData.getDatabases().entrySet()) {
- if
(systemDatabase.getSystemDatabaseSchemaMap().containsKey(entry.getKey())) {
- continue;
- }
ShardingSphereDatabaseData databaseData = new
ShardingSphereDatabaseData();
- appendSchemaData(entry.getValue(), databaseData);
- result.getDatabaseData().put(entry.getKey(), databaseData);
+ initSchemas(entry.getValue(), databaseData);
+ if (!databaseData.getSchemaData().isEmpty()) {
+ result.putDatabase(entry.getKey(), databaseData);
+ }
}
return result;
}
- private void appendSchemaData(final ShardingSphereDatabase database, final
ShardingSphereDatabaseData databaseData) {
+ private void initSchemas(final ShardingSphereDatabase database, final
ShardingSphereDatabaseData databaseData) {
for (Entry<String, ShardingSphereSchema> entry :
database.getSchemas().entrySet()) {
- if (COLLECTED_SCHEMA_TABLES.containsKey(entry.getKey()) ||
INIT_DATA_SCHEMA_TABLES.containsKey(entry.getKey())) {
+ if (SHARDING_SPHERE.equals(entry.getKey())) {
+ ShardingSphereSchemaData schemaData = new
ShardingSphereSchemaData();
+ initClusterInformationTable(schemaData);
+ initShardingTableStatisticsTable(schemaData);
+ databaseData.putSchema(SHARDING_SPHERE, schemaData);
+ }
+ if (INIT_DATA_SCHEMA_TABLES.containsKey(entry.getKey())) {
ShardingSphereSchemaData schemaData = new
ShardingSphereSchemaData();
- appendTableData(entry, schemaData);
- databaseData.getSchemaData().put(entry.getKey(), schemaData);
+ initTables(entry.getValue(),
INIT_DATA_SCHEMA_TABLES.get(entry.getKey()), schemaData);
+ databaseData.putSchema(entry.getKey(), schemaData);
}
}
}
- private void appendTableData(final Entry<String, ShardingSphereSchema>
schemaEntry, final ShardingSphereSchemaData schemaData) {
- for (Entry<String, ShardingSphereTable> entry :
schemaEntry.getValue().getTables().entrySet()) {
- ShardingSphereTableData tableData = new
ShardingSphereTableData(entry.getValue().getName());
- if (null != COLLECTED_SCHEMA_TABLES.get(schemaEntry.getKey()) &&
COLLECTED_SCHEMA_TABLES.get(schemaEntry.getKey()).contains(entry.getKey())) {
- schemaData.getTableData().put(entry.getKey(), tableData);
- }
- if (null != INIT_DATA_SCHEMA_TABLES.get(schemaEntry.getKey()) &&
INIT_DATA_SCHEMA_TABLES.get(schemaEntry.getKey()).contains(entry.getKey())) {
- tableData.getRows().add(new
ShardingSphereRowData(Collections.singletonList(ShardingSphereVersion.VERSION)));
- schemaData.getTableData().put(entry.getKey(), tableData);
+ private void initClusterInformationTable(final ShardingSphereSchemaData
schemaData) {
+ ShardingSphereTableData tableData = new
ShardingSphereTableData(CLUSTER_INFORMATION);
+ tableData.getRows().add(new
ShardingSphereRowData(Collections.singletonList(ShardingSphereVersion.VERSION)));
+ schemaData.putTable(CLUSTER_INFORMATION, tableData);
+ }
+
+ private void initShardingTableStatisticsTable(final
ShardingSphereSchemaData schemaData) {
+ schemaData.putTable(SHARDING_TABLE_STATISTICS, new
ShardingSphereTableData(SHARDING_TABLE_STATISTICS));
+ }
+
+ private void initTables(final ShardingSphereSchema schema, final
Collection<String> tables, final ShardingSphereSchemaData schemaData) {
+ for (Entry<String, ShardingSphereTable> entry :
schema.getTables().entrySet()) {
+ if (tables.contains(entry.getValue().getName())) {
+ ShardingSphereTableData tableData = new
ShardingSphereTableData(entry.getValue().getName());
+ schemaData.putTable(entry.getKey(), tableData);
}
}
}
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/ShardingSphereStatisticsCollector.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/ShardingSphereStatisticsCollector.java
index 2e4a1f9dda9..c60fdd5ef62 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/ShardingSphereStatisticsCollector.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/ShardingSphereStatisticsCollector.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.infra.metadata.statistics.collector;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
@@ -39,8 +40,9 @@ public interface ShardingSphereStatisticsCollector extends
TypedSPI {
* @param databaseName database name
* @param table table
* @param databases databases
+ * @param globalRuleMetaData global rule meta data
* @return ShardingSphere table data
* @throws SQLException SQL exception
*/
- Optional<ShardingSphereTableData> collect(String databaseName,
ShardingSphereTable table, Map<String, ShardingSphereDatabase> databases)
throws SQLException;
+ Optional<ShardingSphereTableData> collect(String databaseName,
ShardingSphereTable table, Map<String, ShardingSphereDatabase> databases,
RuleMetaData globalRuleMetaData) throws SQLException;
}
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgClassTableCollector.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgClassTableCollector.java
index 2fad7707705..3f1ff70e775 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgClassTableCollector.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgClassTableCollector.java
@@ -49,7 +49,8 @@ public final class PgClassTableCollector implements
ShardingSphereStatisticsColl
+ "AND relname NOT LIKE 'matviewmap\\_%' AND relname NOT LIKE
'mlog\\_%' AND pg_catalog.pg_table_is_visible(oid);";
@Override
- public Optional<ShardingSphereTableData> collect(final String
databaseName, final ShardingSphereTable table, final Map<String,
ShardingSphereDatabase> databases) throws SQLException {
+ public Optional<ShardingSphereTableData> collect(final String
databaseName, final ShardingSphereTable table, final Map<String,
ShardingSphereDatabase> databases,
+ final RuleMetaData
globalRuleMetaData) throws SQLException {
Collection<ShardingSphereRowData> rows =
ShardingSphereTableDataCollectorUtils.collectRowData(databases.get(databaseName),
table,
Arrays.stream(COLUMN_NAMES.split(",")).map(String::trim).collect(Collectors.toList()),
SELECT_SQL);
Collection<ShardingSphereRowData> rowData = decorateTableName(rows,
table, databases.get(databaseName).getRuleMetaData());
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgNamespaceTableCollector.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgNamespaceTableCollector.java
index ae28d64811a..9c59bc8f8d4 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgNamespaceTableCollector.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/tables/PgNamespaceTableCollector.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.infra.metadata.statistics.collector.tables;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
import
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
@@ -43,7 +44,8 @@ public final class PgNamespaceTableCollector implements
ShardingSphereStatistics
private static final String SELECT_SQL = "SELECT " + COLUMN_NAMES + " FROM
pg_catalog.pg_namespace";
@Override
- public Optional<ShardingSphereTableData> collect(final String
databaseName, final ShardingSphereTable table, final Map<String,
ShardingSphereDatabase> databases) throws SQLException {
+ public Optional<ShardingSphereTableData> collect(final String
databaseName, final ShardingSphereTable table, final Map<String,
ShardingSphereDatabase> databases,
+ final RuleMetaData
globalRuleMetaData) throws SQLException {
Collection<ShardingSphereRowData> rows =
ShardingSphereTableDataCollectorUtils.collectRowData(databases.get(databaseName),
table,
Arrays.stream(COLUMN_NAMES.split(",")).map(String::trim).collect(Collectors.toList()),
SELECT_SQL);
ShardingSphereTableData result = new
ShardingSphereTableData(PG_NAMESPACE);
diff --git
a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/statistics/builder/PostgreSQLShardingSphereStatisticsBuilderTest.java
b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/statistics/builder/PostgreSQLShardingSphereStatisticsBuilderTest.java
index e8376ecfba3..5b2432095f0 100644
---
a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/statistics/builder/PostgreSQLShardingSphereStatisticsBuilderTest.java
+++
b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/statistics/builder/PostgreSQLShardingSphereStatisticsBuilderTest.java
@@ -18,11 +18,11 @@
package org.apache.shardingsphere.infra.metadata.statistics.builder;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
-import
org.apache.shardingsphere.infra.metadata.statistics.builder.dialect.PostgreSQLShardingSphereStatisticsBuilder;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
+import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
+import
org.apache.shardingsphere.infra.metadata.statistics.builder.dialect.PostgreSQLShardingSphereStatisticsBuilder;
import org.junit.jupiter.api.Test;
import java.util.Collections;
diff --git
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNode.java
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNode.java
index 0414d3c472b..73d659533af 100644
---
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNode.java
+++
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNode.java
@@ -32,17 +32,21 @@ public final class ShardingSphereDataNode {
private static final String ROOT_NODE = "statistics";
+ private static final String DATABASES_NODE = "databases";
+
private static final String SCHEMAS_NODE = "schemas";
private static final String TABLES_NODE = "tables";
+ private static final String JOB_NODE = "job";
+
/**
* Get ShardingSphere data node path.
*
* @return meta data node path
*/
public static String getShardingSphereDataNodePath() {
- return String.join("/", "", ROOT_NODE);
+ return String.join("/", "", ROOT_NODE, DATABASES_NODE);
}
/**
@@ -195,4 +199,13 @@ public final class ShardingSphereDataNode {
Matcher matcher = pattern.matcher(rowPath);
return matcher.find() ? Optional.of(matcher.group(4)) :
Optional.empty();
}
+
+ /**
+ * Get job path.
+ *
+ * @return job path
+ */
+ public static String getJobPath() {
+ return String.join("/", ROOT_NODE, JOB_NODE);
+ }
}
diff --git
a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNodeTest.java
b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNodeTest.java
index d371b077cd1..50cc068f06d 100644
---
a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNodeTest.java
+++
b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ShardingSphereDataNodeTest.java
@@ -28,106 +28,106 @@ class ShardingSphereDataNodeTest {
@Test
void assertGetShardingSphereDataNodePath() {
- assertThat(ShardingSphereDataNode.getShardingSphereDataNodePath(),
is("/statistics"));
+ assertThat(ShardingSphereDataNode.getShardingSphereDataNodePath(),
is("/statistics/databases"));
}
@Test
void assertGetDatabaseNamePath() {
- assertThat(ShardingSphereDataNode.getDatabaseNamePath("db_path"),
is("/statistics/db_path"));
+ assertThat(ShardingSphereDataNode.getDatabaseNamePath("db_path"),
is("/statistics/databases/db_path"));
}
@Test
void assertGetTablesPath() {
- assertThat(ShardingSphereDataNode.getTablesPath("db_name",
"db_schema"), is("/statistics/db_name/schemas/db_schema/tables"));
+ assertThat(ShardingSphereDataNode.getTablesPath("db_name",
"db_schema"), is("/statistics/databases/db_name/schemas/db_schema/tables"));
}
@Test
void assertGetSchemaDataPath() {
- assertThat(ShardingSphereDataNode.getSchemaDataPath("db_name",
"db_schema"), is("/statistics/db_name/schemas/db_schema"));
+ assertThat(ShardingSphereDataNode.getSchemaDataPath("db_name",
"db_schema"), is("/statistics/databases/db_name/schemas/db_schema"));
}
@Test
void assertGetSchemasPath() {
- assertThat(ShardingSphereDataNode.getSchemasPath("db_name"),
is("/statistics/db_name/schemas"));
+ assertThat(ShardingSphereDataNode.getSchemasPath("db_name"),
is("/statistics/databases/db_name/schemas"));
}
@Test
void assertGetTablePath() {
- assertThat(ShardingSphereDataNode.getTablePath("db_name", "db_schema",
"tbl_name"), is("/statistics/db_name/schemas/db_schema/tables/tbl_name"));
+ assertThat(ShardingSphereDataNode.getTablePath("db_name", "db_schema",
"tbl_name"),
is("/statistics/databases/db_name/schemas/db_schema/tables/tbl_name"));
}
@Test
void assertGetTableRowPath() {
- assertThat(ShardingSphereDataNode.getTableRowPath("db_name",
"db_schema", "tbl_name", "key"),
is("/statistics/db_name/schemas/db_schema/tables/tbl_name/key"));
+ assertThat(ShardingSphereDataNode.getTableRowPath("db_name",
"db_schema", "tbl_name", "key"),
is("/statistics/databases/db_name/schemas/db_schema/tables/tbl_name/key"));
}
@Test
void assertGetDatabaseNameHappyPath() {
-
assertThat(ShardingSphereDataNode.getDatabaseName("/statistics/db_name"),
is(Optional.of("db_name")));
+
assertThat(ShardingSphereDataNode.getDatabaseName("/statistics/databases/db_name"),
is(Optional.of("db_name")));
}
@Test
void assertGetDatabaseNameDbNameNotFoundScenario() {
- assertThat(ShardingSphereDataNode.getDatabaseName("/statistics"),
is(Optional.empty()));
+
assertThat(ShardingSphereDataNode.getDatabaseName("/statistics/databases"),
is(Optional.empty()));
}
@Test
void assertGetSchemaNameHappyPath() {
-
assertThat(ShardingSphereDataNode.getSchemaName("/statistics/db_name/schemas/db_schema"),
is(Optional.of("db_schema")));
+
assertThat(ShardingSphereDataNode.getSchemaName("/statistics/databases/db_name/schemas/db_schema"),
is(Optional.of("db_schema")));
}
@Test
void assertGetSchemaNameSchemaNameNotFoundScenario() {
-
assertThat(ShardingSphereDataNode.getSchemaName("/statistics/db_name"),
is(Optional.empty()));
+
assertThat(ShardingSphereDataNode.getSchemaName("/statistics/databases/db_name"),
is(Optional.empty()));
}
@Test
void assertGetDatabaseNameByDatabasePathHappyPath() {
-
assertThat(ShardingSphereDataNode.getDatabaseNameByDatabasePath("/statistics/db_name"),
is(Optional.of("db_name")));
+
assertThat(ShardingSphereDataNode.getDatabaseNameByDatabasePath("/statistics/databases/db_name"),
is(Optional.of("db_name")));
}
@Test
void assertGetDatabaseNameByDatabasePathDbNameNotFoundScenario() {
-
assertThat(ShardingSphereDataNode.getDatabaseNameByDatabasePath("/statistics"),
is(Optional.empty()));
+
assertThat(ShardingSphereDataNode.getDatabaseNameByDatabasePath("/statistics/databases"),
is(Optional.empty()));
}
@Test
void assertGetSchemaNameBySchemaPathHappyPath() {
-
assertThat(ShardingSphereDataNode.getSchemaNameBySchemaPath("/statistics/db_name/schemas/db_schema"),
is(Optional.of("db_schema")));
+
assertThat(ShardingSphereDataNode.getSchemaNameBySchemaPath("/statistics/databases/db_name/schemas/db_schema"),
is(Optional.of("db_schema")));
}
@Test
void assertGetSchemaNameBySchemaPathSchemaNameNotFoundScenario() {
-
assertThat(ShardingSphereDataNode.getSchemaNameBySchemaPath("/statistics/db_name"),
is(Optional.empty()));
+
assertThat(ShardingSphereDataNode.getSchemaNameBySchemaPath("/statistics/databases/db_name"),
is(Optional.empty()));
}
@Test
void assertGetTableNameHappyPath() {
-
assertThat(ShardingSphereDataNode.getTableName("/statistics/db_name/schemas/db_schema/tables/tbl_name"),
is(Optional.of("tbl_name")));
+
assertThat(ShardingSphereDataNode.getTableName("/statistics/databases/db_name/schemas/db_schema/tables/tbl_name"),
is(Optional.of("tbl_name")));
}
@Test
void assertGetTableNameTableNameNotFoundScenario() {
-
assertThat(ShardingSphereDataNode.getTableName("/statistics/db_name/schemas/db_schema"),
is(Optional.empty()));
+
assertThat(ShardingSphereDataNode.getTableName("/statistics/databases/db_name/schemas/db_schema"),
is(Optional.empty()));
}
@Test
void assertGetTableNameByRowPathHappyPath() {
-
assertThat(ShardingSphereDataNode.getTableNameByRowPath("/statistics/db_name/schemas/db_schema/tables/tbl_name"),
is(Optional.of("tbl_name")));
+
assertThat(ShardingSphereDataNode.getTableNameByRowPath("/statistics/databases/db_name/schemas/db_schema/tables/tbl_name"),
is(Optional.of("tbl_name")));
}
@Test
void assertGetTableNameByRowPathTableNameNotFoundScenario() {
-
assertThat(ShardingSphereDataNode.getTableNameByRowPath("/statistics/db_name/schemas/db_schema"),
is(Optional.empty()));
+
assertThat(ShardingSphereDataNode.getTableNameByRowPath("/statistics/databases/db_name/schemas/db_schema"),
is(Optional.empty()));
}
@Test
void assertGetRowUniqueKeyHappyPath() {
-
assertThat(ShardingSphereDataNode.getRowUniqueKey("/statistics/db_name/schemas/db_schema/tables/tbl_name/key"),
is(Optional.of("key")));
+
assertThat(ShardingSphereDataNode.getRowUniqueKey("/statistics/databases/db_name/schemas/db_schema/tables/tbl_name/key"),
is(Optional.of("key")));
}
@Test
void assertGetRowUniqueKeyUniqueKeyNotFoundScenario() {
-
assertThat(ShardingSphereDataNode.getRowUniqueKey("/statistics/db_name/schemas/db_schema/tables/tbl_name"),
is(Optional.empty()));
+
assertThat(ShardingSphereDataNode.getRowUniqueKey("/statistics/databases/db_name/schemas/db_schema/tables/tbl_name"),
is(Optional.empty()));
}
}
diff --git a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
index c0e2aa28628..470518a5f3f 100644
--- a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
+++ b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
@@ -28,13 +28,9 @@ public final class StatisticsCollectContextManagerLifecycleListener implements C
@Override
public void onInitialized(final String databaseName, final ContextManager
contextManager) {
- if (!contextManager.getInstanceContext().isCluster()) {
- return;
+ if (contextManager.getInstanceContext().isCluster() &&
InstanceType.PROXY ==
contextManager.getInstanceContext().getInstance().getMetaData().getType()) {
+ StatisticsCollectJobWorker.initialize(contextManager);
}
- if (InstanceType.PROXY !=
contextManager.getInstanceContext().getInstance().getMetaData().getType()) {
- return;
- }
- StatisticsCollectJobWorker.initialize(contextManager);
}
@Override
diff --git a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
new file mode 100644
index 00000000000..f2345cfcf62
--- /dev/null
+++ b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.schedule.core.job.statistics.collect;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
+import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
+import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereDatabaseData;
+import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
+import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
+import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
+import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
+import
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import
org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Statistics collect job.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class StatisticsCollectJob implements SimpleJob {
+
+ private final ContextManager contextManager;
+
+ @Override
+ public void execute(final ShardingContext shardingContext) {
+ try {
+ if
(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps().getValue(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED))
{
+ ShardingSphereStatistics statistics =
contextManager.getMetaDataContexts().getStatistics();
+ ShardingSphereMetaData metaData =
contextManager.getMetaDataContexts().getMetaData();
+ ShardingSphereStatistics changedStatistics = new
ShardingSphereStatistics();
+ statistics.getDatabaseData().forEach((key, value) -> {
+ if (metaData.containsDatabase(key)) {
+ collectForDatabase(key, value,
metaData.getDatabases(), changedStatistics);
+ }
+ });
+ compareUpdateAndSendEvent(statistics, changedStatistics,
metaData.getDatabases());
+ }
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error("Collect data error", ex);
+ }
+ }
+
+ private void collectForDatabase(final String databaseName, final
ShardingSphereDatabaseData databaseData,
+ final Map<String, ShardingSphereDatabase>
databases, final ShardingSphereStatistics statistics) {
+ databaseData.getSchemaData().forEach((key, value) -> {
+ if (databases.get(databaseName.toLowerCase()).containsSchema(key))
{
+ collectForSchema(databaseName, key, value, databases,
statistics);
+ }
+ });
+ }
+
+ private void collectForSchema(final String databaseName, final String
schemaName, final ShardingSphereSchemaData schemaData,
+ final Map<String, ShardingSphereDatabase>
databases, final ShardingSphereStatistics statistics) {
+ schemaData.getTableData().forEach((key, value) -> {
+ if
(databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(key))
{
+ collectForTable(databaseName, schemaName,
databases.get(databaseName).getSchema(schemaName).getTable(key), databases,
statistics);
+ }
+ });
+ }
+
+ private void collectForTable(final String databaseName, final String
schemaName, final ShardingSphereTable table,
+ final Map<String, ShardingSphereDatabase>
databases, final ShardingSphereStatistics statistics) {
+ Optional<ShardingSphereStatisticsCollector> dataCollector =
TypedSPILoader.findService(ShardingSphereStatisticsCollector.class,
table.getName());
+ if (!dataCollector.isPresent()) {
+ return;
+ }
+ Optional<ShardingSphereTableData> tableData = Optional.empty();
+ try {
+ tableData = dataCollector.get().collect(databaseName, table,
databases,
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData());
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error(String.format("Collect %s.%s.%s data failed",
databaseName, schemaName, table.getName()), ex);
+ }
+ tableData.ifPresent(optional ->
statistics.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key ->
new ShardingSphereDatabaseData())
+ .getSchemaData().computeIfAbsent(schemaName, key -> new
ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(),
optional));
+ }
+
+ private void compareUpdateAndSendEvent(final ShardingSphereStatistics
statistics, final ShardingSphereStatistics changedStatistics,
+ final Map<String,
ShardingSphereDatabase> databases) {
+ changedStatistics.getDatabaseData().forEach((key, value) ->
compareUpdateAndSendEventForDatabase(key,
statistics.getDatabaseData().get(key), value, statistics,
+ databases.get(key.toLowerCase())));
+ }
+
+ private void compareUpdateAndSendEventForDatabase(final String
databaseName, final ShardingSphereDatabaseData databaseData, final
ShardingSphereDatabaseData changedDatabaseData,
+ final
ShardingSphereStatistics statistics, final ShardingSphereDatabase database) {
+ changedDatabaseData.getSchemaData().forEach((key, value) ->
compareUpdateAndSendEventForSchema(databaseName, key,
databaseData.getSchemaData().get(key), value, statistics,
+ database.getSchema(key)));
+ }
+
+ private void compareUpdateAndSendEventForSchema(final String databaseName,
final String schemaName, final ShardingSphereSchemaData schemaData,
+ final
ShardingSphereSchemaData changedSchemaData, final ShardingSphereStatistics
statistics, final ShardingSphereSchema schema) {
+ changedSchemaData.getTableData().forEach((key, value) ->
compareUpdateAndSendEventForTable(databaseName, schemaName,
schemaData.getTableData().get(key), value, statistics,
+ schema.getTable(key)));
+ }
+
+ private void compareUpdateAndSendEventForTable(final String databaseName,
final String schemaName, final ShardingSphereTableData tableData,
+ final
ShardingSphereTableData changedTableData, final ShardingSphereStatistics
statistics, final ShardingSphereTable table) {
+ if (tableData.equals(changedTableData)) {
+ return;
+ }
+
statistics.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(),
changedTableData);
+ ShardingSphereSchemaDataAlteredEvent event =
getShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, tableData,
changedTableData, table);
+ contextManager.getInstanceContext().getEventBusContext().post(event);
+ }
+
+ private ShardingSphereSchemaDataAlteredEvent
getShardingSphereSchemaDataAlteredEvent(final String databaseName, final String
schemaName, final ShardingSphereTableData tableData,
+
final ShardingSphereTableData changedTableData, final
ShardingSphereTable table) {
+ ShardingSphereSchemaDataAlteredEvent result = new
ShardingSphereSchemaDataAlteredEvent(databaseName, schemaName,
tableData.getName());
+ Map<String, ShardingSphereRowData> tableDataMap =
tableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
Function.identity()));
+ Map<String, ShardingSphereRowData> changedTableDataMap =
changedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
Function.identity()));
+ YamlShardingSphereRowDataSwapper swapper = new
YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumnValues()));
+ for (Entry<String, ShardingSphereRowData> entry :
changedTableDataMap.entrySet()) {
+ if (!tableDataMap.containsKey(entry.getKey())) {
+
result.getAddedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+ } else if
(!tableDataMap.get(entry.getKey()).equals(entry.getValue())) {
+
result.getUpdatedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+ }
+ }
+ for (Entry<String, ShardingSphereRowData> entry :
tableDataMap.entrySet()) {
+ if (!changedTableDataMap.containsKey(entry.getKey())) {
+
result.getDeletedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+ }
+ }
+ return result;
+ }
+}
diff --git a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
index 4c4e7ca8cc0..093b616b343 100644
--- a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
+++ b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
@@ -19,8 +19,15 @@ package org.apache.shardingsphere.schedule.core.job.statistics.collect;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.metadata.persist.node.ShardingSphereDataNode;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,6 +37,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class StatisticsCollectJobWorker {
+ private static final String JOB_NAME = "statistics-collect";
+
+ private static final String CRON_EXPRESSION = "*/30 * * * * ?";
+
private static final AtomicBoolean WORKER_INITIALIZED = new
AtomicBoolean(false);
/**
@@ -39,15 +50,29 @@ public final class StatisticsCollectJobWorker {
*/
public static void initialize(final ContextManager contextManager) {
if (WORKER_INITIALIZED.compareAndSet(false, true)) {
- boolean collectorEnabled =
contextManager.getMetaDataContexts().getMetaData().getTemporaryProps().getValue(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED);
- if (collectorEnabled) {
- // TODO use ejob to schedule statistics collect
- startScheduleThread(contextManager);
- }
+ start(contextManager);
}
}
- private static void startScheduleThread(final ContextManager
contextManager) {
- new StatisticsCollectScheduler(contextManager).start();
+ private static void start(final ContextManager contextManager) {
+ ModeConfiguration modeConfig =
contextManager.getInstanceContext().getModeConfiguration();
+ if ("ZooKeeper".equals(modeConfig.getRepository().getType())) {
+ ScheduleJobBootstrap bootstrap = new
ScheduleJobBootstrap(createRegistryCenter(modeConfig), new
StatisticsCollectJob(contextManager), createJobConfiguration());
+ bootstrap.schedule();
+ return;
+ }
+ throw new IllegalArgumentException("Unsupported cluster type: " +
modeConfig.getRepository().getType());
+ }
+
+ private static CoordinatorRegistryCenter createRegistryCenter(final
ModeConfiguration modeConfig) {
+ ClusterPersistRepositoryConfiguration repositoryConfig =
(ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
+ String namespace = String.join("/", repositoryConfig.getNamespace(),
ShardingSphereDataNode.getJobPath());
+ CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(new
ZookeeperConfiguration(repositoryConfig.getServerLists(), namespace));
+ result.init();
+ return result;
+ }
+
+ private static JobConfiguration createJobConfiguration() {
+ return JobConfiguration.newBuilder(JOB_NAME,
1).cron(CRON_EXPRESSION).overwrite(true).build();
}
}
diff --git a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectScheduler.java b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectScheduler.java
deleted file mode 100644
index 68509ea4365..00000000000
--- a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectScheduler.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.schedule.core.job.statistics.collect;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
-import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereDatabaseData;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
-import
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import
org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
-
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/**
- * Statistics collect scheduler.
- */
-@RequiredArgsConstructor
-@Slf4j
-public final class StatisticsCollectScheduler {
-
- private final ScheduledExecutorService statisticsCollector =
Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("statistics-collect-%d"));
-
- private final ContextManager contextManager;
-
- /**
- * Start.
- */
- public void start() {
- statisticsCollector.scheduleWithFixedDelay(new
StatisticsCollectRunnable(contextManager), 0, 30, TimeUnit.SECONDS);
- }
-
- @RequiredArgsConstructor
- protected static final class StatisticsCollectRunnable implements Runnable
{
-
- private final ContextManager contextManager;
-
- @Override
- public void run() {
- ShardingSphereStatistics statistics =
contextManager.getMetaDataContexts().getStatistics();
- ShardingSphereMetaData metaData =
contextManager.getMetaDataContexts().getMetaData();
- ShardingSphereStatistics changedStatistics = new
ShardingSphereStatistics();
- statistics.getDatabaseData().forEach((key, value) -> {
- if (metaData.containsDatabase(key)) {
- collectForDatabase(key, value, metaData.getDatabases(),
changedStatistics);
- }
- });
- compareUpdateAndSendEvent(statistics, changedStatistics,
metaData.getDatabases());
- }
-
- private void collectForDatabase(final String databaseName, final
ShardingSphereDatabaseData databaseData,
- final Map<String,
ShardingSphereDatabase> databases, final ShardingSphereStatistics statistics) {
- databaseData.getSchemaData().forEach((key, value) -> {
- if
(databases.get(databaseName.toLowerCase()).containsSchema(key)) {
- collectForSchema(databaseName, key, value, databases,
statistics);
- }
- });
- }
-
- private void collectForSchema(final String databaseName, final String
schemaName, final ShardingSphereSchemaData schemaData,
- final Map<String,
ShardingSphereDatabase> databases, final ShardingSphereStatistics statistics) {
- schemaData.getTableData().forEach((key, value) -> {
- if
(databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(key))
{
- collectForTable(databaseName, schemaName,
databases.get(databaseName).getSchema(schemaName).getTable(key), databases,
statistics);
- }
- });
- }
-
- private void collectForTable(final String databaseName, final String
schemaName, final ShardingSphereTable table,
- final Map<String, ShardingSphereDatabase>
databases, final ShardingSphereStatistics statistics) {
- Optional<ShardingSphereStatisticsCollector> dataCollector =
TypedSPILoader.findService(ShardingSphereStatisticsCollector.class,
table.getName());
- if (!dataCollector.isPresent()) {
- return;
- }
- Optional<ShardingSphereTableData> tableData = Optional.empty();
- try {
- tableData = dataCollector.get().collect(databaseName, table,
databases);
- } catch (final SQLException ex) {
- log.error("Collect data failed!", ex);
- }
- tableData.ifPresent(optional ->
statistics.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key ->
new ShardingSphereDatabaseData())
- .getSchemaData().computeIfAbsent(schemaName, key -> new
ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(),
optional));
- }
-
- private void compareUpdateAndSendEvent(final ShardingSphereStatistics
statistics, final ShardingSphereStatistics changedStatistics,
- final Map<String,
ShardingSphereDatabase> databases) {
- changedStatistics.getDatabaseData().forEach((key, value) ->
compareUpdateAndSendEventForDatabase(key,
statistics.getDatabaseData().get(key), value, statistics,
- databases.get(key.toLowerCase())));
- }
-
- private void compareUpdateAndSendEventForDatabase(final String
databaseName, final ShardingSphereDatabaseData databaseData, final
ShardingSphereDatabaseData changedDatabaseData,
- final
ShardingSphereStatistics statistics, final ShardingSphereDatabase database) {
- changedDatabaseData.getSchemaData().forEach((key, value) ->
compareUpdateAndSendEventForSchema(databaseName, key,
databaseData.getSchemaData().get(key), value, statistics,
- database.getSchema(key)));
- }
-
- private void compareUpdateAndSendEventForSchema(final String
databaseName, final String schemaName, final ShardingSphereSchemaData
schemaData,
- final
ShardingSphereSchemaData changedSchemaData, final ShardingSphereStatistics
statistics, final ShardingSphereSchema schema) {
- changedSchemaData.getTableData().forEach((key, value) ->
compareUpdateAndSendEventForTable(databaseName, schemaName,
schemaData.getTableData().get(key), value, statistics,
- schema.getTable(key)));
- }
-
- private void compareUpdateAndSendEventForTable(final String
databaseName, final String schemaName, final ShardingSphereTableData tableData,
- final
ShardingSphereTableData changedTableData, final ShardingSphereStatistics
statistics, final ShardingSphereTable table) {
- if (tableData.equals(changedTableData)) {
- return;
- }
-
statistics.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(),
changedTableData);
- ShardingSphereSchemaDataAlteredEvent event =
getShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, tableData,
changedTableData, table);
-
contextManager.getInstanceContext().getEventBusContext().post(event);
- }
-
- private ShardingSphereSchemaDataAlteredEvent
getShardingSphereSchemaDataAlteredEvent(final String databaseName, final String
schemaName, final ShardingSphereTableData tableData,
-
final ShardingSphereTableData changedTableData, final
ShardingSphereTable table) {
- ShardingSphereSchemaDataAlteredEvent result = new
ShardingSphereSchemaDataAlteredEvent(databaseName, schemaName,
tableData.getName());
- Map<String, ShardingSphereRowData> tableDataMap =
tableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
Function.identity()));
- Map<String, ShardingSphereRowData> changedTableDataMap =
changedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
Function.identity()));
- YamlShardingSphereRowDataSwapper swapper = new
YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumnValues()));
- for (Entry<String, ShardingSphereRowData> entry :
changedTableDataMap.entrySet()) {
- if (!tableDataMap.containsKey(entry.getKey())) {
-
result.getAddedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
- } else if
(!tableDataMap.get(entry.getKey()).equals(entry.getValue())) {
-
result.getUpdatedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
- }
- }
- for (Entry<String, ShardingSphereRowData> entry :
tableDataMap.entrySet()) {
- if (!changedTableDataMap.containsKey(entry.getKey())) {
-
result.getDeletedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
- }
- }
- return result;
- }
- }
-}
diff --git a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectSchedulerTest.java b/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
similarity index 87%
rename from kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectSchedulerTest.java
rename to kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
index ff235fc24de..a0fbd8bd74f 100644
--- a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectSchedulerTest.java
+++ b/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
@@ -18,6 +18,8 @@
package org.apache.shardingsphere.schedule.core.job.statistics.collect;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationProperties;
+import
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
@@ -28,7 +30,8 @@ import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaD
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.schedule.core.job.statistics.collect.StatisticsCollectScheduler.StatisticsCollectRunnable;
+import org.apache.shardingsphere.test.util.PropertiesBuilder;
+import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
import org.junit.jupiter.api.Test;
import java.sql.Types;
@@ -42,7 +45,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-class StatisticsCollectSchedulerTest {
+class StatisticsCollectJobTest {
@Test
void assertCollect() {
@@ -52,7 +55,9 @@ class StatisticsCollectSchedulerTest {
ShardingSphereMetaData metaData = mockMetaData();
when(contextManager.getMetaDataContexts().getMetaData()).thenReturn(metaData);
when(contextManager.getMetaDataContexts().getMetaData().getProps()).thenReturn(new
ConfigurationProperties(new Properties()));
- new StatisticsCollectRunnable(contextManager).run();
+
when(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps()).thenReturn(new
TemporaryConfigurationProperties(
+ PropertiesBuilder.build(new
Property(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED.getKey(),
Boolean.TRUE.toString()))));
+ new StatisticsCollectJob(contextManager).execute(null);
verify(contextManager).getInstanceContext();
}
diff --git a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java b/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java
index c9292d19ccc..d2c81f0b2d8 100644
--- a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java
+++ b/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.schedule.core.job.statistics.collect;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
@@ -34,7 +35,8 @@ import java.util.Optional;
public final class StatisticsCollectorFixture implements
ShardingSphereStatisticsCollector {
@Override
- public Optional<ShardingSphereTableData> collect(final String
databaseName, final ShardingSphereTable table, final Map<String,
ShardingSphereDatabase> databases) throws SQLException {
+ public Optional<ShardingSphereTableData> collect(final String
databaseName, final ShardingSphereTable table, final Map<String,
ShardingSphereDatabase> databases,
+ final RuleMetaData
globalRuleMetaData) throws SQLException {
ShardingSphereTableData shardingSphereTableData = new
ShardingSphereTableData("test_table");
shardingSphereTableData.getRows().add(new
ShardingSphereRowData(Arrays.asList("1", "2")));
return Optional.of(shardingSphereTableData);
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 08f29095024..c77239724a2 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -100,7 +100,7 @@ public final class RegistryCenter {
new ClusterStatusSubscriber(repository, eventBusContext);
new StorageNodeStatusSubscriber(repository, eventBusContext);
new ClusterProcessSubscriber(repository, eventBusContext);
- new ShardingSphereSchemaDataRegistrySubscriber(repository,
globalLockPersistService, eventBusContext);
+ new ShardingSphereSchemaDataRegistrySubscriber(repository,
eventBusContext);
}
/**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
index 102ae795959..7efc3f24008 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
@@ -18,11 +18,8 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber;
import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.lock.GlobalLockNames;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import
org.apache.shardingsphere.metadata.persist.data.ShardingSphereDataPersistService;
-import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -34,11 +31,8 @@ public final class
ShardingSphereSchemaDataRegistrySubscriber {
private final ShardingSphereDataPersistService persistService;
- private final GlobalLockPersistService lockPersistService;
-
- public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository, final GlobalLockPersistService globalLockPersistService, final EventBusContext eventBusContext) {
+ public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
persistService = new ShardingSphereDataPersistService(repository);
- lockPersistService = globalLockPersistService;
eventBusContext.register(this);
}
@@ -51,15 +45,8 @@ public final class
ShardingSphereSchemaDataRegistrySubscriber {
public void update(final ShardingSphereSchemaDataAlteredEvent event) {
String databaseName = event.getDatabaseName();
String schemaName = event.getSchemaName();
- GlobalLockDefinition lockDefinition = new GlobalLockDefinition(String.format(GlobalLockNames.STATISTICS.getLockName(), event.getDatabaseName(), event.getSchemaName(), event.getTableName()));
- if (lockPersistService.tryLock(lockDefinition, 10_000)) {
- try {
- persistService.getTableRowDataPersistService().persist(databaseName, schemaName, event.getTableName(), event.getAddedRows());
- persistService.getTableRowDataPersistService().persist(databaseName, schemaName, event.getTableName(), event.getUpdatedRows());
- persistService.getTableRowDataPersistService().delete(databaseName, schemaName, event.getTableName(), event.getDeletedRows());
- } finally {
- lockPersistService.unlock(lockDefinition);
- }
- }
+ persistService.getTableRowDataPersistService().persist(databaseName, schemaName, event.getTableName(), event.getAddedRows());
+ persistService.getTableRowDataPersistService().persist(databaseName, schemaName, event.getTableName(), event.getUpdatedRows());
+ persistService.getTableRowDataPersistService().delete(databaseName, schemaName, event.getTableName(), event.getDeletedRows());
}
}
diff --git
a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/cases/assertion/IntegrationTestCase.java
b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/cases/assertion/IntegrationTestCase.java
index 2c2aedda48b..62e0365d2da 100644
---
a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/cases/assertion/IntegrationTestCase.java
+++
b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/cases/assertion/IntegrationTestCase.java
@@ -50,6 +50,9 @@ public final class IntegrationTestCase {
@XmlAttribute(name = "adapters")
private String adapters;
+ @XmlAttribute(name = "delay-assertion-seconds")
+ private Integer delayAssertionSeconds;
+
@XmlElement(name = "assertion")
private Collection<IntegrationTestCaseAssertion> assertions = new
LinkedList<>();
}
diff --git
a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/engine/type/dql/GeneralDQLE2EIT.java
b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/engine/type/dql/GeneralDQLE2EIT.java
index eace11a53b8..0f05d683f3a 100644
---
a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/engine/type/dql/GeneralDQLE2EIT.java
+++
b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/engine/type/dql/GeneralDQLE2EIT.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.test.e2e.framework.param.model.AssertionTestPar
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
import javax.xml.bind.JAXBException;
import java.io.IOException;
@@ -36,6 +37,8 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -52,6 +55,9 @@ class GeneralDQLE2EIT extends BaseDQLE2EIT {
}
SingleE2EContainerComposer containerComposer = new
SingleE2EContainerComposer(testParam);
init(testParam, containerComposer);
+ if (null != testParam.getTestCaseContext().getTestCase().getDelayAssertionSeconds()) {
+ Awaitility.await().atMost(Duration.ofMinutes(5)).pollDelay(testParam.getTestCaseContext().getTestCase().getDelayAssertionSeconds(), TimeUnit.SECONDS).until(() -> true);
+ }
if (isUseXMLAsExpectedDataset()) {
assertExecuteQueryWithXmlExpected(testParam, containerComposer);
} else {
@@ -123,6 +129,9 @@ class GeneralDQLE2EIT extends BaseDQLE2EIT {
}
SingleE2EContainerComposer containerComposer = new
SingleE2EContainerComposer(testParam);
init(testParam, containerComposer);
+ if (null != testParam.getTestCaseContext().getTestCase().getDelayAssertionSeconds()) {
+ Awaitility.await().atMost(Duration.ofMinutes(5)).pollDelay(testParam.getTestCaseContext().getTestCase().getDelayAssertionSeconds(), TimeUnit.SECONDS).until(() -> true);
+ }
if (isUseXMLAsExpectedDataset()) {
assertExecuteWithXmlExpected(testParam, containerComposer);
} else {
diff --git
a/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-order-by.xml
b/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-order-by.xml
index 709fd068fd0..f192d8bfa19 100644
---
a/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-order-by.xml
+++
b/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-order-by.xml
@@ -207,7 +207,7 @@
<assertion expected-data-source-name="expected_dataset" />
</test-case>
- <test-case sql="select * from shardingsphere.sharding_table_statistics order by id;" db-types="MySQL,PostgreSQL,openGauss" scenario-types="db">
+ <test-case sql="select * from shardingsphere.sharding_table_statistics order by id;" db-types="MySQL,PostgreSQL,openGauss" scenario-types="db" delay-assertion-seconds="40">
<assertion expected-data-file="select_sharding_table_statistics.xml" />
</test-case>
</integration-test-cases>