This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e5723894331 Refactor statistics collect job (#31989)
e5723894331 is described below
commit e57238943314697e28f80087a261fd030aa35472
Author: jiangML <[email protected]>
AuthorDate: Fri Jul 5 19:28:02 2024 +0800
Refactor statistics collect job (#31989)
---
.../BroadcastInstanceBroadcastRoutingEngine.java | 2 +-
.../shardingsphere/infra/lock/GlobalLockNames.java | 4 +-
.../statistics/collect/StatisticsCollectJob.java | 130 ++-------------------
.../collect/StatisticsCollectJobWorker.java | 24 +++-
.../ShardingSphereStatisticsRefreshEngine.java | 90 +++++++++-----
.../mode/fixture}/StatisticsCollectorFixture.java | 2 +-
.../ShardingSphereStatisticsRefreshEngineTest.java | 13 ++-
...ics.collector.ShardingSphereStatisticsCollector | 2 +-
.../dispatch/ListenerAssistedSubscriber.java | 13 +++
.../ResourceMetaDataChangedSubscriber.java | 18 +++
10 files changed, 136 insertions(+), 162 deletions(-)
diff --git a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
index ef3768deb64..c88487679da 100644
--- a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
+++ b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
@@ -31,7 +31,7 @@ import java.util.Collections;
* Broadcast routing engine for database instance.
*/
@RequiredArgsConstructor
-public class BroadcastInstanceBroadcastRoutingEngine implements BroadcastRouteEngine {
+public final class BroadcastInstanceBroadcastRoutingEngine implements BroadcastRouteEngine {
private final ResourceMetaData resourceMetaData;
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
index 927c98f6d84..24e7da36050 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java
@@ -28,7 +28,9 @@ public enum GlobalLockNames {
PREPARE("prepare_%s"),
- GLOBAL_LOCK("global_clock");
+ GLOBAL_LOCK("global_clock"),
+
+ STATISTICS("statistics");
private final String lockName;
}
diff --git
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
index d272f55041c..4526debbb91 100644
---
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
+++
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
@@ -21,28 +21,12 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
-import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereDatabaseData;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
-import
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import
org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
+import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.persist.pojo.AlteredShardingSphereSchemaData;
-
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import
org.apache.shardingsphere.mode.manager.cluster.lock.GlobalLockPersistService;
+import
org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
/**
* Statistics collect job.
@@ -55,107 +39,9 @@ public final class StatisticsCollectJob implements SimpleJob {
@Override
public void execute(final ShardingContext shardingContext) {
- try {
- if
(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps().getValue(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED))
{
- ShardingSphereStatistics statistics =
contextManager.getMetaDataContexts().getStatistics();
- ShardingSphereMetaData metaData =
contextManager.getMetaDataContexts().getMetaData();
- ShardingSphereStatistics changedStatistics = new
ShardingSphereStatistics();
- statistics.getDatabaseData().forEach((key, value) -> {
- if (metaData.containsDatabase(key)) {
- collectForDatabase(key, value,
metaData.getDatabases(), changedStatistics);
- }
- });
- compareUpdateAndSendEvent(statistics, changedStatistics,
metaData.getDatabases());
- }
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error("Collect data error", ex);
- }
- }
-
- private void collectForDatabase(final String databaseName, final
ShardingSphereDatabaseData databaseData,
- final Map<String, ShardingSphereDatabase>
databases, final ShardingSphereStatistics statistics) {
- databaseData.getSchemaData().forEach((key, value) -> {
- if (databases.get(databaseName.toLowerCase()).containsSchema(key))
{
- collectForSchema(databaseName, key, value, databases,
statistics);
- }
- });
- }
-
- private void collectForSchema(final String databaseName, final String
schemaName, final ShardingSphereSchemaData schemaData,
- final Map<String, ShardingSphereDatabase>
databases, final ShardingSphereStatistics statistics) {
- schemaData.getTableData().forEach((key, value) -> {
- if
(databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(key))
{
- collectForTable(databaseName, schemaName,
databases.get(databaseName).getSchema(schemaName).getTable(key), databases,
statistics);
- }
- });
- }
-
- private void collectForTable(final String databaseName, final String
schemaName, final ShardingSphereTable table,
- final Map<String, ShardingSphereDatabase>
databases, final ShardingSphereStatistics statistics) {
- Optional<ShardingSphereStatisticsCollector> dataCollector =
TypedSPILoader.findService(ShardingSphereStatisticsCollector.class,
table.getName());
- if (!dataCollector.isPresent()) {
- return;
- }
- Optional<ShardingSphereTableData> tableData = Optional.empty();
- try {
- tableData = dataCollector.get().collect(databaseName, table,
databases,
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData());
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error(String.format("Collect %s.%s.%s data failed",
databaseName, schemaName, table.getName()), ex);
- }
- tableData.ifPresent(optional ->
statistics.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key ->
new ShardingSphereDatabaseData())
- .getSchemaData().computeIfAbsent(schemaName, key -> new
ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(),
optional));
- }
-
- private void compareUpdateAndSendEvent(final ShardingSphereStatistics
statistics, final ShardingSphereStatistics changedStatistics,
- final Map<String,
ShardingSphereDatabase> databases) {
- changedStatistics.getDatabaseData().forEach((key, value) ->
compareUpdateAndSendEventForDatabase(key,
statistics.getDatabaseData().get(key), value, statistics,
- databases.get(key.toLowerCase())));
- }
-
- private void compareUpdateAndSendEventForDatabase(final String
databaseName, final ShardingSphereDatabaseData databaseData, final
ShardingSphereDatabaseData changedDatabaseData,
- final
ShardingSphereStatistics statistics, final ShardingSphereDatabase database) {
- changedDatabaseData.getSchemaData().forEach((key, value) ->
compareUpdateAndSendEventForSchema(databaseName, key,
databaseData.getSchemaData().get(key), value, statistics,
- database.getSchema(key)));
- }
-
- private void compareUpdateAndSendEventForSchema(final String databaseName,
final String schemaName, final ShardingSphereSchemaData schemaData,
- final
ShardingSphereSchemaData changedSchemaData, final ShardingSphereStatistics
statistics, final ShardingSphereSchema schema) {
- changedSchemaData.getTableData().forEach((key, value) ->
compareUpdateAndSendEventForTable(databaseName, schemaName,
schemaData.getTableData().get(key), value, statistics,
- schema.getTable(key)));
- }
-
- private void compareUpdateAndSendEventForTable(final String databaseName,
final String schemaName, final ShardingSphereTableData tableData,
- final
ShardingSphereTableData changedTableData, final ShardingSphereStatistics
statistics, final ShardingSphereTable table) {
- if (tableData.equals(changedTableData)) {
- return;
- }
-
statistics.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(),
changedTableData);
- AlteredShardingSphereSchemaData schemaDataAlteredPOJO =
getShardingSphereSchemaDataAlteredPOJO(databaseName, schemaName, tableData,
changedTableData, table);
-
contextManager.getPersistServiceFacade().persist(schemaDataAlteredPOJO);
- }
-
- private AlteredShardingSphereSchemaData
getShardingSphereSchemaDataAlteredPOJO(final String databaseName, final String
schemaName, final ShardingSphereTableData tableData,
-
final ShardingSphereTableData changedTableData, final ShardingSphereTable
table) {
- AlteredShardingSphereSchemaData result = new
AlteredShardingSphereSchemaData(databaseName, schemaName, tableData.getName());
- Map<String, ShardingSphereRowData> tableDataMap =
tableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
Function.identity()));
- Map<String, ShardingSphereRowData> changedTableDataMap =
changedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
Function.identity()));
- YamlShardingSphereRowDataSwapper swapper = new
YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumnValues()));
- for (Entry<String, ShardingSphereRowData> entry :
changedTableDataMap.entrySet()) {
- if (!tableDataMap.containsKey(entry.getKey())) {
-
result.getAddedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
- } else if
(!tableDataMap.get(entry.getKey()).equals(entry.getValue())) {
-
result.getUpdatedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
- }
- }
- for (Entry<String, ShardingSphereRowData> entry :
tableDataMap.entrySet()) {
- if (!changedTableDataMap.containsKey(entry.getKey())) {
-
result.getDeletedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
- }
+ PersistRepository repository =
contextManager.getPersistServiceFacade().getRepository();
+ if (repository instanceof ClusterPersistRepository) {
+ new ShardingSphereStatisticsRefreshEngine(contextManager, new
GlobalLockContext(new GlobalLockPersistService((ClusterPersistRepository)
repository))).refresh();
}
- return result;
}
}
diff --git
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
index ef12120d4e7..0c8448d4282 100644
---
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
+++
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
@@ -30,6 +30,7 @@ import
org.apache.shardingsphere.metadata.persist.node.ShardingSphereDataNode;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -69,11 +70,32 @@ public final class StatisticsCollectJobWorker {
private static CoordinatorRegistryCenter createRegistryCenter(final
ModeConfiguration modeConfig) {
ClusterPersistRepositoryConfiguration repositoryConfig =
(ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
String namespace = String.join("/", repositoryConfig.getNamespace(),
ShardingSphereDataNode.getJobPath());
- CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(new
ZookeeperConfiguration(repositoryConfig.getServerLists(), namespace));
+ CoordinatorRegistryCenter result = new
ZookeeperRegistryCenter(getZookeeperConfiguration(repositoryConfig, namespace));
result.init();
return result;
}
+ private static ZookeeperConfiguration getZookeeperConfiguration(final
ClusterPersistRepositoryConfiguration repositoryConfig, final String namespace)
{
+ // TODO Merge registry center code in ElasticJob and ShardingSphere mode; Use SPI to load impl
+ ZookeeperConfiguration result = new
ZookeeperConfiguration(repositoryConfig.getServerLists(), namespace);
+ Properties props = repositoryConfig.getProps();
+ int retryIntervalMilliseconds =
props.containsKey("retryIntervalMilliseconds") ? (int)
props.get("retryIntervalMilliseconds") : 500;
+ int maxRetries = props.containsKey("maxRetries") ? (int)
props.get("maxRetries") : 3;
+ result.setBaseSleepTimeMilliseconds(retryIntervalMilliseconds);
+ result.setMaxRetries(maxRetries);
+ result.setMaxSleepTimeMilliseconds(retryIntervalMilliseconds *
maxRetries);
+ int timeToLiveSeconds = props.containsKey("timeToLiveSeconds") ? (int)
props.get("timeToLiveSeconds") : 60;
+ if (0 != timeToLiveSeconds) {
+ result.setSessionTimeoutMilliseconds(timeToLiveSeconds * 1000);
+ }
+ int operationTimeoutMilliseconds =
props.containsKey("operationTimeoutMilliseconds") ? (int)
props.get("operationTimeoutMilliseconds") : 500;
+ if (0 != operationTimeoutMilliseconds) {
+
result.setConnectionTimeoutMilliseconds(operationTimeoutMilliseconds);
+ }
+ result.setDigest(props.getProperty("digest"));
+ return result;
+ }
+
private static JobConfiguration createJobConfiguration() {
return JobConfiguration.newBuilder(JOB_NAME,
1).cron(CRON_EXPRESSION).overwrite(true).build();
}
diff --git
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java
similarity index 73%
copy from
kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
copy to
mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java
index d272f55041c..0e853cfab52 100644
---
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.schedule.core.job.statistics.collect;
+package org.apache.shardingsphere.mode.metadata.refresher;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
+import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.infra.lock.GlobalLockNames;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
@@ -34,6 +34,8 @@ import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableDa
import
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import
org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
+import org.apache.shardingsphere.mode.lock.GlobalLockContext;
+import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.persist.pojo.AlteredShardingSphereSchemaData;
@@ -41,55 +43,81 @@ import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
- * Statistics collect job.
+ * ShardingSphere statistics refresh engine.
*/
@RequiredArgsConstructor
@Slf4j
-public final class StatisticsCollectJob implements SimpleJob {
+public final class ShardingSphereStatisticsRefreshEngine {
+
+ private static final ExecutorService EXECUTOR_SERVICE =
Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("statistics-collect-%d"));
private final ContextManager contextManager;
- @Override
- public void execute(final ShardingContext shardingContext) {
+ private final GlobalLockContext globalLockContext;
+
+ /**
+ * Async refresh.
+ */
+ public void asyncRefresh() {
+ EXECUTOR_SERVICE.execute(this::refresh);
+ }
+
+ /**
+ * Refresh.
+ */
+ public void refresh() {
try {
if
(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps().getValue(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED))
{
+ collectAndRefresh();
+ }
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error("Collect data error", ex);
+ }
+ }
+
+ private void collectAndRefresh() {
+ GlobalLockDefinition lockDefinition = new
GlobalLockDefinition(GlobalLockNames.STATISTICS.getLockName());
+ if (globalLockContext.tryLock(lockDefinition, 5000L)) {
+ try {
ShardingSphereStatistics statistics =
contextManager.getMetaDataContexts().getStatistics();
ShardingSphereMetaData metaData =
contextManager.getMetaDataContexts().getMetaData();
ShardingSphereStatistics changedStatistics = new
ShardingSphereStatistics();
- statistics.getDatabaseData().forEach((key, value) -> {
- if (metaData.containsDatabase(key)) {
- collectForDatabase(key, value,
metaData.getDatabases(), changedStatistics);
+ for (Entry<String, ShardingSphereDatabaseData> entry :
statistics.getDatabaseData().entrySet()) {
+ if (metaData.containsDatabase(entry.getKey())) {
+ collectForDatabase(entry.getKey(), entry.getValue(),
metaData.getDatabases(), changedStatistics);
}
- });
+ }
compareUpdateAndSendEvent(statistics, changedStatistics,
metaData.getDatabases());
+ } finally {
+ globalLockContext.unlock(lockDefinition);
}
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error("Collect data error", ex);
}
}
private void collectForDatabase(final String databaseName, final
ShardingSphereDatabaseData databaseData,
final Map<String, ShardingSphereDatabase>
databases, final ShardingSphereStatistics statistics) {
- databaseData.getSchemaData().forEach((key, value) -> {
- if (databases.get(databaseName.toLowerCase()).containsSchema(key))
{
- collectForSchema(databaseName, key, value, databases,
statistics);
+ for (Entry<String, ShardingSphereSchemaData> entry :
databaseData.getSchemaData().entrySet()) {
+ if
(databases.get(databaseName.toLowerCase()).containsSchema(entry.getKey())) {
+ collectForSchema(databaseName, entry.getKey(),
entry.getValue(), databases, statistics);
}
- });
+ }
}
private void collectForSchema(final String databaseName, final String
schemaName, final ShardingSphereSchemaData schemaData,
final Map<String, ShardingSphereDatabase>
databases, final ShardingSphereStatistics statistics) {
- schemaData.getTableData().forEach((key, value) -> {
- if
(databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(key))
{
- collectForTable(databaseName, schemaName,
databases.get(databaseName).getSchema(schemaName).getTable(key), databases,
statistics);
+ for (Entry<String, ShardingSphereTableData> entry :
schemaData.getTableData().entrySet()) {
+ if
(databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(entry.getKey()))
{
+ collectForTable(databaseName, schemaName,
databases.get(databaseName).getSchema(schemaName).getTable(entry.getKey()),
databases, statistics);
}
- });
+ }
}
private void collectForTable(final String databaseName, final String
schemaName, final ShardingSphereTable table,
@@ -106,26 +134,26 @@ public final class StatisticsCollectJob implements
SimpleJob {
// CHECKSTYLE:ON
log.error(String.format("Collect %s.%s.%s data failed",
databaseName, schemaName, table.getName()), ex);
}
- tableData.ifPresent(optional ->
statistics.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key ->
new ShardingSphereDatabaseData())
- .getSchemaData().computeIfAbsent(schemaName, key -> new
ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(),
optional));
+ tableData.ifPresent(optional ->
statistics.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key ->
new ShardingSphereDatabaseData()).getSchemaData()
+ .computeIfAbsent(schemaName, key -> new
ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(),
optional));
}
private void compareUpdateAndSendEvent(final ShardingSphereStatistics
statistics, final ShardingSphereStatistics changedStatistics,
final Map<String,
ShardingSphereDatabase> databases) {
- changedStatistics.getDatabaseData().forEach((key, value) ->
compareUpdateAndSendEventForDatabase(key,
statistics.getDatabaseData().get(key), value, statistics,
- databases.get(key.toLowerCase())));
+ changedStatistics.getDatabaseData().forEach(
+ (key, value) -> compareUpdateAndSendEventForDatabase(key,
statistics.getDatabaseData().get(key), value, statistics,
databases.get(key.toLowerCase())));
}
private void compareUpdateAndSendEventForDatabase(final String
databaseName, final ShardingSphereDatabaseData databaseData, final
ShardingSphereDatabaseData changedDatabaseData,
final
ShardingSphereStatistics statistics, final ShardingSphereDatabase database) {
- changedDatabaseData.getSchemaData().forEach((key, value) ->
compareUpdateAndSendEventForSchema(databaseName, key,
databaseData.getSchemaData().get(key), value, statistics,
- database.getSchema(key)));
+ changedDatabaseData.getSchemaData().forEach(
+ (key, value) ->
compareUpdateAndSendEventForSchema(databaseName, key,
databaseData.getSchemaData().get(key), value, statistics,
database.getSchema(key)));
}
private void compareUpdateAndSendEventForSchema(final String databaseName,
final String schemaName, final ShardingSphereSchemaData schemaData,
final
ShardingSphereSchemaData changedSchemaData, final ShardingSphereStatistics
statistics, final ShardingSphereSchema schema) {
- changedSchemaData.getTableData().forEach((key, value) ->
compareUpdateAndSendEventForTable(databaseName, schemaName,
schemaData.getTableData().get(key), value, statistics,
- schema.getTable(key)));
+ changedSchemaData.getTableData().forEach(
+ (key, value) ->
compareUpdateAndSendEventForTable(databaseName, schemaName,
schemaData.getTableData().get(key), value, statistics, schema.getTable(key)));
}
private void compareUpdateAndSendEventForTable(final String databaseName,
final String schemaName, final ShardingSphereTableData tableData,
diff --git
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java
b/mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/StatisticsCollectorFixture.java
similarity index 96%
rename from
kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java
rename to
mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/StatisticsCollectorFixture.java
index d2c81f0b2d8..83257c6a5dc 100644
---
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectorFixture.java
+++
b/mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/StatisticsCollectorFixture.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.schedule.core.job.statistics.collect;
+package org.apache.shardingsphere.mode.fixture;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
diff --git
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngineTest.java
similarity index 90%
rename from
kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
rename to
mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngineTest.java
index c80faebce8d..c0dd1af7bad 100644
---
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
+++
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngineTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.schedule.core.job.statistics.collect;
+package org.apache.shardingsphere.mode.metadata.refresher;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationProperties;
@@ -29,6 +29,8 @@ import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereDatabas
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
+import org.apache.shardingsphere.mode.lock.GlobalLockContext;
+import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.persist.pojo.AlteredShardingSphereSchemaData;
import org.apache.shardingsphere.test.util.PropertiesBuilder;
@@ -42,15 +44,16 @@ import java.util.LinkedList;
import java.util.Properties;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-class StatisticsCollectJobTest {
+class ShardingSphereStatisticsRefreshEngineTest {
@Test
- void assertCollect() {
+ void assertRefresh() {
ContextManager contextManager = mock(ContextManager.class,
RETURNS_DEEP_STUBS);
ShardingSphereStatistics statistics = mockStatistics();
when(contextManager.getMetaDataContexts().getStatistics()).thenReturn(statistics);
@@ -59,7 +62,9 @@ class StatisticsCollectJobTest {
when(contextManager.getMetaDataContexts().getMetaData().getProps()).thenReturn(new
ConfigurationProperties(new Properties()));
when(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps()).thenReturn(new
TemporaryConfigurationProperties(
PropertiesBuilder.build(new
Property(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED.getKey(),
Boolean.TRUE.toString()))));
- new StatisticsCollectJob(contextManager).execute(null);
+ GlobalLockContext globalLockContext = mock(GlobalLockContext.class);
+ when(globalLockContext.tryLock(any(GlobalLockDefinition.class),
anyLong())).thenReturn(true);
+ new ShardingSphereStatisticsRefreshEngine(contextManager,
globalLockContext).refresh();
verify(contextManager.getPersistServiceFacade()).persist(any(AlteredShardingSphereSchemaData.class));
}
diff --git
a/kernel/schedule/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector
b/mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector
similarity index 89%
rename from
kernel/schedule/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector
rename to
mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector
index 190b550cc3a..1c364ce3e81 100644
---
a/kernel/schedule/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector
+++
b/mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.schedule.core.job.statistics.collect.StatisticsCollectorFixture
+org.apache.shardingsphere.mode.fixture.StatisticsCollectorFixture
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
index df7ef650b93..59d4be84fba 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
@@ -23,10 +23,14 @@ import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
import org.apache.shardingsphere.mode.event.dispatch.assisted.CreateDatabaseListenerAssistedEvent;
import org.apache.shardingsphere.mode.event.dispatch.assisted.DropDatabaseListenerAssistedEvent;
+import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.listener.DataChangedEventListenerManager;
import org.apache.shardingsphere.mode.manager.cluster.listener.MetaDataChangedListener;
+import org.apache.shardingsphere.mode.manager.cluster.lock.GlobalLockPersistService;
+import org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
/**
* Listener assisted subscriber.
@@ -54,6 +58,7 @@ public final class ListenerAssistedSubscriber implements EventSubscriber {
new MetaDataChangedListener(contextManager.getComputeNodeInstanceContext().getEventBusContext()));
contextManager.getMetaDataContextManager().getResourceMetaDataManager().addDatabase(event.getDatabaseName());
contextManager.getPersistServiceFacade().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName());
+ refreshShardingSphereStatisticsData();
}
/**
@@ -66,5 +71,13 @@ public final class ListenerAssistedSubscriber implements EventSubscriber {
listenerManager.removeListener(DatabaseMetaDataNode.getDatabaseNamePath(event.getDatabaseName()));
contextManager.getMetaDataContextManager().getResourceMetaDataManager().dropDatabase(event.getDatabaseName());
contextManager.getPersistServiceFacade().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName());
+ refreshShardingSphereStatisticsData();
+ }
+
+ private void refreshShardingSphereStatisticsData() {
+ PersistRepository repository = contextManager.getPersistServiceFacade().getRepository();
+ if (repository instanceof ClusterPersistRepository) {
+ new ShardingSphereStatisticsRefreshEngine(contextManager, new GlobalLockContext(new GlobalLockPersistService((ClusterPersistRepository) repository))).asyncRefresh();
+ }
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
index 17c92504be8..4516026b532 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
@@ -26,9 +26,14 @@ import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.Creat
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.DropTableEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.CreateOrAlterViewEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.DropViewEvent;
+import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.lock.GlobalLockPersistService;
+import org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
import java.util.Map;
@@ -49,6 +54,7 @@ public final class ResourceMetaDataChangedSubscriber implements EventSubscriber
@Subscribe
public synchronized void renew(final SchemaAddedEvent event) {
contextManager.getMetaDataContextManager().getResourceMetaDataManager().addSchema(event.getDatabaseName(), event.getSchemaName());
+ refreshShardingSphereStatisticsData();
}
/**
@@ -59,6 +65,7 @@ public final class ResourceMetaDataChangedSubscriber implements EventSubscriber
@Subscribe
public synchronized void renew(final SchemaDeletedEvent event) {
contextManager.getMetaDataContextManager().getResourceMetaDataManager().dropSchema(event.getDatabaseName(), event.getSchemaName());
+ refreshShardingSphereStatisticsData();
}
/**
@@ -76,6 +83,7 @@ public final class ResourceMetaDataChangedSubscriber implements EventSubscriber
.getTableMetaDataPersistService().load(event.getDatabaseName(), event.getSchemaName(), event.getTableName());
contextManager.getMetaDataContextManager().getResourceMetaDataManager().alterSchema(event.getDatabaseName(), event.getSchemaName(),
tables.values().iterator().next(), null);
+ refreshShardingSphereStatisticsData();
}
/**
@@ -86,6 +94,7 @@ public final class ResourceMetaDataChangedSubscriber implements EventSubscriber
@Subscribe
public synchronized void renew(final DropTableEvent event) {
contextManager.getMetaDataContextManager().getResourceMetaDataManager().alterSchema(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), null);
+ refreshShardingSphereStatisticsData();
}
/**
@@ -103,6 +112,7 @@ public final class ResourceMetaDataChangedSubscriber implements EventSubscriber
.getViewMetaDataPersistService().load(event.getDatabaseName(), event.getSchemaName(), event.getViewName());
contextManager.getMetaDataContextManager().getResourceMetaDataManager().alterSchema(event.getDatabaseName(), event.getSchemaName(),
null, views.values().iterator().next());
+ refreshShardingSphereStatisticsData();
}
/**
@@ -113,5 +123,13 @@ public final class ResourceMetaDataChangedSubscriber implements EventSubscriber
@Subscribe
public synchronized void renew(final DropViewEvent event) {
contextManager.getMetaDataContextManager().getResourceMetaDataManager().alterSchema(event.getDatabaseName(), event.getSchemaName(), null, event.getViewName());
+ refreshShardingSphereStatisticsData();
+ }
+
+ private void refreshShardingSphereStatisticsData() {
+ PersistRepository repository = contextManager.getPersistServiceFacade().getRepository();
+ if (repository instanceof ClusterPersistRepository) {
+ new ShardingSphereStatisticsRefreshEngine(contextManager, new GlobalLockContext(new GlobalLockPersistService((ClusterPersistRepository) repository))).asyncRefresh();
+ }
}
}