This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d339cdcbb05 Refactor load data source properties for
NewDataSourcePersistService (#26417)
d339cdcbb05 is described below
commit d339cdcbb054a8c2ae6c6166fe5562a8bf411cf5
Author: zhaojinchao <[email protected]>
AuthorDate: Sun Jun 18 21:43:06 2023 +0800
Refactor load data source properties for NewDataSourcePersistService
(#26417)
---
.../database/NewDataSourcePersistService.java | 17 ++++-----
.../event/datasource/AlterStorageUnitEvent.java | 6 ++--
.../event/datasource/RegisterStorageUnitEvent.java | 2 +-
.../datasource/UnregisterStorageUnitEvent.java | 4 +++
.../mode/manager/ContextManager.java | 41 +++++++++++++++++++++-
.../manager/switcher/NewResourceSwitchManager.java | 28 +++++++++++++--
.../watcher/NewMetaDataChangedWatcher.java | 9 +++--
.../subscriber/NewDataSourceChangedSubscriber.java | 33 ++++++++++++++++-
8 files changed, 117 insertions(+), 23 deletions(-)
diff --git
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/config/database/NewDataSourcePersistService.java
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/config/database/NewDataSourcePersistService.java
index 841ce7ddae1..c0b4f750c5d 100644
---
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/config/database/NewDataSourcePersistService.java
+++
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/config/database/NewDataSourcePersistService.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.metadata.persist.service.config.database;
-import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
@@ -46,7 +45,7 @@ public final class NewDataSourcePersistService implements
DatabaseBasedPersistSe
@Override
public void persist(final String databaseName, final Map<String,
DataSourceProperties> dataSourceConfigs) {
for (Entry<String, DataSourceProperties> entry :
dataSourceConfigs.entrySet()) {
- String activeVersion = getDatabaseActiveVersion(databaseName,
entry.getKey());
+ String activeVersion = getDataSourceActiveVersion(databaseName,
entry.getKey());
if (Strings.isNullOrEmpty(activeVersion)) {
repository.persist(NewDatabaseMetaDataNode.getDataSourceActiveVersionNode(databaseName,
entry.getKey()), DEFAULT_VERSION);
}
@@ -62,24 +61,20 @@ public final class NewDataSourcePersistService implements
DatabaseBasedPersistSe
public Map<String, DataSourceProperties> load(final String databaseName) {
Map<String, DataSourceProperties> result = new LinkedHashMap<>();
for (String each :
repository.getChildrenKeys(NewDatabaseMetaDataNode.getDataSourcesNode(databaseName)))
{
- result.put(each, getDataSourceProps(databaseName, each));
+ String dataSourceValue =
repository.getDirectly(NewDatabaseMetaDataNode.getDataSourceNode(databaseName,
each, getDataSourceActiveVersion(databaseName, each)));
+ if (Strings.isNullOrEmpty(dataSourceValue)) {
+ result.put(each, YamlEngine.unmarshal(dataSourceValue,
DataSourceProperties.class));
+ }
}
return result;
}
- // TODO Remove this
@Override
public Map<String, DataSourceProperties> load(final String databaseName,
final String version) {
return Collections.emptyMap();
}
- private DataSourceProperties getDataSourceProps(final String databaseName,
final String dataSourceName) {
- String result =
repository.getDirectly(NewDatabaseMetaDataNode.getDataSourceNode(databaseName,
getDatabaseActiveVersion(databaseName, dataSourceName), dataSourceName));
- Preconditions.checkState(!Strings.isNullOrEmpty(result), "Not found
`%s` data source config in `%s` database", dataSourceName, databaseName);
- return YamlEngine.unmarshal(result, DataSourceProperties.class);
- }
-
- private String getDatabaseActiveVersion(final String databaseName, final
String dataSourceName) {
+ private String getDataSourceActiveVersion(final String databaseName, final
String dataSourceName) {
return
repository.getDirectly(NewDatabaseMetaDataNode.getDataSourceActiveVersionNode(databaseName,
dataSourceName));
}
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/datasource/AlterStorageUnitEvent.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/datasource/AlterStorageUnitEvent.java
index 44279ff1325..a2ae0660d82 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/datasource/AlterStorageUnitEvent.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/datasource/AlterStorageUnitEvent.java
@@ -33,7 +33,9 @@ public final class AlterStorageUnitEvent implements
GovernanceEvent {
private final String storageUnitName;
- private final String version;
+ private final DataSourceProperties props;
- private final DataSourceProperties dataSourceProps;
+ private final String activeVersionKey;
+
+ private final int version;
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/datasource/RegisterStorageUnitEvent.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/datasource/RegisterStorageUnitEvent.java
index 3f0b0fad49b..7b55f07c1b2 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/datasource/RegisterStorageUnitEvent.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/datasource/RegisterStorageUnitEvent.java
@@ -37,5 +37,5 @@ public final class RegisterStorageUnitEvent implements
GovernanceEvent {
private final String activeVersionKey;
- private final String version;
+ private final int version;
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/datasource/UnregisterStorageUnitEvent.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/datasource/UnregisterStorageUnitEvent.java
index 6bc9b5566a3..4198e7647c4 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/datasource/UnregisterStorageUnitEvent.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/datasource/UnregisterStorageUnitEvent.java
@@ -31,4 +31,8 @@ public final class UnregisterStorageUnitEvent implements
GovernanceEvent {
private final String databaseName;
private final String storageUnitName;
+
+ private final String activeVersionKey;
+
+ private final int version;
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 459a8bb1af0..76cf9adf4f9 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -261,7 +261,7 @@ public final class ContextManager implements AutoCloseable {
try {
Collection<ResourceHeldRule> staleResourceHeldRules =
getStaleResourceHeldRules(databaseName);
staleResourceHeldRules.forEach(ResourceHeldRule::closeStaleResource);
- SwitchingResource switchingResource = new
NewResourceSwitchManager().create(metaDataContexts.get().getMetaData().getDatabase(databaseName).getResourceMetaData(),
+ SwitchingResource switchingResource = new
NewResourceSwitchManager().registerStorageUnit(metaDataContexts.get().getMetaData().getDatabase(databaseName).getResourceMetaData(),
storageUnitName, dataSourceProps);
buildNewMetaDataContext(databaseName, switchingResource);
} catch (final SQLException ex) {
@@ -269,6 +269,45 @@ public final class ContextManager implements AutoCloseable
{
}
}
+ /**
+ * Alter storage unit.
+ *
+ * @param databaseName database name
+ * @param storageUnitName storage unit name
+ * @param dataSourceProps data source properties
+ */
+ @SuppressWarnings("rawtypes")
+ public synchronized void alterStorageUnit(final String databaseName, final
String storageUnitName, final DataSourceProperties dataSourceProps) {
+ try {
+ Collection<ResourceHeldRule> staleResourceHeldRules =
getStaleResourceHeldRules(databaseName);
+
staleResourceHeldRules.forEach(ResourceHeldRule::closeStaleResource);
+ SwitchingResource switchingResource = new
NewResourceSwitchManager().alterStorageUnit(metaDataContexts.get().getMetaData().getDatabase(databaseName).getResourceMetaData(),
+ storageUnitName, dataSourceProps);
+ buildNewMetaDataContext(databaseName, switchingResource);
+ } catch (final SQLException ex) {
+ log.error("Alter database: {} register storage unit failed",
databaseName, ex);
+ }
+ }
+
+ /**
+ * UnRegister storage unit.
+ *
+ * @param databaseName database name
+ * @param storageUnitName storage unit name
+ */
+ @SuppressWarnings("rawtypes")
+ public synchronized void unregisterStorageUnit(final String databaseName,
final String storageUnitName) {
+ try {
+ Collection<ResourceHeldRule> staleResourceHeldRules =
getStaleResourceHeldRules(databaseName);
+
staleResourceHeldRules.forEach(ResourceHeldRule::closeStaleResource);
+ SwitchingResource switchingResource = new
NewResourceSwitchManager().unregisterStorageUnit(metaDataContexts.get().getMetaData().getDatabase(databaseName).getResourceMetaData(),
+ storageUnitName);
+ buildNewMetaDataContext(databaseName, switchingResource);
+ } catch (final SQLException ex) {
+ log.error("Alter database: {} register storage unit failed",
databaseName, ex);
+ }
+ }
+
private void buildNewMetaDataContext(final String databaseName, final
SwitchingResource switchingResource) throws SQLException {
metaDataContexts.get().getMetaData().getDatabases().putAll(renewDatabase(metaDataContexts.get().getMetaData().getDatabase(databaseName),
switchingResource));
MetaDataContexts reloadMetaDataContexts =
createMetaDataContexts(databaseName, false, switchingResource, null);
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/switcher/NewResourceSwitchManager.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/switcher/NewResourceSwitchManager.java
index 361b43c44e1..e2604201e3d 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/switcher/NewResourceSwitchManager.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/switcher/NewResourceSwitchManager.java
@@ -30,14 +30,38 @@ import java.util.Collections;
public final class NewResourceSwitchManager {
/**
- * Create switching resource.
+ * Register storage unit.
*
* @param resourceMetaData resource meta data
* @param storageUnitName storage unit name
* @param dataSourceProps data source properties
* @return created switching resource
*/
- public SwitchingResource create(final ShardingSphereResourceMetaData
resourceMetaData, final String storageUnitName, final DataSourceProperties
dataSourceProps) {
+ public SwitchingResource registerStorageUnit(final
ShardingSphereResourceMetaData resourceMetaData, final String storageUnitName,
final DataSourceProperties dataSourceProps) {
return new SwitchingResource(resourceMetaData,
DataSourcePoolCreator.create(Collections.singletonMap(storageUnitName,
dataSourceProps)), Collections.emptyMap());
}
+
+ /**
+ * Alter storage unit.
+ *
+ * @param resourceMetaData resource meta data
+ * @param storageUnitName storage unit name
+ * @param dataSourceProps data source properties
+ * @return created switching resource
+ */
+ public SwitchingResource alterStorageUnit(final
ShardingSphereResourceMetaData resourceMetaData, final String storageUnitName,
final DataSourceProperties dataSourceProps) {
+ return new SwitchingResource(resourceMetaData,
DataSourcePoolCreator.create(Collections.singletonMap(storageUnitName,
dataSourceProps)),
+ Collections.singletonMap(storageUnitName,
resourceMetaData.getDataSources().remove(storageUnitName)));
+ }
+
+ /**
+ * Unregister storage unit.
+ *
+ * @param resourceMetaData resource meta data
+ * @param storageUnitName storage unit name
+ * @return created switching resource
+ */
+ public SwitchingResource unregisterStorageUnit(final
ShardingSphereResourceMetaData resourceMetaData, final String storageUnitName) {
+ return new SwitchingResource(resourceMetaData, Collections.emptyMap(),
Collections.singletonMap(storageUnitName,
resourceMetaData.getDataSources().remove(storageUnitName)));
+ }
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/NewMetaDataChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/NewMetaDataChangedWatcher.java
index f18c9e0adc7..ff5551da820 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/NewMetaDataChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/NewMetaDataChangedWatcher.java
@@ -140,16 +140,15 @@ public final class NewMetaDataChangedWatcher implements
NewGovernanceWatcher<Gov
if (!version.isPresent()) {
return Optional.empty();
}
- // TODO Finish event parameter.
if (Type.ADDED == event.getType()) {
return Optional.of(new RegisterStorageUnitEvent(databaseName,
dataSourceName.get(),
- new
YamlDataSourceConfigurationSwapper().swapToDataSourceProperties(YamlEngine.unmarshal(event.getValue(),
Map.class)), version.get(), ""));
+ new
YamlDataSourceConfigurationSwapper().swapToDataSourceProperties(YamlEngine.unmarshal(event.getValue(),
Map.class)), event.getKey(), Integer.parseInt(version.get())));
}
if (Type.UPDATED == event.getType()) {
- return Optional.of(new AlterStorageUnitEvent(databaseName,
dataSourceName.get(), version.get(),
- new
YamlDataSourceConfigurationSwapper().swapToDataSourceProperties(YamlEngine.unmarshal(event.getValue(),
Map.class))));
+ return Optional.of(new AlterStorageUnitEvent(databaseName,
dataSourceName.get(),
+ new
YamlDataSourceConfigurationSwapper().swapToDataSourceProperties(YamlEngine.unmarshal(event.getValue(),
Map.class)), event.getKey(), Integer.parseInt(version.get())));
}
- return Optional.of(new UnregisterStorageUnitEvent(databaseName,
dataSourceName.get()));
+ return Optional.of(new UnregisterStorageUnitEvent(databaseName,
dataSourceName.get(), event.getKey(), Integer.parseInt(version.get())));
}
private Optional<GovernanceEvent> createDatabaseRuleEvent(final String
databaseName, final DataChangedEvent event) {
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewDataSourceChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewDataSourceChangedSubscriber.java
index c30494cc976..71100f34799 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewDataSourceChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewDataSourceChangedSubscriber.java
@@ -18,7 +18,9 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.mode.event.datasource.AlterStorageUnitEvent;
import
org.apache.shardingsphere.mode.event.datasource.RegisterStorageUnitEvent;
+import
org.apache.shardingsphere.mode.event.datasource.UnregisterStorageUnitEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
/**
@@ -36,12 +38,41 @@ public final class NewDataSourceChangedSubscriber {
}
/**
- * Renew for register storage units.
+ * Renew for register storage unit.
*
* @param event register storage unit event
*/
@Subscribe
public void renew(final RegisterStorageUnitEvent event) {
+ if (event.getVersion() <
contextManager.getInstanceContext().getModeContextManager().getActiveVersionByKey(event.getActiveVersionKey()))
{
+ return;
+ }
contextManager.registerStorageUnit(event.getDatabaseName(),
event.getStorageUnitName(), event.getProps());
}
+
+ /**
+ * Renew for alter storage unit.
+ *
+ * @param event register storage unit event
+ */
+ @Subscribe
+ public void renew(final AlterStorageUnitEvent event) {
+ if (event.getVersion() <
contextManager.getInstanceContext().getModeContextManager().getActiveVersionByKey(event.getActiveVersionKey()))
{
+ return;
+ }
+ contextManager.alterStorageUnit(event.getDatabaseName(),
event.getStorageUnitName(), event.getProps());
+ }
+
+ /**
+ * Renew for unregister storage unit.
+ *
+ * @param event register storage unit event
+ */
+ @Subscribe
+ public void renew(final UnregisterStorageUnitEvent event) {
+ if (event.getVersion() <
contextManager.getInstanceContext().getModeContextManager().getActiveVersionByKey(event.getActiveVersionKey()))
{
+ return;
+ }
+ contextManager.unregisterStorageUnit(event.getDatabaseName(),
event.getStorageUnitName());
+ }
}