This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 50dd8bf96e6 Migrate ElasticJob governance center interface to
ShardingSphere governance center (#21395)
50dd8bf96e6 is described below
commit 50dd8bf96e6658a5ce6134ade030393a8208a07b
Author: zhaojinchao <[email protected]>
AuthorDate: Sat Oct 8 23:07:13 2022 +0800
Migrate ElasticJob governance center interface to ShardingSphere governance
center (#21395)
* Migrate ElasticJob governance center interface to ShardingSphere
governance center
* Fix checkstyle
* Revise server.yaml
---
.../fixture/FixtureClusterPersistRepository.java | 52 +++++++++++++
.../mode/persist/PersistRepository.java | 16 ++++
.../fixture/ClusterPersistRepositoryFixture.java | 52 +++++++++++++
...ProcessListClusterPersistRepositoryFixture.java | 52 +++++++++++++
.../cluster/ClusterPersistRepository.java | 65 ++++++++++++++++
.../cluster/transaction/TransactionOperation.java | 88 ++++++++++++++++++++++
.../cluster/consul/ConsulRepository.java | 53 +++++++++++++
.../repository/cluster/etcd/EtcdRepository.java | 53 +++++++++++++
.../repository/cluster/nacos/NacosRepository.java | 79 +++++++++++++++----
.../zookeeper/CuratorZookeeperRepository.java | 72 +++++++++++++++++-
.../StandalonePersistRepositoryFixture.java | 10 +++
.../repository/standalone/jdbc/JDBCRepository.java | 13 +++-
.../fixture/ClusterPersistRepositoryFixture.java | 52 +++++++++++++
.../fixture/TestClusterPersistRepository.java | 52 +++++++++++++
14 files changed, 691 insertions(+), 18 deletions(-)
diff --git
a/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
b/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
index 222737a3cab..0fde811f0cc 100644
---
a/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
+++
b/jdbc/spring/core/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
@@ -17,10 +17,12 @@
package org.apache.shardingsphere.spring.namespace.fixture;
+import
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
import org.apache.shardingsphere.infra.database.DefaultDatabase;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -36,6 +38,36 @@ public final class FixtureClusterPersistRepository
implements ClusterPersistRepo
registryData.put("/metadata", DefaultDatabase.LOGIC_NAME);
}
+ @Override
+ public int getNumChildren(final String key) {
+ return 0;
+ }
+
+ @Override
+ public void addCacheData(final String cachePath) {
+
+ }
+
+ @Override
+ public void evictCacheData(final String cachePath) {
+
+ }
+
+ @Override
+ public Object getRawCache(final String cachePath) {
+ return null;
+ }
+
+ @Override
+ public void executeInLeader(final String key, final
LeaderExecutionCallback callback) {
+
+ }
+
+ @Override
+ public void executeInTransaction(final List<TransactionOperation>
transactionOperations) throws Exception {
+
+ }
+
@Override
public String get(final String key) {
return registryData.get(key);
@@ -46,11 +78,21 @@ public final class FixtureClusterPersistRepository
implements ClusterPersistRepo
return registryData.containsKey(key) ?
Collections.singletonList(registryData.get(key)) : Collections.emptyList();
}
+ @Override
+ public boolean isExisted(final String key) {
+ return false;
+ }
+
@Override
public void persist(final String key, final String value) {
registryData.put(key, value);
}
+ @Override
+ public void update(final String key, final String value) {
+
+ }
+
@Override
public void persistEphemeral(final String key, final String value) {
registryData.put(key, value);
@@ -64,6 +106,16 @@ public final class FixtureClusterPersistRepository
implements ClusterPersistRepo
public void delete(final String key) {
}
+ @Override
+ public long getRegistryCenterTime(final String key) {
+ return 0;
+ }
+
+ @Override
+ public Object getRawClient() {
+ return null;
+ }
+
@Override
public void watch(final String key, final DataChangedEventListener
listener) {
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
index ca4e25cd29b..48c00e5559b 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/PersistRepository.java
@@ -49,6 +49,14 @@ public interface PersistRepository extends TypedSPI {
*/
List<String> getChildrenKeys(String key);
+ /**
+ * Judge node is exist or not.
+ *
+ * @param key key
+ * @return node is exist or not
+ */
+ boolean isExisted(String key);
+
/**
* Persist data.
*
@@ -57,6 +65,14 @@ public interface PersistRepository extends TypedSPI {
*/
void persist(String key, String value);
+ /**
+ * Update data.
+ *
+ * @param key key
+ * @param value value
+ */
+ void update(String key, String value);
+
/**
* Delete node.
*
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
index 79687d599bf..fde68735270 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
@@ -17,9 +17,11 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.fixture;
+import
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
import java.util.Collections;
import java.util.List;
@@ -30,6 +32,36 @@ public final class ClusterPersistRepositoryFixture
implements ClusterPersistRepo
public void init(final ClusterPersistRepositoryConfiguration config) {
}
+ @Override
+ public int getNumChildren(final String key) {
+ return 0;
+ }
+
+ @Override
+ public void addCacheData(final String cachePath) {
+
+ }
+
+ @Override
+ public void evictCacheData(final String cachePath) {
+
+ }
+
+ @Override
+ public Object getRawCache(final String cachePath) {
+ return null;
+ }
+
+ @Override
+ public void executeInLeader(final String key, final
LeaderExecutionCallback callback) {
+
+ }
+
+ @Override
+ public void executeInTransaction(final List<TransactionOperation>
transactionOperations) throws Exception {
+
+ }
+
@Override
public String get(final String key) {
return "";
@@ -40,10 +72,20 @@ public final class ClusterPersistRepositoryFixture
implements ClusterPersistRepo
return Collections.emptyList();
}
+ @Override
+ public boolean isExisted(final String key) {
+ return false;
+ }
+
@Override
public void persist(final String key, final String value) {
}
+ @Override
+ public void update(final String key, final String value) {
+
+ }
+
@Override
public void persistEphemeral(final String key, final String value) {
}
@@ -56,6 +98,16 @@ public final class ClusterPersistRepositoryFixture
implements ClusterPersistRepo
public void delete(final String key) {
}
+ @Override
+ public long getRegistryCenterTime(final String key) {
+ return 0;
+ }
+
+ @Override
+ public Object getRawClient() {
+ return null;
+ }
+
@Override
public void watch(final String key, final DataChangedEventListener
listener) {
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
index 67af7529f9a..beb4a5ff946 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
@@ -17,9 +17,11 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process;
+import
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -34,6 +36,36 @@ public final class
ProcessListClusterPersistRepositoryFixture implements Cluster
public void init(final ClusterPersistRepositoryConfiguration config) {
}
+ @Override
+ public int getNumChildren(final String key) {
+ return 0;
+ }
+
+ @Override
+ public void addCacheData(final String cachePath) {
+
+ }
+
+ @Override
+ public void evictCacheData(final String cachePath) {
+
+ }
+
+ @Override
+ public Object getRawCache(final String cachePath) {
+ return null;
+ }
+
+ @Override
+ public void executeInLeader(final String key, final
LeaderExecutionCallback callback) {
+
+ }
+
+ @Override
+ public void executeInTransaction(final List<TransactionOperation>
transactionOperations) throws Exception {
+
+ }
+
@Override
public String get(final String key) {
return REGISTRY_DATA.get(key);
@@ -44,11 +76,21 @@ public final class
ProcessListClusterPersistRepositoryFixture implements Cluster
return Collections.singletonList("db");
}
+ @Override
+ public boolean isExisted(final String key) {
+ return false;
+ }
+
@Override
public void persist(final String key, final String value) {
REGISTRY_DATA.put(key, value);
}
+ @Override
+ public void update(final String key, final String value) {
+
+ }
+
@Override
public void persistEphemeral(final String key, final String value) {
REGISTRY_DATA.put(key, value);
@@ -63,6 +105,16 @@ public final class
ProcessListClusterPersistRepositoryFixture implements Cluster
REGISTRY_DATA.remove(key);
}
+ @Override
+ public long getRegistryCenterTime(final String key) {
+ return 0;
+ }
+
+ @Override
+ public Object getRawClient() {
+ return null;
+ }
+
@Override
public void watch(final String key, final DataChangedEventListener
listener) {
}
diff --git
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
index e16b021b6b7..9a5f899910b 100644
---
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
+++
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
@@ -17,8 +17,12 @@
package org.apache.shardingsphere.mode.repository.cluster;
+import
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
+
+import java.util.List;
/**
* Cluster persist repository.
@@ -32,6 +36,67 @@ public interface ClusterPersistRepository extends
PersistRepository {
*/
void init(ClusterPersistRepositoryConfiguration config);
+ /**
+ * Get current time from registry center.
+ *
+ * @param key key
+ * @return current time from registry center
+ */
+ long getRegistryCenterTime(String key);
+
+ /**
+ * Get raw client for registry center client.
+ **
+ * @return registry center raw client
+ */
+ Object getRawClient();
+
+ /**
+ * Get children number.
+ *
+ * @param key key
+ * @return children number
+ */
+ int getNumChildren(String key);
+
+ /**
+ * Add data to cache.
+ *
+ * @param cachePath cache path
+ */
+ void addCacheData(String cachePath);
+
+ /**
+ * Evict data from cache.
+ *
+ * @param cachePath cache path
+ */
+ void evictCacheData(String cachePath);
+
+ /**
+ * Get raw cache object of registry center.
+ *
+ * @param cachePath cache path
+ * @return raw cache object of registry center
+ */
+ Object getRawCache(String cachePath);
+
+ /**
+ * Execute in leader.
+ *
+ * @param key key
+ * @param callback callback of leader
+ */
+ void executeInLeader(String key, LeaderExecutionCallback callback);
+
+ /**
+ * Execute oprations in transaction.
+ *
+ * @param transactionOperations operations
+ * @throws Exception exception
+ */
+ void executeInTransaction(List<TransactionOperation>
transactionOperations) throws Exception;
+
/**
* Persist ephemeral data.
*
diff --git
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/transaction/TransactionOperation.java
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/transaction/TransactionOperation.java
new file mode 100644
index 00000000000..db2c0d90293
--- /dev/null
+++
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/transaction/TransactionOperation.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.transaction;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+@ToString
+public final class TransactionOperation {
+
+ public enum Type {
+
+ CHECK_EXISTS,
+
+ ADD,
+
+ UPDATE,
+
+ DELETE
+ }
+
+ private final Type type;
+
+ private final String key;
+
+ private final String value;
+
+ /**
+ * Operation add.
+ *
+ * @param key key
+ * @param value value
+ * @return TransactionOperation
+ */
+ public static TransactionOperation opAdd(final String key, final String
value) {
+ return new TransactionOperation(Type.ADD, key, value);
+ }
+
+ /**
+ * Operation update.
+ *
+ * @param key key
+ * @param value value
+ * @return TransactionOperation
+ */
+ public static TransactionOperation opUpdate(final String key, final String
value) {
+ return new TransactionOperation(Type.UPDATE, key, value);
+ }
+
+ /**
+ * Operation delete.
+ *
+ * @param key key
+ * @return TransactionOperation
+ */
+ public static TransactionOperation opDelete(final String key) {
+ return new TransactionOperation(Type.DELETE, key, null);
+ }
+
+ /**
+ * Operation check exists.
+ *
+ * @param key key
+ * @return TransactionOperation
+ */
+ public static TransactionOperation opCheckExists(final String key) {
+ return new TransactionOperation(Type.CHECK_EXISTS, key, null);
+ }
+}
diff --git
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
index 5d226a4f077..c649a861b57 100644
---
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
+++
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
@@ -24,6 +24,7 @@ import com.ecwid.consul.v1.kv.model.GetValue;
import com.ecwid.consul.v1.kv.model.PutParams;
import com.ecwid.consul.v1.session.model.NewSession;
import com.ecwid.consul.v1.session.model.Session;
+import
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.consul.lock.ConsulInternalLockProvider;
@@ -31,6 +32,7 @@ import
org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProp
import
org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
import java.util.Collection;
import java.util.Collections;
@@ -61,6 +63,37 @@ public class ConsulRepository implements
ClusterPersistRepository {
watchKeyMap = new HashMap<>(6, 1);
}
+ @Override
+ public int getNumChildren(final String key) {
+ return 0;
+ }
+
+ @Override
+ public void addCacheData(final String cachePath) {
+ // TODO
+ }
+
+ @Override
+ public void evictCacheData(final String cachePath) {
+ // TODO
+ }
+
+ @Override
+ public Object getRawCache(final String cachePath) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public void executeInLeader(final String key, final
LeaderExecutionCallback callback) {
+ // TODO
+ }
+
+ @Override
+ public void executeInTransaction(final List<TransactionOperation>
transactionOperations) throws Exception {
+ // TODO
+ }
+
@Override
public String get(final String key) {
Response<GetValue> response = consulClient.getKVValue(key);
@@ -73,16 +106,36 @@ public class ConsulRepository implements
ClusterPersistRepository {
return null == response ? Collections.emptyList() :
response.getValue();
}
+ @Override
+ public boolean isExisted(final String key) {
+ return false;
+ }
+
@Override
public void persist(final String key, final String value) {
consulClient.setKVValue(key, value);
}
+ @Override
+ public void update(final String key, final String value) {
+ // TODO
+ }
+
@Override
public void delete(final String key) {
consulClient.deleteKVValue(key);
}
+ @Override
+ public long getRegistryCenterTime(final String key) {
+ return 0;
+ }
+
+ @Override
+ public Object getRawClient() {
+ return consulClient;
+ }
+
@Override
public void close() {
// TODO
diff --git
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
index edc1be95483..301d235a863 100644
---
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
+++
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
@@ -31,6 +31,7 @@ import io.etcd.jetcd.support.Observers;
import io.etcd.jetcd.watch.WatchEvent;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.etcd.lock.EtcdInternalLockProvider;
@@ -39,6 +40,7 @@ import
org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperty
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -67,6 +69,37 @@ public final class EtcdRepository implements
ClusterPersistRepository {
etcdInternalLockHolder = new EtcdInternalLockProvider(client,
etcdProps);
}
+ @Override
+ public int getNumChildren(final String key) {
+ return 0;
+ }
+
+ @Override
+ public void addCacheData(final String cachePath) {
+ // TODO
+ }
+
+ @Override
+ public void evictCacheData(final String cachePath) {
+ // TODO
+ }
+
+ @Override
+ public Object getRawCache(final String cachePath) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public void executeInLeader(final String key, final
LeaderExecutionCallback callback) {
+ // TODO
+ }
+
+ @Override
+ public void executeInTransaction(final List<TransactionOperation>
transactionOperations) throws Exception {
+ // TODO
+ }
+
@SneakyThrows({InterruptedException.class, ExecutionException.class})
@Override
public String get(final String key) {
@@ -84,6 +117,11 @@ public final class EtcdRepository implements
ClusterPersistRepository {
return keyValues.stream().map(each -> getSubNodeKeyName(prefix,
each.getKey().toString(StandardCharsets.UTF_8))).distinct().collect(Collectors.toList());
}
+ @Override
+ public boolean isExisted(final String key) {
+ return false;
+ }
+
private String getSubNodeKeyName(final String prefix, final String
fullPath) {
String pathWithoutPrefix = fullPath.substring(prefix.length());
return pathWithoutPrefix.contains(PATH_SEPARATOR) ?
pathWithoutPrefix.substring(0, pathWithoutPrefix.indexOf(PATH_SEPARATOR)) :
pathWithoutPrefix;
@@ -96,6 +134,11 @@ public final class EtcdRepository implements
ClusterPersistRepository {
client.getKVClient().put(ByteSequence.from(key,
StandardCharsets.UTF_8), ByteSequence.from(value,
StandardCharsets.UTF_8)).get();
}
+ @Override
+ public void update(final String key, final String value) {
+ // TODO
+ }
+
@SneakyThrows({InterruptedException.class, ExecutionException.class})
@Override
public void persistEphemeral(final String key, final String value) {
@@ -130,6 +173,16 @@ public final class EtcdRepository implements
ClusterPersistRepository {
client.getKVClient().delete(ByteSequence.from(key,
StandardCharsets.UTF_8), DeleteOption.newBuilder().isPrefix(true).build());
}
+ @Override
+ public long getRegistryCenterTime(final String key) {
+ return 0;
+ }
+
+ @Override
+ public Object getRawClient() {
+ return client;
+ }
+
@Override
public void watch(final String key, final DataChangedEventListener
dataChangedEventListener) {
Watch.Listener listener = Watch.listener(response -> {
diff --git
a/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
b/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
index 479c750a81d..d9bb04f0977 100644
---
a/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
+++
b/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
@@ -27,6 +27,7 @@ import com.alibaba.nacos.common.utils.StringUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.SneakyThrows;
+import
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
import org.apache.shardingsphere.infra.instance.utils.IpUtils;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
@@ -39,6 +40,7 @@ import
org.apache.shardingsphere.mode.repository.cluster.nacos.listener.NamingEv
import
org.apache.shardingsphere.mode.repository.cluster.nacos.props.NacosProperties;
import
org.apache.shardingsphere.mode.repository.cluster.nacos.props.NacosPropertyKey;
import
org.apache.shardingsphere.mode.repository.cluster.nacos.utils.NacosMetaDataUtil;
+import
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
import java.util.Collection;
import java.util.Comparator;
@@ -73,6 +75,37 @@ public final class NacosRepository implements
ClusterPersistRepository {
initServiceMetadata();
}
+ @Override
+ public int getNumChildren(final String key) {
+ return 0;
+ }
+
+ @Override
+ public void addCacheData(final String cachePath) {
+ // TODO
+ }
+
+ @Override
+ public void evictCacheData(final String cachePath) {
+ // TODO
+ }
+
+ @Override
+ public Object getRawCache(final String cachePath) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public void executeInLeader(final String key, final
LeaderExecutionCallback callback) {
+ // TODO
+ }
+
+ @Override
+ public void executeInTransaction(final List<TransactionOperation>
transactionOperations) throws Exception {
+ // TODO
+ }
+
private NamingService createClient(final
ClusterPersistRepositoryConfiguration config) {
Properties props = new Properties();
props.setProperty("serverAddr", config.getServerLists());
@@ -190,6 +223,11 @@ public final class NacosRepository implements
ClusterPersistRepository {
}
}
+ @Override
+ public boolean isExisted(final String key) {
+ return false;
+ }
+
@Override
public void persist(final String key, final String value) {
try {
@@ -205,6 +243,24 @@ public final class NacosRepository implements
ClusterPersistRepository {
}
}
+ @Override
+ public void update(final String key, final String value) {
+ // TODO
+ }
+
+ private void update(final Instance instance, final String value) throws
NacosException {
+ Map<String, String> metadataMap = instance.getMetadata();
+ String key = NacosMetaDataUtil.getKey(instance);
+ metadataMap.put(key, value);
+ metadataMap.put(NacosMetaDataUtil.UTC_ZONE_OFFSET.toString(),
String.valueOf(NacosMetaDataUtil.getTimestamp()));
+ instance.setMetadata(metadataMap);
+ ServiceMetadata persistentService =
serviceController.getPersistentService();
+ client.registerInstance(persistentService.getServiceName(), instance);
+ Collection<KeyValue> keyValues = new LinkedList<>();
+ keyValues.add(new KeyValue(key, value, instance.isEphemeral()));
+ waitValue(keyValues);
+ }
+
private void put(final String key, final String value, final boolean
ephemeral) throws NacosException {
final Collection<KeyValue> keyValues = buildParentPath(key);
ServiceMetadata serviceMetadata =
serviceController.getService(ephemeral);
@@ -262,19 +318,6 @@ public final class NacosRepository implements
ClusterPersistRepository {
metadataMap.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT,
String.valueOf(timeToLiveSeconds * 1000));
}
- private void update(final Instance instance, final String value) throws
NacosException {
- Map<String, String> metadataMap = instance.getMetadata();
- String key = NacosMetaDataUtil.getKey(instance);
- metadataMap.put(key, value);
- metadataMap.put(NacosMetaDataUtil.UTC_ZONE_OFFSET.toString(),
String.valueOf(NacosMetaDataUtil.getTimestamp()));
- instance.setMetadata(metadataMap);
- ServiceMetadata persistentService =
serviceController.getPersistentService();
- client.registerInstance(persistentService.getServiceName(), instance);
- Collection<KeyValue> keyValues = new LinkedList<>();
- keyValues.add(new KeyValue(key, value, instance.isEphemeral()));
- waitValue(keyValues);
- }
-
@Override
public void delete(final String key) {
try {
@@ -297,6 +340,16 @@ public final class NacosRepository implements
ClusterPersistRepository {
}
}
+ @Override
+ public long getRegistryCenterTime(final String key) {
+ return 0;
+ }
+
+ @Override
+ public Object getRawClient() {
+ return client;
+ }
+
private Collection<Instance> findExistedInstance(final String key, final
boolean ephemeral) throws NacosException {
return
client.getAllInstances(serviceController.getService(ephemeral).getServiceName(),
false).stream()
.filter(each -> Objects.equals(key,
NacosMetaDataUtil.getKey(each))).collect(Collectors.toList());
diff --git
a/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
b/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
index 3c37d9a97f9..169a566fe11 100644
---
a/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
+++
b/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.mode.repository.cluster.zookeeper;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -27,6 +28,8 @@ import
org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
+import
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
+import org.apache.shardingsphere.elasticjob.reg.exception.RegExceptionHandler;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.InstanceContextAware;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -35,6 +38,7 @@ import
org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersis
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
import
org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.CuratorZookeeperExceptionHandler;
import
org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionListener;
import
org.apache.shardingsphere.mode.repository.cluster.zookeeper.lock.ZookeeperInternalLockProvider;
@@ -45,6 +49,7 @@ import
org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.KeeperException.OperationTimeoutException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
@@ -122,6 +127,47 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
}
}
+ @Override
+ public int getNumChildren(final String key) {
+ try {
+ Stat stat = client.checkExists().forPath(key);
+ if (null != stat) {
+ return stat.getNumChildren();
+ }
+ //CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ //CHECKSTYLE:ON
+ CuratorZookeeperExceptionHandler.handleException(ex);
+ }
+ return 0;
+ }
+
+ @Override
+ public void addCacheData(final String cachePath) {
+ // TODO
+ }
+
+ @Override
+ public void evictCacheData(final String cachePath) {
+ // TODO
+ }
+
+ @Override
+ public Object getRawCache(final String cachePath) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public void executeInLeader(final String key, final
LeaderExecutionCallback callback) {
+ // TODO
+ }
+
+ @Override
+ public void executeInTransaction(final List<TransactionOperation>
transactionOperations) throws Exception {
+ // TODO
+ }
+
@Override
public String get(final String key) {
return getDirectly(key);
@@ -156,7 +202,8 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
}
}
- private void update(final String key, final String value) {
+ @Override
+ public void update(final String key, final String value) {
try {
client.setData().forPath(key,
value.getBytes(StandardCharsets.UTF_8));
// CHECKSTYLE:OFF
@@ -177,7 +224,8 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
}
}
- private boolean isExisted(final String key) {
+ @Override
+ public boolean isExisted(final String key) {
try {
return null != client.checkExists().forPath(key);
// CHECKSTYLE:OFF
@@ -229,6 +277,26 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
}
}
+ @Override
+ public long getRegistryCenterTime(final String key) {
+ long result = 0L;
+ try {
+ persist(key, "");
+ result = client.checkExists().forPath(key).getMtime();
+ //CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ //CHECKSTYLE:ON
+ RegExceptionHandler.handleException(ex);
+ }
+ Preconditions.checkState(0L != result, "Cannot get registry center
time.");
+ return result;
+ }
+
+ @Override
+ public Object getRawClient() {
+ return client;
+ }
+
@Override
public void watch(final String key, final DataChangedEventListener
listener) {
CuratorCache cache = caches.get(key);
diff --git
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
index d5bd4b2e0a3..d66dac72a1e 100644
---
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
+++
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/fixture/StandalonePersistRepositoryFixture.java
@@ -52,11 +52,21 @@ public final class StandalonePersistRepositoryFixture
implements StandalonePersi
return result;
}
+ @Override
+ public boolean isExisted(final String key) {
+ return false;
+ }
+
@Override
public void persist(final String key, final String value) {
persistMap.put(key, value);
}
+ @Override
+ public void update(final String key, final String value) {
+
+ }
+
@Override
public void delete(final String key) {
}
diff --git
a/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
b/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
index f7cc5ec7ebb..952bb9b8ffd 100644
---
a/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
+++
b/mode/type/standalone/repository/provider/jdbc/core/src/main/java/org/apache/shardingsphere/mode/repository/standalone/jdbc/JDBCRepository.java
@@ -115,11 +115,15 @@ public final class JDBCRepository implements
StandalonePersistRepository {
return Collections.emptyList();
}
+ @Override
+ public boolean isExisted(final String key) {
+ return !Strings.isNullOrEmpty(get(key));
+ }
+
@Override
public void persist(final String key, final String value) {
try {
- String oldValue = get(key);
- if (!Strings.isNullOrEmpty(oldValue)) {
+ if (isExisted(key)) {
update(key, value);
return;
}
@@ -157,13 +161,16 @@ public final class JDBCRepository implements
StandalonePersistRepository {
}
}
- private void update(final String key, final String value) throws
SQLException {
+ @Override
+ public void update(final String key, final String value) {
try (
Connection connection = hikariDataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(provider.updateSQL())) {
preparedStatement.setString(1, value);
preparedStatement.setString(2, key);
preparedStatement.executeUpdate();
+ } catch (SQLException ex) {
+ log.error("Update {} data to key: {} failed", getType(), key, ex);
}
}
diff --git
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
index 1e3f4931d14..a0b2c322ccf 100644
---
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
+++
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
@@ -17,9 +17,11 @@
package org.apache.shardingsphere.proxy.fixture;
+import
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -34,6 +36,36 @@ public final class ClusterPersistRepositoryFixture
implements ClusterPersistRepo
public void init(final ClusterPersistRepositoryConfiguration config) {
}
+ @Override
+ public int getNumChildren(final String key) {
+ return 0;
+ }
+
+ @Override
+ public void addCacheData(final String cachePath) {
+
+ }
+
+ @Override
+ public void evictCacheData(final String cachePath) {
+
+ }
+
+ @Override
+ public Object getRawCache(final String cachePath) {
+ return null;
+ }
+
+ @Override
+ public void executeInLeader(final String key, final
LeaderExecutionCallback callback) {
+
+ }
+
+ @Override
+ public void executeInTransaction(final List<TransactionOperation>
transactionOperations) throws Exception {
+
+ }
+
@Override
public String get(final String key) {
return REGISTRY_DATA.get(key);
@@ -44,11 +76,21 @@ public final class ClusterPersistRepositoryFixture
implements ClusterPersistRepo
return Collections.singletonList("db");
}
+ @Override
+ public boolean isExisted(final String key) {
+ return false;
+ }
+
@Override
public void persist(final String key, final String value) {
REGISTRY_DATA.put(key, value);
}
+ @Override
+ public void update(final String key, final String value) {
+
+ }
+
@Override
public void persistEphemeral(final String key, final String value) {
REGISTRY_DATA.put(key, value);
@@ -62,6 +104,16 @@ public final class ClusterPersistRepositoryFixture
implements ClusterPersistRepo
public void delete(final String key) {
}
+ @Override
+ public long getRegistryCenterTime(final String key) {
+ return 0;
+ }
+
+ @Override
+ public Object getRawClient() {
+ return null;
+ }
+
@Override
public void watch(final String key, final DataChangedEventListener
listener) {
}
diff --git
a/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
b/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
index 8d30cc2646d..c82c0c0732a 100644
---
a/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
+++
b/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
@@ -17,10 +17,12 @@
package org.apache.shardingsphere.driver.fixture;
+import
org.apache.shardingsphere.elasticjob.lite.internal.storage.LeaderExecutionCallback;
import org.apache.shardingsphere.infra.database.DefaultDatabase;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import
org.apache.shardingsphere.mode.repository.cluster.transaction.TransactionOperation;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -36,6 +38,36 @@ public final class TestClusterPersistRepository implements
ClusterPersistReposit
registryData.put("/metadata", DefaultDatabase.LOGIC_NAME);
}
+ @Override
+ public int getNumChildren(final String key) {
+ return 0;
+ }
+
+ @Override
+ public void addCacheData(final String cachePath) {
+
+ }
+
+ @Override
+ public void evictCacheData(final String cachePath) {
+
+ }
+
+ @Override
+ public Object getRawCache(final String cachePath) {
+ return null;
+ }
+
+ @Override
+ public void executeInLeader(final String key, final
LeaderExecutionCallback callback) {
+
+ }
+
+ @Override
+ public void executeInTransaction(final List<TransactionOperation>
transactionOperations) throws Exception {
+
+ }
+
@Override
public String get(final String key) {
return registryData.get(key);
@@ -46,11 +78,21 @@ public final class TestClusterPersistRepository implements
ClusterPersistReposit
return registryData.containsKey(key) ?
Collections.singletonList(registryData.get(key)) : Collections.emptyList();
}
+ @Override
+ public boolean isExisted(final String key) {
+ return false;
+ }
+
@Override
public void persist(final String key, final String value) {
registryData.put(key, value);
}
+ @Override
+ public void update(final String key, final String value) {
+
+ }
+
@Override
public void persistEphemeral(final String key, final String value) {
registryData.put(key, value);
@@ -64,6 +106,16 @@ public final class TestClusterPersistRepository implements
ClusterPersistReposit
public void delete(final String key) {
}
+ @Override
+ public long getRegistryCenterTime(final String key) {
+ return 0;
+ }
+
+ @Override
+ public Object getRawClient() {
+ return null;
+ }
+
@Override
public void watch(final String key, final DataChangedEventListener
listener) {
}