This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 33ec6a8adb4 Switch meta data active version in transaction (#30167)
33ec6a8adb4 is described below
commit 33ec6a8adb49f2d15c9b96764511a161cc90feb1
Author: zhaojinchao <[email protected]>
AuthorDate: Sun Feb 18 16:40:47 2024 +0800
Switch meta data active version in transaction (#30167)
* Switch meta data active version in transaction
* Fix checkstyle
* Update delete node path
* Enhance logic
* Fix build op list
---
.../version/MetaDataVersionPersistService.java | 40 +++++++++++--
.../mode/identifier/NodePathTransactionAware.java | 33 +++++++++++
.../identifier/NodePathTransactionOperation.java | 66 ++++++++++++++++++++++
.../cluster/zookeeper/ZookeeperRepository.java | 40 ++++++++++++-
4 files changed, 173 insertions(+), 6 deletions(-)
diff --git
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/version/MetaDataVersionPersistService.java
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/version/MetaDataVersionPersistService.java
index e029370ffb6..7902ee961e9 100644
---
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/version/MetaDataVersionPersistService.java
+++
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/version/MetaDataVersionPersistService.java
@@ -20,9 +20,13 @@ package
org.apache.shardingsphere.metadata.persist.service.version;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.metadata.version.MetaDataVersion;
import org.apache.shardingsphere.metadata.persist.node.NewDatabaseMetaDataNode;
+import org.apache.shardingsphere.mode.identifier.NodePathTransactionAware;
+import org.apache.shardingsphere.mode.identifier.NodePathTransactionOperation;
import org.apache.shardingsphere.mode.spi.PersistRepository;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
/**
* Meta data version persist service.
@@ -30,21 +34,47 @@ import java.util.Collection;
@RequiredArgsConstructor
public final class MetaDataVersionPersistService implements
MetaDataVersionBasedPersistService {
- private static final String ACTIVE_VERSION = "active_version";
+ private static final String ACTIVE_VERSION = "/active_version";
- private static final String VERSIONS = "versions";
+ private static final String VERSIONS = "/versions/";
private final PersistRepository repository;
- // TODO Need to use transaction operation
@Override
public void switchActiveVersion(final Collection<MetaDataVersion>
metaDataVersions) {
+ if (repository instanceof NodePathTransactionAware) { // repository offers executeInTransaction
+ switchActiveVersionWithTransaction(metaDataVersions);
+ } else { // fall back to one persist/delete call per version
+ switchActiveVersionWithoutTransaction(metaDataVersions);
+ }
+ }
+
+ private void switchActiveVersionWithTransaction(final
Collection<MetaDataVersion> metaDataVersions) {
+ List<NodePathTransactionOperation> nodePathTransactionOperations =
buildNodePathTransactionOperations(metaDataVersions);
+ if (!nodePathTransactionOperations.isEmpty()) { // empty when every version is unchanged; nothing to submit
+ ((NodePathTransactionAware)
repository).executeInTransaction(nodePathTransactionOperations);
+ }
+ }
+
+ private List<NodePathTransactionOperation>
buildNodePathTransactionOperations(final Collection<MetaDataVersion>
metaDataVersions) {
+ List<NodePathTransactionOperation> result = new ArrayList<>();
+ for (MetaDataVersion each : metaDataVersions) {
+ if
(each.getNextActiveVersion().equals(each.getCurrentActiveVersion())) {
+ continue; // active version does not change, so no operation is produced
+ }
+ result.add(NodePathTransactionOperation.update(each.getKey() +
ACTIVE_VERSION, each.getNextActiveVersion())); // set <key>/active_version to the next version
+ result.add(NodePathTransactionOperation.delete(each.getKey() +
VERSIONS + each.getCurrentActiveVersion())); // drop <key>/versions/<current version>
+ }
+ return result;
+ }
+
+ private void switchActiveVersionWithoutTransaction(final
Collection<MetaDataVersion> metaDataVersions) {
for (MetaDataVersion each : metaDataVersions) {
if
(each.getNextActiveVersion().equals(each.getCurrentActiveVersion())) {
continue;
}
- repository.persist(each.getKey() + "/" + ACTIVE_VERSION,
each.getNextActiveVersion());
- repository.delete(String.join("/", each.getKey(), VERSIONS,
each.getCurrentActiveVersion()));
+ repository.persist(each.getKey() + ACTIVE_VERSION,
each.getNextActiveVersion()); // ACTIVE_VERSION already carries its leading slash
+ repository.delete(each.getKey() + VERSIONS +
each.getCurrentActiveVersion()); // VERSIONS already carries both surrounding slashes
}
}
diff --git
a/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionAware.java
b/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionAware.java
new file mode 100644
index 00000000000..4b4931c60b3
--- /dev/null
+++
b/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionAware.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.identifier;
+
+import java.util.List;
+
+/**
+ * Contract for repositories that can execute a list of node path operations in a transaction.
+ */
+public interface NodePathTransactionAware {
+
+ /**
+ * Execute the given operations in a transaction.
+ *
+ * @param nodePathTransactionOperations node path transaction operations to execute
+ */
+ void executeInTransaction(List<NodePathTransactionOperation>
nodePathTransactionOperations);
+}
diff --git
a/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionOperation.java
b/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionOperation.java
new file mode 100644
index 00000000000..dbe25f38406
--- /dev/null
+++
b/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionOperation.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.identifier;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Node path transaction operation, holding the operation type, the node key and an optional value.
+ */
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+public final class NodePathTransactionOperation {
+
+ public enum Type {
+
+ ADD,
+
+ UPDATE,
+
+ DELETE
+ }
+
+ private final Type type;
+
+ private final String key;
+
+ private final String value;
+
+ /**
+ * Create an operation of type UPDATE.
+ *
+ * @param key node path to update
+ * @param value value to set on the node
+ * @return node path transaction operation of type UPDATE
+ */
+ public static NodePathTransactionOperation update(final String key, final
String value) {
+ return new NodePathTransactionOperation(Type.UPDATE, key, value);
+ }
+
+ /**
+ * Create an operation of type DELETE, its value is always null.
+ *
+ * @param key node path to delete
+ * @return node path transaction operation of type DELETE
+ */
+ public static NodePathTransactionOperation delete(final String key) {
+ return new NodePathTransactionOperation(Type.DELETE, key, null);
+ }
+}
diff --git
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
index 44845d3f89a..eeede26ef2c 100644
---
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
+++
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
@@ -23,6 +23,8 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
@@ -30,6 +32,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.InstanceContextAware;
+import org.apache.shardingsphere.mode.identifier.NodePathTransactionAware;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
@@ -37,6 +40,7 @@ import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
+import org.apache.shardingsphere.mode.identifier.NodePathTransactionOperation;
import
org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.ZookeeperExceptionHandler;
import
org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionListener;
import
org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties;
@@ -52,13 +56,14 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* Registry repository of ZooKeeper.
*/
-public final class ZookeeperRepository implements ClusterPersistRepository,
InstanceContextAware {
+public final class ZookeeperRepository implements ClusterPersistRepository,
InstanceContextAware, NodePathTransactionAware {
private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();
@@ -288,6 +293,39 @@ public final class ZookeeperRepository implements
ClusterPersistRepository, Inst
client.getConnectionStateListenable().addListener(new
SessionConnectionListener(instanceContext, this));
}
+ @Override
+ public void executeInTransaction(final List<NodePathTransactionOperation>
nodePathTransactionOperations) {
+ try {
+
client.transaction().forOperations(buildCuratorOps(nodePathTransactionOperations)); // submit every operation through one Curator transaction call
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ ZookeeperExceptionHandler.handleException(ex); // NOTE(review): confirm the handler does not silently ignore a failed transaction
+ }
+ }
+
+ private List<CuratorOp> buildCuratorOps(final
List<NodePathTransactionOperation> nodePathTransactionOperations) throws
Exception {
+ List<CuratorOp> result = new
ArrayList<>(nodePathTransactionOperations.size());
+ TransactionOp transactionOp = client.transactionOp(); // one builder is reused for every operation in the list
+ for (NodePathTransactionOperation each :
nodePathTransactionOperations) {
+ result.add(buildCuratorOp(each, transactionOp)); // input order is kept in the result
+ }
+ return result;
+ }
+
+ private CuratorOp buildCuratorOp(final NodePathTransactionOperation each,
final TransactionOp transactionOp) throws Exception {
+ switch (each.getType()) {
+ case ADD: // create the node with the UTF-8 encoded value; assumes value is non-null
+ return transactionOp.create().forPath(each.getKey(),
each.getValue().getBytes(StandardCharsets.UTF_8));
+ case UPDATE: // set the node data to the UTF-8 encoded value
+ return transactionOp.setData().forPath(each.getKey(),
each.getValue().getBytes(StandardCharsets.UTF_8)); // NOTE(review): confirm setData matches repository.persist when the node is missing
+ case DELETE: // value is not read for deletes
+ return transactionOp.delete().forPath(each.getKey());
+ default:
+ throw new UnsupportedOperationException(each.toString());
+ }
+ }
+
@Override
public String getType() {
return "ZooKeeper";