This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 33ec6a8adb4 Switch meta data active version in transaction (#30167)
33ec6a8adb4 is described below

commit 33ec6a8adb49f2d15c9b96764511a161cc90feb1
Author: zhaojinchao <[email protected]>
AuthorDate: Sun Feb 18 16:40:47 2024 +0800

    Switch meta data active version in transaction (#30167)
    
    * Switch meta data active version in transaction
    
    * Fix checkstyle
    
    * Update delete node path
    
    * Enhance logic
    
    * Fix build op list
---
 .../version/MetaDataVersionPersistService.java     | 40 +++++++++++--
 .../mode/identifier/NodePathTransactionAware.java  | 33 +++++++++++
 .../identifier/NodePathTransactionOperation.java   | 66 ++++++++++++++++++++++
 .../cluster/zookeeper/ZookeeperRepository.java     | 40 ++++++++++++-
 4 files changed, 173 insertions(+), 6 deletions(-)

diff --git 
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/version/MetaDataVersionPersistService.java
 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/version/MetaDataVersionPersistService.java
index e029370ffb6..7902ee961e9 100644
--- 
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/version/MetaDataVersionPersistService.java
+++ 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/version/MetaDataVersionPersistService.java
@@ -20,9 +20,13 @@ package 
org.apache.shardingsphere.metadata.persist.service.version;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.metadata.version.MetaDataVersion;
 import org.apache.shardingsphere.metadata.persist.node.NewDatabaseMetaDataNode;
+import org.apache.shardingsphere.mode.identifier.NodePathTransactionAware;
+import org.apache.shardingsphere.mode.identifier.NodePathTransactionOperation;
 import org.apache.shardingsphere.mode.spi.PersistRepository;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 /**
  * Meta data version persist service.
@@ -30,21 +34,47 @@ import java.util.Collection;
 @RequiredArgsConstructor
 public final class MetaDataVersionPersistService implements 
MetaDataVersionBasedPersistService {
     
-    private static final String ACTIVE_VERSION = "active_version";
+    private static final String ACTIVE_VERSION = "/active_version";
     
-    private static final String VERSIONS = "versions";
+    private static final String VERSIONS = "/versions/";
     
     private final PersistRepository repository;
     
-    // TODO Need to use transaction operation
     @Override
     public void switchActiveVersion(final Collection<MetaDataVersion> 
metaDataVersions) {
+        if (repository instanceof NodePathTransactionAware) {
+            switchActiveVersionWithTransaction(metaDataVersions);
+        } else {
+            switchActiveVersionWithoutTransaction(metaDataVersions);
+        }
+    }
+    
+    private void switchActiveVersionWithTransaction(final 
Collection<MetaDataVersion> metaDataVersions) {
+        List<NodePathTransactionOperation> nodePathTransactionOperations = 
buildNodePathTransactionOperations(metaDataVersions);
+        if (!nodePathTransactionOperations.isEmpty()) {
+            ((NodePathTransactionAware) 
repository).executeInTransaction(nodePathTransactionOperations);
+        }
+    }
+    
+    private List<NodePathTransactionOperation> 
buildNodePathTransactionOperations(final Collection<MetaDataVersion> 
metaDataVersions) {
+        List<NodePathTransactionOperation> result = new ArrayList<>();
+        for (MetaDataVersion each : metaDataVersions) {
+            if 
(each.getNextActiveVersion().equals(each.getCurrentActiveVersion())) {
+                continue;
+            }
+            result.add(NodePathTransactionOperation.update(each.getKey() + 
ACTIVE_VERSION, each.getNextActiveVersion()));
+            result.add(NodePathTransactionOperation.delete(each.getKey() + 
VERSIONS + each.getCurrentActiveVersion()));
+        }
+        return result;
+    }
+    
+    private void switchActiveVersionWithoutTransaction(final 
Collection<MetaDataVersion> metaDataVersions) {
         for (MetaDataVersion each : metaDataVersions) {
             if 
(each.getNextActiveVersion().equals(each.getCurrentActiveVersion())) {
                 continue;
             }
-            repository.persist(each.getKey() + "/" + ACTIVE_VERSION, 
each.getNextActiveVersion());
-            repository.delete(String.join("/", each.getKey(), VERSIONS, 
each.getCurrentActiveVersion()));
+            repository.persist(each.getKey() + ACTIVE_VERSION, 
each.getNextActiveVersion());
+            repository.delete(each.getKey() + VERSIONS + 
each.getCurrentActiveVersion());
         }
     }
     
diff --git 
a/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionAware.java
 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionAware.java
new file mode 100644
index 00000000000..4b4931c60b3
--- /dev/null
+++ 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionAware.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.identifier;
+
+import java.util.List;
+
+/**
+ * Node path transaction aware.
+ */
+public interface NodePathTransactionAware {
+    
+    /**
+     * Execute operations in transaction.
+     *
+     * @param nodePathTransactionOperations node path transaction operations
+     */
+    void executeInTransaction(List<NodePathTransactionOperation> 
nodePathTransactionOperations);
+}
diff --git 
a/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionOperation.java
 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionOperation.java
new file mode 100644
index 00000000000..dbe25f38406
--- /dev/null
+++ 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionOperation.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.identifier;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Node path transaction operation.
+ */
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+public final class NodePathTransactionOperation {
+    
+    public enum Type {
+        
+        ADD,
+        
+        UPDATE,
+        
+        DELETE
+    }
+    
+    private final Type type;
+    
+    private final String key;
+    
+    private final String value;
+    
+    /**
+     * Update.
+     *
+     * @param key key
+     * @param value value
+     * @return NodePathTransactionOperation node path transaction operation
+     */
+    public static NodePathTransactionOperation update(final String key, final 
String value) {
+        return new NodePathTransactionOperation(Type.UPDATE, key, value);
+    }
+    
+    /**
+     * Delete.
+     *
+     * @param key key
+     * @return NodePathTransactionOperation node path transaction operation
+     */
+    public static NodePathTransactionOperation delete(final String key) {
+        return new NodePathTransactionOperation(Type.DELETE, key, null);
+    }
+}
diff --git 
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
 
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
index 44845d3f89a..eeede26ef2c 100644
--- 
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
+++ 
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
@@ -23,6 +23,8 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
 import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.TransactionOp;
 import org.apache.curator.framework.recipes.cache.CuratorCache;
 import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
@@ -30,6 +32,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.instance.InstanceContextAware;
+import org.apache.shardingsphere.mode.identifier.NodePathTransactionAware;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import 
org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
@@ -37,6 +40,7 @@ import org.apache.shardingsphere.mode.event.DataChangedEvent;
 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 import 
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
+import org.apache.shardingsphere.mode.identifier.NodePathTransactionOperation;
 import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.ZookeeperExceptionHandler;
 import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionListener;
 import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties;
@@ -52,13 +56,14 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Registry repository of ZooKeeper.
  */
-public final class ZookeeperRepository implements ClusterPersistRepository, 
InstanceContextAware {
+public final class ZookeeperRepository implements ClusterPersistRepository, 
InstanceContextAware, NodePathTransactionAware {
     
     private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();
     
@@ -288,6 +293,39 @@ public final class ZookeeperRepository implements 
ClusterPersistRepository, Inst
         client.getConnectionStateListenable().addListener(new 
SessionConnectionListener(instanceContext, this));
     }
     
+    @Override
+    public void executeInTransaction(final List<NodePathTransactionOperation> 
nodePathTransactionOperations) {
+        try {
+            
client.transaction().forOperations(buildCuratorOps(nodePathTransactionOperations));
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            ZookeeperExceptionHandler.handleException(ex);
+        }
+    }
+    
+    private List<CuratorOp> buildCuratorOps(final 
List<NodePathTransactionOperation> nodePathTransactionOperations) throws 
Exception {
+        List<CuratorOp> result = new 
ArrayList<>(nodePathTransactionOperations.size());
+        TransactionOp transactionOp = client.transactionOp();
+        for (NodePathTransactionOperation each : 
nodePathTransactionOperations) {
+            result.add(buildCuratorOp(each, transactionOp));
+        }
+        return result;
+    }
+    
+    private CuratorOp buildCuratorOp(final NodePathTransactionOperation each, 
final TransactionOp transactionOp) throws Exception {
+        switch (each.getType()) {
+            case ADD:
+                return transactionOp.create().forPath(each.getKey(), 
each.getValue().getBytes(StandardCharsets.UTF_8));
+            case UPDATE:
+                return transactionOp.setData().forPath(each.getKey(), 
each.getValue().getBytes(StandardCharsets.UTF_8));
+            case DELETE:
+                return transactionOp.delete().forPath(each.getKey());
+            default:
+                throw new UnsupportedOperationException(each.toString());
+        }
+    }
+    
     @Override
     public String getType() {
         return "ZooKeeper";

Reply via email to