This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3d565f9c0c6 Make sys_data each table row saving a z-node. (#22539)
3d565f9c0c6 is described below
commit 3d565f9c0c64bf357ff30118c52a3faae3d9b5ab
Author: Chuxin Chen <[email protected]>
AuthorDate: Thu Dec 1 10:29:41 2022 +0800
Make sys_data each table row saving a z-node. (#22539)
---
.../config/props/ConfigurationPropertyKey.java | 5 ----
.../infra/metadata/data/ShardingSphereRowData.java | 29 +++++++++++++++++-
.../pojo/YamlShardingSpherePartitionRowData.java | 34 ----------------------
.../yaml/data/pojo/YamlShardingSphereRowData.java | 2 ++
.../data/pojo/YamlShardingSphereTableData.java | 4 +--
.../swapper/YamlShardingSphereRowDataSwapper.java | 3 +-
.../YamlShardingSphereTableDataSwapper.java | 32 ++++----------------
.../ShardingSphereDataScheduleCollector.java | 4 +--
.../mode/manager/ContextManager.java | 12 ++++----
.../data/ShardingSphereDataPersistService.java | 24 +++++++--------
.../persist/node/ShardingSphereDataNode.java | 2 +-
.../cluster/ClusterContextManagerBuilder.java | 4 +--
.../data/ShardingSphereDataChangedWatcher.java | 10 +++----
.../event/ShardingSphereRowDataAddedEvent.java | 4 +--
.../subscriber/DatabaseChangedSubscriber.java | 2 +-
15 files changed, 66 insertions(+), 105 deletions(-)
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java
index 2b265797308..f4aac463282 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java
@@ -124,11 +124,6 @@ public enum ConfigurationPropertyKey implements
TypedPropertyKey {
*/
PROXY_INSTANCE_TYPE("proxy-instance-type", "Proxy", String.class, true),
- /**
- * Metadata chunk unit rows.
- */
- METADATA_CHUNK_UNIT_ROWS("metadata-chunk-unit-rows", "100", int.class,
false),
-
/**
* Proxy metadata collector enabled.
*/
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
index 17c610d637a..063deeb6aee 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
@@ -20,7 +20,11 @@ package org.apache.shardingsphere.infra.metadata.data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
import java.util.List;
/**
@@ -28,8 +32,31 @@ import java.util.List;
*/
@RequiredArgsConstructor
@Getter
-@EqualsAndHashCode
+@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public final class ShardingSphereRowData {
+ @EqualsAndHashCode.Include
+ private final String uniqueKey;
+
private final List<Object> rows;
+
+ public ShardingSphereRowData(final List<Object> rows) {
+ uniqueKey = generateUniqueKey(rows);
+ this.rows = rows;
+ }
+
+ private String generateUniqueKey(final List<Object> rows) {
+ StringBuilder uniqueKeyText = new StringBuilder();
+ for (Object each : rows) {
+ uniqueKeyText.append(null == each ? "" :
each.toString()).append("|");
+ }
+ return useMd5GenerateUniqueKey(uniqueKeyText);
+ }
+
+ @SneakyThrows
+ private String useMd5GenerateUniqueKey(final StringBuilder uniqueKeyText) {
+ MessageDigest md5 = MessageDigest.getInstance("MD5");
+ md5.update(StandardCharsets.UTF_8.encode(uniqueKeyText.toString()));
+ return String.format("%032x", new BigInteger(1, md5.digest()));
+ }
}
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSpherePartitionRowData.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSpherePartitionRowData.java
deleted file mode 100644
index cb9bed40390..00000000000
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSpherePartitionRowData.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.yaml.data.pojo;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-
-import java.util.Collection;
-
-/**
- * Yaml ShardingSphere partition row data.
- */
-@Getter
-@Setter
-public final class YamlShardingSpherePartitionRowData implements
YamlConfiguration {
-
- private Collection<YamlShardingSphereRowData> partitionRows;
-}
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereRowData.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereRowData.java
index 57254f18135..0152a06b5b1 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereRowData.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereRowData.java
@@ -30,5 +30,7 @@ import java.util.List;
@Setter
public final class YamlShardingSphereRowData implements YamlConfiguration {
+ private String uniqueKey;
+
private List<Object> rows;
}
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
index 3746e975121..b2a316df847 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
@@ -22,8 +22,8 @@ import lombok.Setter;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import
org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
+import java.util.Collection;
import java.util.List;
-import java.util.Map;
/**
* ShardingSphere table data.
@@ -36,5 +36,5 @@ public final class YamlShardingSphereTableData implements
YamlConfiguration {
private List<YamlShardingSphereColumn> columns;
- private Map<Integer, YamlShardingSpherePartitionRowData> partitionRows;
+ private Collection<YamlShardingSphereRowData> rowData;
}
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereRowDataSwapper.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereRowDataSwapper.java
index 70863bc4c49..73c0d78e096 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereRowDataSwapper.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereRowDataSwapper.java
@@ -48,6 +48,7 @@ public final class YamlShardingSphereRowDataSwapper
implements YamlConfiguration
yamlRowData.add(convertDataType(each,
columns.get(count++).getDataType()));
}
result.setRows(yamlRowData);
+ result.setUniqueKey(data.getUniqueKey());
return result;
}
@@ -68,7 +69,7 @@ public final class YamlShardingSphereRowDataSwapper
implements YamlConfiguration
ShardingSphereColumn column = columns.get(count++);
rowData.add(convertByDataType(each, column.getDataType()));
}
- return new ShardingSphereRowData(rowData);
+ return new ShardingSphereRowData(yamlConfig.getUniqueKey(), rowData);
}
private Object convertByDataType(final Object data, final int dataType) {
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
index cad63d68001..f2eb5dbc659 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
@@ -17,49 +17,29 @@
package org.apache.shardingsphere.infra.yaml.data.swapper;
-import com.google.common.collect.Lists;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
-import
org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSpherePartitionRowData;
import
org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
import
org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
import
org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
import java.util.Collection;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
/**
* YAML ShardingSphere data swapper.
*/
-@RequiredArgsConstructor
public final class YamlShardingSphereTableDataSwapper implements
YamlConfigurationSwapper<YamlShardingSphereTableData, ShardingSphereTableData> {
- private final int rowsPartitionSize;
-
- public YamlShardingSphereTableDataSwapper() {
- this(100);
- }
-
@Override
public YamlShardingSphereTableData swapToYamlConfiguration(final
ShardingSphereTableData data) {
YamlShardingSphereTableData result = new YamlShardingSphereTableData();
result.setName(data.getName());
- Map<Integer, YamlShardingSpherePartitionRowData> yamlPartitionRows =
new LinkedHashMap<>();
- int i = 0;
- for (List<ShardingSphereRowData> each :
Lists.partition(data.getRows(), rowsPartitionSize)) {
- Collection<YamlShardingSphereRowData> yamlShardingSphereRowData =
new LinkedList<>();
- each.forEach(rowData -> yamlShardingSphereRowData.add(new
YamlShardingSphereRowDataSwapper(data.getColumns()).swapToYamlConfiguration(rowData)));
- YamlShardingSpherePartitionRowData partitionRowsData = new
YamlShardingSpherePartitionRowData();
- partitionRowsData.setPartitionRows(yamlShardingSphereRowData);
- yamlPartitionRows.put(i++, partitionRowsData);
- }
- result.setPartitionRows(yamlPartitionRows);
+ Collection<YamlShardingSphereRowData> yamlShardingSphereRowData = new
LinkedList<>();
+ data.getRows().forEach(rowData -> yamlShardingSphereRowData.add(new
YamlShardingSphereRowDataSwapper(data.getColumns()).swapToYamlConfiguration(rowData)));
+ result.setRowData(yamlShardingSphereRowData);
List<YamlShardingSphereColumn> columns = new LinkedList<>();
data.getColumns().forEach(each -> columns.add(swapYamlColumn(each)));
result.setColumns(columns);
@@ -84,10 +64,8 @@ public final class YamlShardingSphereTableDataSwapper
implements YamlConfigurati
yamlConfig.getColumns().forEach(each ->
columns.add(swapColumn(each)));
}
ShardingSphereTableData result = new
ShardingSphereTableData(yamlConfig.getName(), columns);
- if (null != yamlConfig.getPartitionRows()) {
- for (YamlShardingSpherePartitionRowData each :
yamlConfig.getPartitionRows().values()) {
- each.getPartitionRows().forEach(yamlRowData ->
result.getRows().add(new
YamlShardingSphereRowDataSwapper(columns).swapToObject(yamlRowData)));
- }
+ if (null != yamlConfig.getRowData()) {
+ yamlConfig.getRowData().forEach(yamlRowData ->
result.getRows().add(new
YamlShardingSphereRowDataSwapper(columns).swapToObject(yamlRowData)));
}
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
index 2c268f2fcda..e1478237ef8 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.execute;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
@@ -121,8 +120,7 @@ public final class ShardingSphereDataScheduleCollector {
}
shardingSphereData.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(),
changedTableData);
ShardingSphereSchemaDataAlteredEvent event = new
ShardingSphereSchemaDataAlteredEvent(databaseName, schemaName);
- event.getAlteredYamlTables().add(new
YamlShardingSphereTableDataSwapper(contextManager.getMetaDataContexts().getMetaData().getProps()
-
.getValue(ConfigurationPropertyKey.METADATA_CHUNK_UNIT_ROWS)).swapToYamlConfiguration(changedTableData));
+ event.getAlteredYamlTables().add(new
YamlShardingSphereTableDataSwapper().swapToYamlConfiguration(changedTableData));
contextManager.getInstanceContext().getEventBusContext().post(event);
}
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 67aa1486bc3..7753a6d4898 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -693,19 +693,17 @@ public final class ContextManager implements
AutoCloseable {
* @param tableName table name
* @param yamlRowData yaml row data
*/
- public synchronized void alterRowsData(final String databaseName, final
String schemaName, final String tableName, final
Collection<YamlShardingSphereRowData> yamlRowData) {
+ public synchronized void alterRowsData(final String databaseName, final
String schemaName, final String tableName, final YamlShardingSphereRowData
yamlRowData) {
if
(!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
||
!metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)
||
!metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().containsKey(tableName))
{
return;
}
ShardingSphereTableData tableData =
metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().get(tableName);
- Collection<ShardingSphereRowData> rowData =
yamlRowData.stream().map(each -> new
YamlShardingSphereRowDataSwapper(tableData.getColumns()).swapToObject(each)).collect(Collectors.toList());
- rowData.forEach(each -> {
- if (!tableData.getRows().contains(each)) {
- tableData.getRows().add(each);
- }
- });
+ ShardingSphereRowData rowData = new
YamlShardingSphereRowDataSwapper(tableData.getColumns()).swapToObject(yamlRowData);
+ if (!tableData.getRows().contains(rowData)) {
+ tableData.getRows().add(rowData);
+ }
}
@Override
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
index 5cb55c5e880..db30839438b 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
@@ -23,16 +23,15 @@ import
org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import
org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSpherePartitionRowData;
+import
org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
import
org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
import
org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
import
org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import java.util.Collection;
-import java.util.Map;
+import java.util.LinkedList;
import java.util.Optional;
-import java.util.TreeMap;
/**
* ShardingSphere data persist service.
@@ -93,12 +92,14 @@ public final class ShardingSphereDataPersistService {
private ShardingSphereTableData loadTableData(final String databaseName,
final String schemaName, final String tableName) {
String tableData =
repository.getDirectly(ShardingSphereDataNode.getTablePath(databaseName,
schemaName, tableName));
YamlShardingSphereTableData yamlTableData =
YamlEngine.unmarshal(tableData, YamlShardingSphereTableData.class);
- Map<Integer, YamlShardingSpherePartitionRowData> partitionRowsData =
new TreeMap<>();
+ Collection<YamlShardingSphereRowData> yamlRowData = new LinkedList<>();
for (String each :
repository.getChildrenKeys(ShardingSphereDataNode.getTablePath(databaseName,
schemaName, tableName))) {
- String partitionRows =
repository.getDirectly(ShardingSphereDataNode.getTablePartitionRowsPath(databaseName,
schemaName, tableName, each));
- partitionRowsData.put(Integer.parseInt(each),
YamlEngine.unmarshal(partitionRows, YamlShardingSpherePartitionRowData.class));
+ String yamlRow =
repository.getDirectly(ShardingSphereDataNode.getTablePartitionRowsPath(databaseName,
schemaName, tableName, each));
+ if (null != yamlRow) {
+ yamlRowData.add(YamlEngine.unmarshal(yamlRow,
YamlShardingSphereRowData.class));
+ }
}
- yamlTableData.setPartitionRows(partitionRowsData);
+ yamlTableData.setRowData(yamlRowData);
return new
YamlShardingSphereTableDataSwapper().swapToObject(yamlTableData);
}
@@ -108,13 +109,12 @@ public final class ShardingSphereDataPersistService {
* @param databaseName database name
* @param schemaName schema name
* @param schemaData schema data
- * @param rowsPartitionSize rows partition size
*/
- public void persist(final String databaseName, final String schemaName,
final ShardingSphereSchemaData schemaData, final int rowsPartitionSize) {
+ public void persist(final String databaseName, final String schemaName,
final ShardingSphereSchemaData schemaData) {
if (schemaData.getTableData().isEmpty()) {
repository.persist(ShardingSphereDataNode.getSchemaDataPath(databaseName,
schemaName), "");
} else {
- schemaData.getTableData().values().forEach(each ->
persistTable(databaseName, schemaName, new
YamlShardingSphereTableDataSwapper(rowsPartitionSize).swapToYamlConfiguration(each)));
+ schemaData.getTableData().values().forEach(each ->
persistTable(databaseName, schemaName, new
YamlShardingSphereTableDataSwapper().swapToYamlConfiguration(each)));
}
}
@@ -131,7 +131,7 @@ public final class ShardingSphereDataPersistService {
yamlTableDataWithoutRows.setName(table.getName());
yamlTableDataWithoutRows.setColumns(table.getColumns());
repository.persist(ShardingSphereDataNode.getTablePath(databaseName,
schemaName, table.getName().toLowerCase()),
YamlEngine.marshal(yamlTableDataWithoutRows));
- table.getPartitionRows().forEach((key, value) ->
repository.persist(ShardingSphereDataNode
- .getTablePartitionRowsPath(databaseName, schemaName,
table.getName().toLowerCase(), String.valueOf(key)),
YamlEngine.marshal(value)));
+ table.getRowData().forEach(each ->
repository.persist(ShardingSphereDataNode
+ .getTablePartitionRowsPath(databaseName, schemaName,
table.getName().toLowerCase(), each.getUniqueKey()), YamlEngine.marshal(each)));
}
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
index ee82e3bf9ae..1df75676313 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
@@ -190,7 +190,7 @@ public final class ShardingSphereDataNode {
* @return is matched
*/
public static boolean isTableRowDataMatched(final String path) {
- Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() +
"/([\\w\\-]+)/schemas/([\\w\\-]+)/tables" + "/([\\w\\-]+)" + "/(\\d+)$",
Pattern.CASE_INSENSITIVE);
+ Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() +
"/([\\w\\-]+)/schemas/([\\w\\-]+)/tables" + "/([\\w\\-]+)" + "/(\\w+)$",
Pattern.CASE_INSENSITIVE);
return pattern.matcher(path).find();
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index de7c2eb2cd8..78c346f6f73 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.mode.manager.cluster;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.InstanceContextAware;
@@ -77,10 +76,9 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
private void persistMetaData(final MetaDataContexts metaDataContexts) {
metaDataContexts.getMetaData().getDatabases().values().forEach(each ->
each.getSchemas()
.forEach((schemaName, schema) ->
metaDataContexts.getPersistService().getDatabaseMetaDataService().persist(each.getName(),
schemaName, schema)));
- int rowsPartitionSize =
metaDataContexts.getMetaData().getProps().getValue(ConfigurationPropertyKey.METADATA_CHUNK_UNIT_ROWS);
for (Entry<String, ShardingSphereDatabaseData> entry :
metaDataContexts.getShardingSphereData().getDatabaseData().entrySet()) {
entry.getValue().getSchemaData().forEach((schemaName, schemaData)
-> metaDataContexts.getPersistService().getShardingSphereDataPersistService()
- .persist(entry.getKey(), schemaName, schemaData,
rowsPartitionSize));
+ .persist(entry.getKey(), schemaName, schemaData));
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
index db733b4ed32..bb2e6c47faf 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data
import com.google.common.base.Preconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import
org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSpherePartitionRowData;
+import
org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
import
org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
import
org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
@@ -66,7 +66,7 @@ public final class ShardingSphereDataChangedWatcher
implements GovernanceWatcher
return createSchemaDataChangedEvent(event);
}
if (isTableRowDataChanged(event)) {
- return createPartitionRowsAddedEvent(event);
+ return createRowAddedEvent(event);
}
return Optional.empty();
}
@@ -133,7 +133,7 @@ public final class ShardingSphereDataChangedWatcher
implements GovernanceWatcher
.swapToObject(YamlEngine.unmarshal(event.getValue(),
YamlShardingSphereTableData.class)), null);
}
- private Optional<GovernanceEvent> createPartitionRowsAddedEvent(final
DataChangedEvent event) {
+ private Optional<GovernanceEvent> createRowAddedEvent(final
DataChangedEvent event) {
if (Type.ADDED != event.getType()) {
return Optional.empty();
}
@@ -143,7 +143,7 @@ public final class ShardingSphereDataChangedWatcher
implements GovernanceWatcher
Preconditions.checkState(schemaName.isPresent());
Optional<String> tableName =
ShardingSphereDataNode.getTableNameByPartitionRowsPath(event.getKey());
Preconditions.checkState(tableName.isPresent());
- YamlShardingSpherePartitionRowData yamlShardingSpherePartitionRowData
= YamlEngine.unmarshal(event.getValue(),
YamlShardingSpherePartitionRowData.class);
- return Optional.of(new
ShardingSphereRowDataAddedEvent(databaseName.get(), schemaName.get(),
tableName.get(), yamlShardingSpherePartitionRowData.getPartitionRows()));
+ YamlShardingSphereRowData yamlShardingSphereRowData =
YamlEngine.unmarshal(event.getValue(), YamlShardingSphereRowData.class);
+ return Optional.of(new
ShardingSphereRowDataAddedEvent(databaseName.get(), schemaName.get(),
tableName.get(), yamlShardingSphereRowData));
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
index 1b54847639a..d4bdb451379 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
@@ -22,8 +22,6 @@ import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
-import java.util.Collection;
-
/**
* Row data added event.
*/
@@ -37,5 +35,5 @@ public final class ShardingSphereRowDataAddedEvent implements
GovernanceEvent {
private final String tableName;
- private final Collection<YamlShardingSphereRowData> yamlRowData;
+ private final YamlShardingSphereRowData yamlRowData;
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
index 362a79c6786..549cdd844db 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
@@ -92,7 +92,7 @@ public final class DatabaseChangedSubscriber {
/**
* Renew ShardingSphere data of row.
- *
+ *
* @param event ShardingSphere row data added event
*/
@Subscribe