This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 50dd8bf96e6 Migrate ElasticJob governance center interface to 
ShardingSphere governance center (#21395)
50dd8bf96e6 is described below

commit 50dd8bf96e6658a5ce6134ade030393a8208a07b
Author: zhaojinchao <[email protected]>
AuthorDate: Sat Oct 8 23:07:13 2022 +0800

    Migrate ElasticJob governance center interface to ShardingSphere governance 
center (#21395)
    
    * Migrate ElasticJob governance center interface to ShardingSphere 
governance center
    
    * Fix checkstyle
    
    * Revise server.yaml
---
 .../fixture/FixtureClusterPersistRepository.java   | 52 +++++++++++++
 .../mode/persist/PersistRepository.java            | 16 ++++
 .../fixture/ClusterPersistRepositoryFixture.java   | 52 +++++++++++++
 ...ProcessListClusterPersistRepositoryFixture.java | 52 +++++++++++++
 .../cluster/ClusterPersistRepository.java          | 65 ++++++++++++++++
 .../cluster/transaction/TransactionOperation.java  | 88 ++++++++++++++++++++++
 .../cluster/consul/ConsulRepository.java           | 53 +++++++++++++
 .../repository/cluster/etcd/EtcdRepository.java    | 53 +++++++++++++
 .../repository/cluster/nacos/NacosRepository.java  | 79 +++++++++++++++----
 .../zookeeper/CuratorZookeeperRepository.java      | 72 +++++++++++++++++-
 .../StandalonePersistRepositoryFixture.java        | 10 +++
 .../repository/standalone/jdbc/JDBCRepository.java | 13 +++-
 .../fixture/ClusterPersistRepositoryFixture.java   | 52 +++++++++++++
 .../fixture/TestClusterPersistRepository.java      | 52 +++++++++++++
 14 files changed, 691 insertions(+), 18 deletions(-)

diff --git 
a/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
 
b/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
index 222737a3cab..0fde811f0cc 100644
--- 
a/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
+++ 
b/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
@@ -17,10 +17,12 @@
 
 package org.apache.shardingsphere.spring.namespace.fixture;
 
+import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
 import org.apache.shardingsphere.infra.database.DefaultDatabase;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import 
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -36,6 +38,36 @@ public final class FixtureClusterPersistRepository 
implements ClusterPersistRepo
         registryData.put("/metadata", DefaultDatabase.LOGIC_NAME);
     }
     
+    @Override
+    public int getNumChildren(final String key) {
+        return 0;
+    }
+    
+    @Override
+    public void addCacheData(final String cachePath) {
+    
+    }
+    
+    @Override
+    public void evictCacheData(final String cachePath) {
+    
+    }
+    
+    @Override
+    public Object getRawCache(final String cachePath) {
+        return null;
+    }
+    
+    @Override
+    public void executeInLeader(final String key, final 
LeaderExecutionCallback callback) {
+    
+    }
+    
+    @Override
+    public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) throws Exception {
+    
+    }
+    
     @Override
     public String get(final String key) {
         return registryData.get(key);
@@ -46,11 +78,21 @@ public final class FixtureClusterPersistRepository 
implements ClusterPersistRepo
         return registryData.containsKey(key) ? 
Collections.singletonList(registryData.get(key)) : Collections.emptyList();
     }
     
+    @Override
+    public boolean isExisted(final String key) {
+        return false;
+    }
+    
     @Override
     public void persist(final String key, final String value) {
         registryData.put(key, value);
     }
     
+    @Override
+    public void update(final String key, final String value) {
+    
+    }
+    
     @Override
     public void persistEphemeral(final String key, final String value) {
         registryData.put(key, value);
@@ -64,6 +106,16 @@ public final class FixtureClusterPersistRepository 
implements ClusterPersistRepo
     public void delete(final String key) {
     }
     
+    @Override
+    public long getRegistryCenterTime(final String key) {
+        return 0;
+    }
+    
+    @Override
+    public Object getRawClient() {
+        return null;
+    }
+    
     @Override
     public void watch(final String key, final DataChangedEventListener 
listener) {
     }
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
index ca4e25cd29b..48c00e5559b 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
@@ -49,6 +49,14 @@ public interface PersistRepository extends TypedSPI {
      */
     List<String> getChildrenKeys(String key);
     
+    /**
+     * Judge whether the node exists.
+     *
+     * @param key key
+     * @return whether the node exists
+     */
+    boolean isExisted(String key);
+    
     /**
      * Persist data.
      *
@@ -57,6 +65,14 @@ public interface PersistRepository extends TypedSPI {
      */
     void persist(String key, String value);
     
+    /**
+     * Update data.
+     *
+     * @param key key
+     * @param value value
+     */
+    void update(String key, String value);
+    
     /**
      * Delete node.
      *
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
index 79687d599bf..fde68735270 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
@@ -17,9 +17,11 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.fixture;
 
+import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import 
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
 
 import java.util.Collections;
 import java.util.List;
@@ -30,6 +32,36 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
     public void init(final ClusterPersistRepositoryConfiguration config) {
     }
     
+    @Override
+    public int getNumChildren(final String key) {
+        return 0;
+    }
+    
+    @Override
+    public void addCacheData(final String cachePath) {
+    
+    }
+    
+    @Override
+    public void evictCacheData(final String cachePath) {
+    
+    }
+    
+    @Override
+    public Object getRawCache(final String cachePath) {
+        return null;
+    }
+    
+    @Override
+    public void executeInLeader(final String key, final 
LeaderExecutionCallback callback) {
+    
+    }
+    
+    @Override
+    public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) throws Exception {
+    
+    }
+    
     @Override
     public String get(final String key) {
         return "";
@@ -40,10 +72,20 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
         return Collections.emptyList();
     }
     
+    @Override
+    public boolean isExisted(final String key) {
+        return false;
+    }
+    
     @Override
     public void persist(final String key, final String value) {
     }
     
+    @Override
+    public void update(final String key, final String value) {
+    
+    }
+    
     @Override
     public void persistEphemeral(final String key, final String value) {
     }
@@ -56,6 +98,16 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
     public void delete(final String key) {
     }
     
+    @Override
+    public long getRegistryCenterTime(final String key) {
+        return 0;
+    }
+    
+    @Override
+    public Object getRawClient() {
+        return null;
+    }
+    
     @Override
     public void watch(final String key, final DataChangedEventListener 
listener) {
     }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
index 67af7529f9a..beb4a5ff946 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
@@ -17,9 +17,11 @@
 
 package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process;
 
+import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import 
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -34,6 +36,36 @@ public final class 
ProcessListClusterPersistRepositoryFixture implements Cluster
     public void init(final ClusterPersistRepositoryConfiguration config) {
     }
     
+    @Override
+    public int getNumChildren(final String key) {
+        return 0;
+    }
+    
+    @Override
+    public void addCacheData(final String cachePath) {
+    
+    }
+    
+    @Override
+    public void evictCacheData(final String cachePath) {
+    
+    }
+    
+    @Override
+    public Object getRawCache(final String cachePath) {
+        return null;
+    }
+    
+    @Override
+    public void executeInLeader(final String key, final 
LeaderExecutionCallback callback) {
+    
+    }
+    
+    @Override
+    public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) throws Exception {
+    
+    }
+    
     @Override
     public String get(final String key) {
         return REGISTRY_DATA.get(key);
@@ -44,11 +76,21 @@ public final class 
ProcessListClusterPersistRepositoryFixture implements Cluster
         return Collections.singletonList("db");
     }
     
+    @Override
+    public boolean isExisted(final String key) {
+        return false;
+    }
+    
     @Override
     public void persist(final String key, final String value) {
         REGISTRY_DATA.put(key, value);
     }
     
+    @Override
+    public void update(final String key, final String value) {
+    
+    }
+    
     @Override
     public void persistEphemeral(final String key, final String value) {
         REGISTRY_DATA.put(key, value);
@@ -63,6 +105,16 @@ public final class 
ProcessListClusterPersistRepositoryFixture implements Cluster
         REGISTRY_DATA.remove(key);
     }
     
+    @Override
+    public long getRegistryCenterTime(final String key) {
+        return 0;
+    }
+    
+    @Override
+    public Object getRawClient() {
+        return null;
+    }
+    
     @Override
     public void watch(final String key, final DataChangedEventListener 
listener) {
     }
diff --git 
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
 
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
index e16b021b6b7..9a5f899910b 100644
--- 
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
+++ 
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
@@ -17,8 +17,12 @@
 
 package org.apache.shardingsphere.mode.repository.cluster;
 
+import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import 
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
+
+import java.util.List;
 
 /**
  * Cluster persist repository.
@@ -32,6 +36,67 @@ public interface ClusterPersistRepository extends 
PersistRepository {
      */
     void init(ClusterPersistRepositoryConfiguration config);
     
+    /**
+     * Get current time from registry center.
+     *
+     * @param key key
+     * @return current time from registry center
+     */
+    long getRegistryCenterTime(String key);
+    
+    /**
+     * Get raw client of registry center.
+     *
+     * @return registry center raw client
+     */
+    Object getRawClient();
+    
+    /**
+     * Get children number.
+     *
+     * @param key key
+     * @return children number
+     */
+    int getNumChildren(String key);
+    
+    /**
+     * Add data to cache.
+     *
+     * @param cachePath cache path
+     */
+    void addCacheData(String cachePath);
+    
+    /**
+     * Evict data from cache.
+     *
+     * @param cachePath cache path
+     */
+    void evictCacheData(String cachePath);
+    
+    /**
+     * Get raw cache object of registry center.
+     *
+     * @param cachePath cache path
+     * @return raw cache object of registry center
+     */
+    Object getRawCache(String cachePath);
+    
+    /**
+     * Execute in leader.
+     *
+     * @param key key
+     * @param callback callback of leader
+     */
+    void executeInLeader(String key, LeaderExecutionCallback callback);
+    
+    /**
+     * Execute operations in transaction.
+     *
+     * @param transactionOperations operations
+     * @throws Exception exception
+     */
+    void executeInTransaction(List<TransactionOperation> 
transactionOperations) throws Exception;
+    
     /**
      * Persist ephemeral data.
      *
diff --git 
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/transaction/TransactionOperation.java
 
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/transaction/TransactionOperation.java
new file mode 100644
index 00000000000..db2c0d90293
--- /dev/null
+++ 
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/transaction/TransactionOperation.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.transaction;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+@ToString
+public final class TransactionOperation {
+    
+    public enum Type {
+        
+        CHECK_EXISTS,
+        
+        ADD,
+        
+        UPDATE,
+        
+        DELETE
+    }
+    
+    private final Type type;
+    
+    private final String key;
+    
+    private final String value;
+    
+    /**
+     * Operation add.
+     *
+     * @param key key
+     * @param value value
+     * @return TransactionOperation
+     */
+    public static TransactionOperation opAdd(final String key, final String 
value) {
+        return new TransactionOperation(Type.ADD, key, value);
+    }
+    
+    /**
+     * Operation update.
+     *
+     * @param key key
+     * @param value value
+     * @return TransactionOperation
+     */
+    public static TransactionOperation opUpdate(final String key, final String 
value) {
+        return new TransactionOperation(Type.UPDATE, key, value);
+    }
+    
+    /**
+     * Operation delete.
+     *
+     * @param key key
+     * @return TransactionOperation
+     */
+    public static TransactionOperation opDelete(final String key) {
+        return new TransactionOperation(Type.DELETE, key, null);
+    }
+    
+    /**
+     * Operation check exists.
+     *
+     * @param key key
+     * @return TransactionOperation
+     */
+    public static TransactionOperation opCheckExists(final String key) {
+        return new TransactionOperation(Type.CHECK_EXISTS, key, null);
+    }
+}
diff --git 
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
 
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
index 5d226a4f077..c649a861b57 100644
--- 
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
+++ 
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
@@ -24,6 +24,7 @@ import com.ecwid.consul.v1.kv.model.GetValue;
 import com.ecwid.consul.v1.kv.model.PutParams;
 import com.ecwid.consul.v1.session.model.NewSession;
 import com.ecwid.consul.v1.session.model.Session;
+import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import 
org.apache.shardingsphere.mode.repository.cluster.consul.lock.ConsulInternalLockProvider;
@@ -31,6 +32,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProp
 import 
org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import 
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -61,6 +63,37 @@ public class ConsulRepository implements 
ClusterPersistRepository {
         watchKeyMap = new HashMap<>(6, 1);
     }
     
+    @Override
+    public int getNumChildren(final String key) {
+        return 0;
+    }
+    
+    @Override
+    public void addCacheData(final String cachePath) {
+        // TODO
+    }
+    
+    @Override
+    public void evictCacheData(final String cachePath) {
+        // TODO
+    }
+    
+    @Override
+    public Object getRawCache(final String cachePath) {
+        // TODO
+        return null;
+    }
+    
+    @Override
+    public void executeInLeader(final String key, final 
LeaderExecutionCallback callback) {
+        // TODO
+    }
+    
+    @Override
+    public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) throws Exception {
+        // TODO
+    }
+    
     @Override
     public String get(final String key) {
         Response<GetValue> response = consulClient.getKVValue(key);
@@ -73,16 +106,36 @@ public class ConsulRepository implements 
ClusterPersistRepository {
         return null == response ? Collections.emptyList() : 
response.getValue();
     }
     
+    @Override
+    public boolean isExisted(final String key) {
+        return false;
+    }
+    
     @Override
     public void persist(final String key, final String value) {
         consulClient.setKVValue(key, value);
     }
     
+    @Override
+    public void update(final String key, final String value) {
+        // TODO
+    }
+    
     @Override
     public void delete(final String key) {
         consulClient.deleteKVValue(key);
     }
     
+    @Override
+    public long getRegistryCenterTime(final String key) {
+        return 0;
+    }
+    
+    @Override
+    public Object getRawClient() {
+        return consulClient;
+    }
+    
     @Override
     public void close() {
         // TODO
diff --git 
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
 
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
index edc1be95483..301d235a863 100644
--- 
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
+++ 
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
@@ -31,6 +31,7 @@ import io.etcd.jetcd.support.Observers;
 import io.etcd.jetcd.watch.WatchEvent;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import 
org.apache.shardingsphere.mode.repository.cluster.etcd.lock.EtcdInternalLockProvider;
@@ -39,6 +40,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperty
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import 
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
 
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -67,6 +69,37 @@ public final class EtcdRepository implements 
ClusterPersistRepository {
         etcdInternalLockHolder = new EtcdInternalLockProvider(client, 
etcdProps);
     }
     
+    @Override
+    public int getNumChildren(final String key) {
+        return 0;
+    }
+    
+    @Override
+    public void addCacheData(final String cachePath) {
+        // TODO
+    }
+    
+    @Override
+    public void evictCacheData(final String cachePath) {
+        // TODO
+    }
+    
+    @Override
+    public Object getRawCache(final String cachePath) {
+        // TODO
+        return null;
+    }
+    
+    @Override
+    public void executeInLeader(final String key, final 
LeaderExecutionCallback callback) {
+        // TODO
+    }
+    
+    @Override
+    public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) throws Exception {
+        // TODO
+    }
+    
     @SneakyThrows({InterruptedException.class, ExecutionException.class})
     @Override
     public String get(final String key) {
@@ -84,6 +117,11 @@ public final class EtcdRepository implements 
ClusterPersistRepository {
         return keyValues.stream().map(each -> getSubNodeKeyName(prefix, 
each.getKey().toString(StandardCharsets.UTF_8))).distinct().collect(Collectors.toList());
     }
     
+    @Override
+    public boolean isExisted(final String key) {
+        return false;
+    }
+    
     private String getSubNodeKeyName(final String prefix, final String 
fullPath) {
         String pathWithoutPrefix = fullPath.substring(prefix.length());
         return pathWithoutPrefix.contains(PATH_SEPARATOR) ? 
pathWithoutPrefix.substring(0, pathWithoutPrefix.indexOf(PATH_SEPARATOR)) : 
pathWithoutPrefix;
@@ -96,6 +134,11 @@ public final class EtcdRepository implements 
ClusterPersistRepository {
         client.getKVClient().put(ByteSequence.from(key, 
StandardCharsets.UTF_8), ByteSequence.from(value, 
StandardCharsets.UTF_8)).get();
     }
     
+    @Override
+    public void update(final String key, final String value) {
+        // TODO
+    }
+    
     @SneakyThrows({InterruptedException.class, ExecutionException.class})
     @Override
     public void persistEphemeral(final String key, final String value) {
@@ -130,6 +173,16 @@ public final class EtcdRepository implements 
ClusterPersistRepository {
         client.getKVClient().delete(ByteSequence.from(key, 
StandardCharsets.UTF_8), DeleteOption.newBuilder().isPrefix(true).build());
     }
     
+    @Override
+    public long getRegistryCenterTime(final String key) {
+        return 0;
+    }
+    
+    @Override
+    public Object getRawClient() {
+        return client;
+    }
+    
     @Override
     public void watch(final String key, final DataChangedEventListener 
dataChangedEventListener) {
         Watch.Listener listener = Watch.listener(response -> {
diff --git 
a/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
 
b/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
index 479c750a81d..d9bb04f0977 100644
--- 
a/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
+++ 
b/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
@@ -27,6 +27,7 @@ import com.alibaba.nacos.common.utils.StringUtils;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import lombok.SneakyThrows;
+import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
 import org.apache.shardingsphere.infra.instance.utils.IpUtils;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
@@ -39,6 +40,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.nacos.listener.NamingEv
 import 
org.apache.shardingsphere.mode.repository.cluster.nacos.props.NacosProperties;
 import 
org.apache.shardingsphere.mode.repository.cluster.nacos.props.NacosPropertyKey;
 import 
org.apache.shardingsphere.mode.repository.cluster.nacos.utils.NacosMetaDataUtil;
+import 
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
 
 import java.util.Collection;
 import java.util.Comparator;
@@ -73,6 +75,37 @@ public final class NacosRepository implements 
ClusterPersistRepository {
         initServiceMetadata();
     }
     
+    @Override
+    public int getNumChildren(final String key) {
+        return 0;
+    }
+    
+    @Override
+    public void addCacheData(final String cachePath) {
+        // TODO
+    }
+    
+    @Override
+    public void evictCacheData(final String cachePath) {
+        // TODO
+    }
+    
+    @Override
+    public Object getRawCache(final String cachePath) {
+        // TODO
+        return null;
+    }
+    
+    @Override
+    public void executeInLeader(final String key, final 
LeaderExecutionCallback callback) {
+        // TODO
+    }
+    
+    @Override
+    public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) throws Exception {
+        // TODO
+    }
+    
     private NamingService createClient(final 
ClusterPersistRepositoryConfiguration config) {
         Properties props = new Properties();
         props.setProperty("serverAddr", config.getServerLists());
@@ -190,6 +223,11 @@ public final class NacosRepository implements 
ClusterPersistRepository {
         }
     }
     
+    @Override
+    public boolean isExisted(final String key) {
+        return false;
+    }
+    
     @Override
     public void persist(final String key, final String value) {
         try {
@@ -205,6 +243,24 @@ public final class NacosRepository implements 
ClusterPersistRepository {
         }
     }
     
+    @Override
+    public void update(final String key, final String value) {
+        // TODO
+    }
+    
+    private void update(final Instance instance, final String value) throws 
NacosException {
+        Map<String, String> metadataMap = instance.getMetadata();
+        String key = NacosMetaDataUtil.getKey(instance);
+        metadataMap.put(key, value);
+        metadataMap.put(NacosMetaDataUtil.UTC_ZONE_OFFSET.toString(), 
String.valueOf(NacosMetaDataUtil.getTimestamp()));
+        instance.setMetadata(metadataMap);
+        ServiceMetadata persistentService = 
serviceController.getPersistentService();
+        client.registerInstance(persistentService.getServiceName(), instance);
+        Collection<KeyValue> keyValues = new LinkedList<>();
+        keyValues.add(new KeyValue(key, value, instance.isEphemeral()));
+        waitValue(keyValues);
+    }
+    
     private void put(final String key, final String value, final boolean 
ephemeral) throws NacosException {
         final Collection<KeyValue> keyValues = buildParentPath(key);
         ServiceMetadata serviceMetadata = 
serviceController.getService(ephemeral);
@@ -262,19 +318,6 @@ public final class NacosRepository implements 
ClusterPersistRepository {
         metadataMap.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT, 
String.valueOf(timeToLiveSeconds * 1000));
     }
     
-    private void update(final Instance instance, final String value) throws 
NacosException {
-        Map<String, String> metadataMap = instance.getMetadata();
-        String key = NacosMetaDataUtil.getKey(instance);
-        metadataMap.put(key, value);
-        metadataMap.put(NacosMetaDataUtil.UTC_ZONE_OFFSET.toString(), 
String.valueOf(NacosMetaDataUtil.getTimestamp()));
-        instance.setMetadata(metadataMap);
-        ServiceMetadata persistentService = 
serviceController.getPersistentService();
-        client.registerInstance(persistentService.getServiceName(), instance);
-        Collection<KeyValue> keyValues = new LinkedList<>();
-        keyValues.add(new KeyValue(key, value, instance.isEphemeral()));
-        waitValue(keyValues);
-    }
-    
     @Override
     public void delete(final String key) {
         try {
@@ -297,6 +340,16 @@ public final class NacosRepository implements 
ClusterPersistRepository {
         }
     }
     
+    /**
+     * Get registry center time.
+     *
+     * <p>Placeholder: the key is ignored and {@code 0} is always returned.</p>
+     *
+     * @param key key (unused)
+     * @return always 0
+     */
+    @Override
+    public long getRegistryCenterTime(final String key) {
+        return 0;
+    }
+    
+    /**
+     * Get the underlying client held by this repository.
+     *
+     * @return the {@code client} field, typed as {@link Object} (presumably the Nacos naming client; confirm against the field declaration)
+     */
+    @Override
+    public Object getRawClient() {
+        return client;
+    }
+    
     private Collection<Instance> findExistedInstance(final String key, final 
boolean ephemeral) throws NacosException {
         return 
client.getAllInstances(serviceController.getService(ephemeral).getServiceName(),
 false).stream()
                 .filter(each -> Objects.equals(key, 
NacosMetaDataUtil.getKey(each))).collect(Collectors.toList());
diff --git a/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java b/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
index 3c37d9a97f9..169a566fe11 100644
--- a/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
+++ b/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.mode.repository.cluster.zookeeper;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -27,6 +28,8 @@ import 
org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
+import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
+import org.apache.shardingsphere.elasticjob.reg.exception.RegExceptionHandler;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.instance.InstanceContextAware;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -35,6 +38,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersis
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import 
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
 import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.CuratorZookeeperExceptionHandler;
 import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionListener;
 import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.lock.ZookeeperInternalLockProvider;
@@ -45,6 +49,7 @@ import 
org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.KeeperException.OperationTimeoutException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
@@ -122,6 +127,47 @@ public final class CuratorZookeeperRepository implements 
ClusterPersistRepositor
         }
     }
     
+    /**
+     * Get the number of children of a node.
+     *
+     * @param key node path
+     * @return child count taken from the node's {@link Stat}, or 0 when the node does not exist or the handled exception is ignored
+     */
+    @Override
+    public int getNumChildren(final String key) {
+        int result = 0;
+        try {
+            Stat stat = client.checkExists().forPath(key);
+            // checkExists() yields null for a missing node
+            result = null == stat ? 0 : stat.getNumChildren();
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            CuratorZookeeperExceptionHandler.handleException(ex);
+        }
+        return result;
+    }
+    
+    /**
+     * Add cache data for the given path.
+     *
+     * <p>Not implemented yet: currently a no-op.</p>
+     *
+     * @param cachePath cache path (unused)
+     */
+    @Override
+    public void addCacheData(final String cachePath) {
+        // TODO
+    }
+    
+    /**
+     * Evict cache data for the given path.
+     *
+     * <p>Not implemented yet: currently a no-op.</p>
+     *
+     * @param cachePath cache path (unused)
+     */
+    @Override
+    public void evictCacheData(final String cachePath) {
+        // TODO
+    }
+    
+    /**
+     * Get the raw cache object for the given path.
+     *
+     * <p>Not implemented yet: always returns {@code null}, so callers must null-check.</p>
+     *
+     * @param cachePath cache path (unused)
+     * @return always null
+     */
+    @Override
+    public Object getRawCache(final String cachePath) {
+        // TODO
+        return null;
+    }
+    
+    /**
+     * Execute a callback in leader.
+     *
+     * <p>Not implemented yet: currently a no-op, the callback is never invoked.</p>
+     *
+     * @param key key (unused)
+     * @param callback callback (unused)
+     */
+    @Override
+    public void executeInLeader(final String key, final 
LeaderExecutionCallback callback) {
+        // TODO
+    }
+    
+    /**
+     * Execute the given operations in a transaction.
+     *
+     * <p>Not implemented yet: currently a no-op, none of the operations is applied.</p>
+     *
+     * @param transactionOperations operations to execute (unused)
+     * @throws Exception declared by the signature; the current body throws nothing
+     */
+    @Override
+    public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) throws Exception {
+        // TODO
+    }
+    
     @Override
     public String get(final String key) {
         return getDirectly(key);
@@ -156,7 +202,8 @@ public final class CuratorZookeeperRepository implements 
ClusterPersistRepositor
         }
     }
     
-    private void update(final String key, final String value) {
+    @Override
+    public void update(final String key, final String value) {
         try {
             client.setData().forPath(key, 
value.getBytes(StandardCharsets.UTF_8));
             // CHECKSTYLE:OFF
@@ -177,7 +224,8 @@ public final class CuratorZookeeperRepository implements 
ClusterPersistRepositor
         }
     }
     
-    private boolean isExisted(final String key) {
+    @Override
+    public boolean isExisted(final String key) {
         try {
             return null != client.checkExists().forPath(key);
             // CHECKSTYLE:OFF
@@ -229,6 +277,26 @@ public final class CuratorZookeeperRepository implements 
ClusterPersistRepositor
         }
     }
     
+    /**
+     * Get registry center time.
+     *
+     * <p>Persists an empty value to the key and then reads the modification time from the node's {@link Stat}.
+     * Note that this overwrites whatever value was stored under the key.</p>
+     *
+     * @param key key of the node used to read the time
+     * @return modification time of the node as reported by ZooKeeper
+     * @throws IllegalStateException if no modification time could be obtained
+     */
+    @Override
+    public long getRegistryCenterTime(final String key) {
+        long result = 0L;
+        try {
+            persist(key, "");
+            // checkExists() yields null for a missing node; leave result at 0 so the state check below reports it
+            Stat stat = client.checkExists().forPath(key);
+            if (null != stat) {
+                result = stat.getMtime();
+            }
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            // Use the same exception handler as the other operations of this repository
+            CuratorZookeeperExceptionHandler.handleException(ex);
+        }
+        Preconditions.checkState(0L != result, "Cannot get registry center time.");
+        return result;
+    }
+    
+    /**
+     * Get the underlying client held by this repository.
+     *
+     * @return the {@code client} field, typed as {@link Object} (presumably the Curator client; confirm against the field declaration)
+     */
+    @Override
+    public Object getRawClient() {
+        return client;
+    }
+    
     @Override
     public void watch(final String key, final DataChangedEventListener 
listener) {
         CuratorCache cache = caches.get(key);
diff --git a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
index d5bd4b2e0a3..d66dac72a1e 100644
--- a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
+++ b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
@@ -52,11 +52,21 @@ public final class StandalonePersistRepositoryFixture 
implements StandalonePersi
         return result;
     }
     
+    /**
+     * Fixture stub: always reports that the key does not exist.
+     *
+     * <p>NOTE(review): the result ignores {@code persistMap}, so a key stored via {@code persist} is still reported as absent.</p>
+     *
+     * @param key key (unused)
+     * @return always false
+     */
+    @Override
+    public boolean isExisted(final String key) {
+        return false;
+    }
+    
     @Override
     public void persist(final String key, final String value) {
         persistMap.put(key, value);
     }
     
+    /**
+     * Fixture stub: no-op, nothing is written.
+     *
+     * @param key key (unused)
+     * @param value value (unused)
+     */
+    @Override
+    public void update(final String key, final String value) {
+    
+    }
+    
     @Override
     public void delete(final String key) {
     }
diff --git a/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java b/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
index f7cc5ec7ebb..952bb9b8ffd 100644
--- a/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
+++ b/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
@@ -115,11 +115,15 @@ public final class JDBCRepository implements 
StandalonePersistRepository {
         return Collections.emptyList();
     }
     
+    /**
+     * Judge whether a key exists.
+     *
+     * <p>Existence is derived from {@code get(key)}: a key whose value is {@code null} or empty is reported as absent.</p>
+     *
+     * @param key key to look up
+     * @return true if {@code get(key)} returns a non-empty value
+     */
+    @Override
+    public boolean isExisted(final String key) {
+        String value = get(key);
+        return null != value && !value.isEmpty();
+    }
+    
     @Override
     public void persist(final String key, final String value) {
         try {
-            String oldValue = get(key);
-            if (!Strings.isNullOrEmpty(oldValue)) {
+            if (isExisted(key)) {
                 update(key, value);
                 return;
             }
@@ -157,13 +161,16 @@ public final class JDBCRepository implements 
StandalonePersistRepository {
         }
     }
     
-    private void update(final String key, final String value) throws 
SQLException {
+    /**
+     * Update the value of a key by executing the provider's update SQL.
+     *
+     * <p>The value is bound to parameter 1 and the key to parameter 2.
+     * A {@link SQLException} is logged and not rethrown, so callers are not notified of a failed update.</p>
+     *
+     * @param key key whose value is updated
+     * @param value new value
+     */
+    @Override
+    public void update(final String key, final String value) {
         try (
                 Connection connection = hikariDataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(provider.updateSQL())) {
             preparedStatement.setString(1, value);
             preparedStatement.setString(2, key);
             preparedStatement.executeUpdate();
+        } catch (SQLException ex) {
+            log.error("Update {} data to key: {} failed", getType(), key, ex);
         }
     }
     
diff --git a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
index 1e3f4931d14..a0b2c322ccf 100644
--- a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
+++ b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
@@ -17,9 +17,11 @@
 
 package org.apache.shardingsphere.proxy.fixture;
 
+import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import 
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -34,6 +36,36 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
     public void init(final ClusterPersistRepositoryConfiguration config) {
     }
     
+    /**
+     * Fixture stub: always reports zero children.
+     *
+     * @param key key (unused)
+     * @return always 0
+     */
+    @Override
+    public int getNumChildren(final String key) {
+        return 0;
+    }
+    
+    /**
+     * Fixture stub: no-op.
+     *
+     * @param cachePath cache path (unused)
+     */
+    @Override
+    public void addCacheData(final String cachePath) {
+    
+    }
+    
+    /**
+     * Fixture stub: no-op.
+     *
+     * @param cachePath cache path (unused)
+     */
+    @Override
+    public void evictCacheData(final String cachePath) {
+    
+    }
+    
+    /**
+     * Fixture stub: there is no cache, so {@code null} is always returned.
+     *
+     * @param cachePath cache path (unused)
+     * @return always null
+     */
+    @Override
+    public Object getRawCache(final String cachePath) {
+        return null;
+    }
+    
+    /**
+     * Fixture stub: no-op, the callback is never invoked.
+     *
+     * @param key key (unused)
+     * @param callback callback (unused)
+     */
+    @Override
+    public void executeInLeader(final String key, final 
LeaderExecutionCallback callback) {
+    
+    }
+    
+    /**
+     * Fixture stub: no-op, none of the operations is applied.
+     *
+     * @param transactionOperations operations (unused)
+     * @throws Exception declared by the signature; the current body throws nothing
+     */
+    @Override
+    public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) throws Exception {
+    
+    }
+    
     @Override
     public String get(final String key) {
         return REGISTRY_DATA.get(key);
@@ -44,11 +76,21 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
         return Collections.singletonList("db");
     }
     
+    /**
+     * Fixture stub: always reports that the key does not exist.
+     *
+     * <p>NOTE(review): the result ignores {@code REGISTRY_DATA}, so a key stored via {@code persist} is still reported as absent.</p>
+     *
+     * @param key key (unused)
+     * @return always false
+     */
+    @Override
+    public boolean isExisted(final String key) {
+        return false;
+    }
+    
     @Override
     public void persist(final String key, final String value) {
         REGISTRY_DATA.put(key, value);
     }
     
+    /**
+     * Fixture stub: no-op, {@code REGISTRY_DATA} is left untouched.
+     *
+     * @param key key (unused)
+     * @param value value (unused)
+     */
+    @Override
+    public void update(final String key, final String value) {
+    
+    }
+    
     @Override
     public void persistEphemeral(final String key, final String value) {
         REGISTRY_DATA.put(key, value);
@@ -62,6 +104,16 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
     public void delete(final String key) {
     }
     
+    /**
+     * Fixture stub: always returns 0 as the registry center time.
+     *
+     * @param key key (unused)
+     * @return always 0
+     */
+    @Override
+    public long getRegistryCenterTime(final String key) {
+        return 0;
+    }
+    
+    /**
+     * Fixture stub: there is no underlying client, so {@code null} is always returned.
+     *
+     * @return always null
+     */
+    @Override
+    public Object getRawClient() {
+        return null;
+    }
+    
     @Override
     public void watch(final String key, final DataChangedEventListener 
listener) {
     }
diff --git a/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java b/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
index 8d30cc2646d..c82c0c0732a 100644
--- a/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
+++ b/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
@@ -17,10 +17,12 @@
 
 package org.apache.shardingsphere.driver.fixture;
 
+import 
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
 import org.apache.shardingsphere.infra.database.DefaultDatabase;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import 
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -36,6 +38,36 @@ public final class TestClusterPersistRepository implements 
ClusterPersistReposit
         registryData.put("/metadata", DefaultDatabase.LOGIC_NAME);
     }
     
+    /**
+     * Test stub: always reports zero children.
+     *
+     * @param key key (unused)
+     * @return always 0
+     */
+    @Override
+    public int getNumChildren(final String key) {
+        return 0;
+    }
+    
+    /**
+     * Test stub: no-op.
+     *
+     * @param cachePath cache path (unused)
+     */
+    @Override
+    public void addCacheData(final String cachePath) {
+    
+    }
+    
+    /**
+     * Test stub: no-op.
+     *
+     * @param cachePath cache path (unused)
+     */
+    @Override
+    public void evictCacheData(final String cachePath) {
+    
+    }
+    
+    /**
+     * Test stub: there is no cache, so {@code null} is always returned.
+     *
+     * @param cachePath cache path (unused)
+     * @return always null
+     */
+    @Override
+    public Object getRawCache(final String cachePath) {
+        return null;
+    }
+    
+    /**
+     * Test stub: no-op, the callback is never invoked.
+     *
+     * @param key key (unused)
+     * @param callback callback (unused)
+     */
+    @Override
+    public void executeInLeader(final String key, final 
LeaderExecutionCallback callback) {
+    
+    }
+    
+    /**
+     * Test stub: no-op, none of the operations is applied.
+     *
+     * @param transactionOperations operations (unused)
+     * @throws Exception declared by the signature; the current body throws nothing
+     */
+    @Override
+    public void executeInTransaction(final List<TransactionOperation> 
transactionOperations) throws Exception {
+    
+    }
+    
     @Override
     public String get(final String key) {
         return registryData.get(key);
@@ -46,11 +78,21 @@ public final class TestClusterPersistRepository implements 
ClusterPersistReposit
         return registryData.containsKey(key) ? 
Collections.singletonList(registryData.get(key)) : Collections.emptyList();
     }
     
+    /**
+     * Test stub: always reports that the key does not exist.
+     *
+     * <p>NOTE(review): the result ignores {@code registryData}, so a key stored via {@code persist} is still reported as absent.</p>
+     *
+     * @param key key (unused)
+     * @return always false
+     */
+    @Override
+    public boolean isExisted(final String key) {
+        return false;
+    }
+    
     @Override
     public void persist(final String key, final String value) {
         registryData.put(key, value);
     }
     
+    /**
+     * Test stub: no-op, {@code registryData} is left untouched.
+     *
+     * @param key key (unused)
+     * @param value value (unused)
+     */
+    @Override
+    public void update(final String key, final String value) {
+    
+    }
+    
     @Override
     public void persistEphemeral(final String key, final String value) {
         registryData.put(key, value);
@@ -64,6 +106,16 @@ public final class TestClusterPersistRepository implements 
ClusterPersistReposit
     public void delete(final String key) {
     }
     
+    /**
+     * Test stub: always returns 0 as the registry center time.
+     *
+     * @param key key (unused)
+     * @return always 0
+     */
+    @Override
+    public long getRegistryCenterTime(final String key) {
+        return 0;
+    }
+    
+    /**
+     * Test stub: there is no underlying client, so {@code null} is always returned.
+     *
+     * @return always null
+     */
+    @Override
+    public Object getRawClient() {
+        return null;
+    }
+    
     @Override
     public void watch(final String key, final DataChangedEventListener 
listener) {
     }


Reply via email to