This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c1e8edbb5cc Use PersistServiceFacade instead of
ShardingSphereSchemaDataAlteredEvent and remove
ShardingSphereSchemaDataRegistrySubscriber (#31407)
c1e8edbb5cc is described below
commit c1e8edbb5cc8692eb1be588b429fd9b440e64eab
Author: Haoran Meng <[email protected]>
AuthorDate: Sun May 26 22:53:52 2024 +0800
Use PersistServiceFacade instead of ShardingSphereSchemaDataAlteredEvent
and remove ShardingSphereSchemaDataRegistrySubscriber (#31407)
* Use PersistServiceFacade instead of ShardingSphereSchemaDataAlteredEvent
and remove ShardingSphereSchemaDataRegistrySubscriber
* Use PersistServiceFacade instead of ShardingSphereSchemaDataAlteredEvent
and remove ShardingSphereSchemaDataRegistrySubscriber
---
.../statistics/collect/StatisticsCollectJob.java | 12 +++---
.../collect/StatisticsCollectJobTest.java | 4 +-
.../mode/service/PersistServiceFacade.java | 17 ++++++++
.../pojo/ShardingSphereSchemaDataAlteredPOJO.java} | 7 ++-
.../cluster/ClusterContextManagerBuilder.java | 2 -
...ShardingSphereSchemaDataRegistrySubscriber.java | 50 ----------------------
6 files changed, 29 insertions(+), 63 deletions(-)
diff --git
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
index effb25b6da1..b32f57030d5 100644
---
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
+++
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJob.java
@@ -35,7 +35,7 @@ import
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSph
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import
org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
+import
org.apache.shardingsphere.mode.service.pojo.ShardingSphereSchemaDataAlteredPOJO;
import java.util.ArrayList;
import java.util.Map;
@@ -134,13 +134,13 @@ public final class StatisticsCollectJob implements
SimpleJob {
return;
}
statistics.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(),
changedTableData);
- ShardingSphereSchemaDataAlteredEvent event =
getShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, tableData,
changedTableData, table);
-
contextManager.getComputeNodeInstanceContext().getEventBusContext().post(event);
+ ShardingSphereSchemaDataAlteredPOJO schemaDataAlteredPOJO =
getShardingSphereSchemaDataAlteredPOJO(databaseName, schemaName, tableData,
changedTableData, table);
+
contextManager.getPersistServiceFacade().persist(schemaDataAlteredPOJO);
}
- private ShardingSphereSchemaDataAlteredEvent
getShardingSphereSchemaDataAlteredEvent(final String databaseName, final String
schemaName, final ShardingSphereTableData tableData,
-
final ShardingSphereTableData changedTableData, final
ShardingSphereTable table) {
- ShardingSphereSchemaDataAlteredEvent result = new
ShardingSphereSchemaDataAlteredEvent(databaseName, schemaName,
tableData.getName());
+ private ShardingSphereSchemaDataAlteredPOJO
getShardingSphereSchemaDataAlteredPOJO(final String databaseName, final String
schemaName, final ShardingSphereTableData tableData,
+
final ShardingSphereTableData changedTableData, final
ShardingSphereTable table) {
+ ShardingSphereSchemaDataAlteredPOJO result = new
ShardingSphereSchemaDataAlteredPOJO(databaseName, schemaName,
tableData.getName());
Map<String, ShardingSphereRowData> tableDataMap =
tableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
Function.identity()));
Map<String, ShardingSphereRowData> changedTableDataMap =
changedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
Function.identity()));
YamlShardingSphereRowDataSwapper swapper = new
YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumnValues()));
diff --git
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
b/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
index c4a9d026d32..5a89037ae27 100644
---
a/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
+++
b/kernel/schedule/core/src/test/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobTest.java
@@ -30,6 +30,7 @@ import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaD
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.service.pojo.ShardingSphereSchemaDataAlteredPOJO;
import org.apache.shardingsphere.test.util.PropertiesBuilder;
import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
import org.junit.jupiter.api.Test;
@@ -40,6 +41,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.Properties;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -58,7 +60,7 @@ class StatisticsCollectJobTest {
when(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps()).thenReturn(new
TemporaryConfigurationProperties(
PropertiesBuilder.build(new
Property(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED.getKey(),
Boolean.TRUE.toString()))));
new StatisticsCollectJob(contextManager).execute(null);
- verify(contextManager).getComputeNodeInstanceContext();
+
verify(contextManager.getPersistServiceFacade()).persist(any(ShardingSphereSchemaDataAlteredPOJO.class));
}
private ShardingSphereStatistics mockStatistics() {
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
index 1f06cccb0e7..86153b94280 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.service.pojo.ShardingSphereSchemaDataAlteredPOJO;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.mode.state.StatePersistService;
@@ -45,4 +46,20 @@ public final class PersistServiceFacade {
statePersistService = new StatePersistService(repository);
metaDataManagerPersistService =
TypedSPILoader.getService(MetaDataManagerPersistServiceBuilder.class,
modeConfiguration.getType()).build(contextManager);
}
+
+ /**
+ * Update when sharding sphere schema data altered.
+ *
+ * @param schemaDataAlteredPOJO sharding sphere schema data
+ */
+ public void persist(final ShardingSphereSchemaDataAlteredPOJO
schemaDataAlteredPOJO) {
+ String databaseName = schemaDataAlteredPOJO.getDatabaseName();
+ String schemaName = schemaDataAlteredPOJO.getSchemaName();
+
metaDataPersistService.getShardingSphereDataPersistService().getTableRowDataPersistService().persist(databaseName,
schemaName, schemaDataAlteredPOJO.getTableName(),
+ schemaDataAlteredPOJO.getAddedRows());
+
metaDataPersistService.getShardingSphereDataPersistService().getTableRowDataPersistService().persist(databaseName,
schemaName, schemaDataAlteredPOJO.getTableName(),
+ schemaDataAlteredPOJO.getUpdatedRows());
+
metaDataPersistService.getShardingSphereDataPersistService().getTableRowDataPersistService().delete(databaseName,
schemaName, schemaDataAlteredPOJO.getTableName(),
+ schemaDataAlteredPOJO.getDeletedRows());
+ }
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereSchemaDataAlteredEvent.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/pojo/ShardingSphereSchemaDataAlteredPOJO.java
similarity index 84%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereSchemaDataAlteredEvent.java
rename to
mode/core/src/main/java/org/apache/shardingsphere/mode/service/pojo/ShardingSphereSchemaDataAlteredPOJO.java
index aa9ec928581..1762beda0a7 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereSchemaDataAlteredEvent.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/pojo/ShardingSphereSchemaDataAlteredPOJO.java
@@ -15,22 +15,21 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
+package org.apache.shardingsphere.mode.service.pojo;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
import
org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
import java.util.Collection;
import java.util.LinkedList;
/**
- * Schema altered event.
+ * Schema altered pojo.
*/
@RequiredArgsConstructor
@Getter
-public final class ShardingSphereSchemaDataAlteredEvent implements
GovernanceEvent {
+public final class ShardingSphereSchemaDataAlteredPOJO {
private final String databaseName;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index a4d13f555ce..72f4874996c 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -34,7 +34,6 @@ import
org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.ShardingSphereSchemaDataRegistrySubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
@@ -99,7 +98,6 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
private void createSubscribers(final EventBusContext eventBusContext,
final ClusterPersistRepository repository) {
eventBusContext.register(new
QualifiedDataSourceStatusSubscriber(repository));
eventBusContext.register(new ClusterProcessSubscriber(repository,
eventBusContext));
- eventBusContext.register(new
ShardingSphereSchemaDataRegistrySubscriber(repository));
}
private void registerOnline(final EventBusContext eventBusContext, final
ComputeNodeInstanceContext computeNodeInstanceContext,
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
deleted file mode 100644
index c7843a645ed..00000000000
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber;
-
-import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import
org.apache.shardingsphere.metadata.persist.data.ShardingSphereDataPersistService;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-
-/**
- * ShardingSphere schema data registry subscriber.
- */
-public final class ShardingSphereSchemaDataRegistrySubscriber implements
EventSubscriber {
-
- private final ShardingSphereDataPersistService persistService;
-
- public ShardingSphereSchemaDataRegistrySubscriber(final
ClusterPersistRepository repository) {
- persistService = new ShardingSphereDataPersistService(repository);
- }
-
- /**
- * Update when ShardingSphere schema data altered.
- *
- * @param event schema altered event
- */
- @Subscribe
- public void update(final ShardingSphereSchemaDataAlteredEvent event) {
- String databaseName = event.getDatabaseName();
- String schemaName = event.getSchemaName();
- persistService.getTableRowDataPersistService().persist(databaseName,
schemaName, event.getTableName(), event.getAddedRows());
- persistService.getTableRowDataPersistService().persist(databaseName,
schemaName, event.getTableName(), event.getUpdatedRows());
- persistService.getTableRowDataPersistService().delete(databaseName,
schemaName, event.getTableName(), event.getDeletedRows());
- }
-}