This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new dcb37841283 Allow `ConsulRawClient` of 
`cluster-mode-repository-consul` to be configured on ports other than `8500` 
(#29621)
dcb37841283 is described below

commit dcb37841283b10d399997795b7177eccc5f8e750
Author: Ling Hengqian <[email protected]>
AuthorDate: Tue Jan 2 00:55:54 2024 +0800

    Allow `ConsulRawClient` of `cluster-mode-repository-consul` to be 
configured on ports other than `8500` (#29621)
---
 .../builtin-algorithm/metadata-repository.cn.md    | 10 +++
 .../builtin-algorithm/metadata-repository.en.md    | 10 +++
 .../cluster/consul/ConsulRepository.java           | 72 +++++++++++++++++++---
 .../cluster/consul/ConsulRepositoryTest.java       | 29 +++++++++
 4 files changed, 111 insertions(+), 10 deletions(-)

diff --git 
a/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.cn.md
 
b/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.cn.md
index 5181f984e9c..a2a290a9575 100644
--- 
a/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.cn.md
+++ 
b/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.cn.md
@@ -56,6 +56,16 @@ Apache ShardingSphere 为不同的运行模式提供了不同的元数据持久
 
 ### Consul 持久化
 
+受 `com.ecwid.consul:consul-api:1.4.5` 的 Maven 模块的限制,使用者无法通过 gRPC 端口来连接到  
Consul Agent。
+
+`Consul` 实现的 `serverLists` 属性受设计使然,仅可通过 HTTP 端点连接到单个 Consul Agent。
+`serverLists` 使用了宽松的 URL 匹配原则。
+1. 当 `serverLists` 为空时,将解析到 `http://localhost:8500` 的 Consul Agent 实例。
+2. 当 `serverLists` 为 `hostname` 时,将解析到 `http://hostname:8500` 的 Consul Agent 
实例。
+3. 当 `serverLists` 为 `hostname:port` 时,将解析到 `http://hostname:port` 的 Consul 
Agent 实例。
+4. 当 `serverLists` 为 `http://hostName:port` 时,将解析到 `http://hostName:port` 的 
Consul Agent 实例。
+5. 当 `serverLists` 为 `https://hostName:port` 时,将解析到 `https://hostName:port` 的 
Consul Agent 实例。
+
 类型:Consul
 
 适用模式:Cluster
diff --git 
a/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.en.md
 
b/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.en.md
index d2ad3e30aba..8e7de1c8130 100644
--- 
a/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.en.md
+++ 
b/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.en.md
@@ -56,6 +56,16 @@ Attributes:
 
 ### Consul Repository
 
+Due to the limitation of the Maven module of 
`com.ecwid.consul:consul-api:1.4.5`, users cannot connect to the Consul Agent 
through the gRPC port.
+
+The `serverLists` property of the `Consul` implementation is by design and can 
only be connected to a single Consul Agent via an HTTP endpoint.
+`serverLists` uses relaxed URL matching principles.
+1. When `serverLists` is empty, the Consul Agent instance at 
`http://localhost:8500` will be resolved.
+2. When `serverLists` is `hostname`, it will be resolved to the Consul Agent 
instance of `http://hostname:8500`.
+3. When `serverLists` is `hostname:port`, it will be resolved to the Consul 
Agent instance of `http://hostname:port`.
+4. When `serverLists` is `http://hostName:port`, the Consul Agent instance of 
`http://hostName:port` will be resolved.
+5. When `serverLists` is `https://hostName:port`, the Consul Agent instance of 
`https://hostName:port` will be resolved.
+
 Type: Consul
 
 Mode: Cluster
diff --git 
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
 
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
index 3c0f363a123..d87f4d3903d 100644
--- 
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
+++ 
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.mode.repository.cluster.consul;
 
+import com.ecwid.consul.transport.HttpResponse;
 import com.ecwid.consul.v1.ConsulClient;
 import com.ecwid.consul.v1.ConsulRawClient;
 import com.ecwid.consul.v1.QueryParams;
@@ -27,14 +28,17 @@ import com.ecwid.consul.v1.session.model.NewSession;
 import com.ecwid.consul.v1.session.model.Session;
 import com.google.common.base.Strings;
 import lombok.Getter;
+import org.apache.http.HttpStatus;
+import org.apache.shardingsphere.mode.event.DataChangedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import 
org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
 import 
org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
-import org.apache.shardingsphere.mode.event.DataChangedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 import 
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -63,9 +67,9 @@ public final class ConsulRepository implements 
ClusterPersistRepository {
     
     @Override
     public void init(final ClusterPersistRepositoryConfiguration config) {
-        ConsulRawClient rawClient = 
Strings.isNullOrEmpty(config.getServerLists()) ? new ConsulRawClient() : new 
ConsulRawClient(config.getServerLists());
-        consulClient = new ShardingSphereConsulClient(rawClient);
         consulProps = new ConsulProperties(config.getProps());
+        ConsulRawClient rawClient = 
createConsulRawClient(config.getServerLists());
+        consulClient = new ShardingSphereConsulClient(rawClient);
         distributedLockHolder = new DistributedLockHolder(getType(), 
consulClient, consulProps);
         watchKeyMap = new HashMap<>(6, 1F);
     }
@@ -73,13 +77,21 @@ public final class ConsulRepository implements 
ClusterPersistRepository {
     @Override
     public String getDirectly(final String key) {
         Response<GetValue> response = consulClient.getKVValue(key);
-        return null == response ? null : response.getValue().getValue();
+        if (null == response) {
+            return null;
+        }
+        GetValue value = response.getValue();
+        return null == value ? null : value.getValue();
     }
     
     @Override
     public List<String> getChildrenKeys(final String key) {
         Response<List<String>> response = consulClient.getKVKeysOnly(key);
-        return null == response ? Collections.emptyList() : 
response.getValue();
+        if (null == response) {
+            return Collections.emptyList();
+        }
+        List<String> value = response.getValue();
+        return null == value ? Collections.emptyList() : value;
     }
     
     @Override
@@ -102,9 +114,15 @@ public final class ConsulRepository implements 
ClusterPersistRepository {
         consulClient.deleteKVValue(key);
     }
     
+    /**
+     * {@link ConsulRawClient} is a wrapper of blocking HTTP client and does 
not have a close method.
+     * Using such a Client does not necessarily conform to the implementation 
of the relevant SPI. ShardingSphere needs to
+     * consider solutions similar to <a 
href="https://github.com/spring-cloud/spring-cloud-consul/issues/475";>spring-cloud/spring-cloud-consul#475</a>.
+     *
+     * @see ConsulRawClient
+     */
     @Override
     public void close() {
-        // TODO
     }
     
     @Override
@@ -115,6 +133,24 @@ public final class ConsulRepository implements 
ClusterPersistRepository {
         putParams.setAcquireSession(sessionId);
         consulClient.setKVValue(key, value, putParams);
         generatorFlushSessionTtlTask(consulClient, sessionId);
+        verifyConsulAgentRunning();
+    }
+    
+    @SuppressWarnings("HttpUrlsUsage")
+    private ConsulRawClient createConsulRawClient(final String serverLists) {
+        if (Strings.isNullOrEmpty(serverLists)) {
+            return new ConsulRawClient();
+        }
+        URL serverUrl;
+        try {
+            serverUrl = new URL(!serverLists.startsWith("https://";) && 
!serverLists.startsWith("http://";) ? "http://"; + serverLists : serverLists);
+        } catch (MalformedURLException e) {
+            throw new RuntimeException(e);
+        }
+        if (-1 == serverUrl.getPort()) {
+            return new ConsulRawClient(serverUrl.getHost());
+        }
+        return new ConsulRawClient(serverUrl.getHost(), serverUrl.getPort());
     }
     
     private NewSession createNewSession(final String key) {
@@ -142,6 +178,10 @@ public final class ConsulRepository implements 
ClusterPersistRepository {
         long currentIndex = 0;
         while (running.get()) {
             Response<List<GetValue>> response = consulClient.getKVValues(key, 
new 
QueryParams(consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS),
 currentIndex));
+            List<GetValue> value = response.getValue();
+            if (null == value) {
+                continue;
+            }
             Long index = response.getConsulIndex();
             if (null != index && 0 == currentIndex) {
                 currentIndex = index;
@@ -149,16 +189,16 @@ public final class ConsulRepository implements 
ClusterPersistRepository {
                     watchKeyMap.put(key, new HashSet<>());
                 }
                 Collection<String> watchKeys = watchKeyMap.get(key);
-                for (GetValue each : response.getValue()) {
+                for (GetValue each : value) {
                     watchKeys.add(each.getKey());
                 }
                 continue;
             }
             if (null != index && index > currentIndex) {
                 currentIndex = index;
-                Collection<String> newKeys = new 
HashSet<>(response.getValue().size(), 1F);
+                Collection<String> newKeys = new HashSet<>(value.size(), 1F);
                 Collection<String> watchKeys = watchKeyMap.get(key);
-                for (GetValue each : response.getValue()) {
+                for (GetValue each : value) {
                     newKeys.add(each.getKey());
                     if (!watchKeys.contains(each.getKey())) {
                         watchKeys.add(each.getKey());
@@ -189,12 +229,24 @@ public final class ConsulRepository implements 
ClusterPersistRepository {
      * Flush session by update TTL.
      *
      * @param consulClient consul client
-     * @param sessionId session id
+     * @param sessionId    session id
      */
     public void generatorFlushSessionTtlTask(final ConsulClient consulClient, 
final String sessionId) {
         SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> 
consulClient.renewSession(sessionId, QueryParams.DEFAULT), 1L, 10L, 
TimeUnit.SECONDS);
     }
     
+    /**
+     * See <a 
href="https://developer.hashicorp.com/consul/api-docs/v1.17.x/status";>Status 
HTTP API</a> .
+     *
+     * @throws RuntimeException Unable to connect to Consul Agent.
+     */
+    private void verifyConsulAgentRunning() {
+        HttpResponse httpResponse = 
consulClient.getRawClient().makeGetRequest("/v1/status/leader");
+        if (HttpStatus.SC_OK != httpResponse.getStatusCode()) {
+            throw new RuntimeException("Unable to connect to Consul Agent and 
StatusCode is " + httpResponse.getStatusCode() + ".");
+        }
+    }
+    
     @Override
     public String getType() {
         return "Consul";
diff --git 
a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
 
b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
index bf70e7c11ef..e4fa64e4487 100644
--- 
a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
+++ 
b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.shardingsphere.mode.repository.cluster.consul;
 
+import com.ecwid.consul.transport.HttpResponse;
+import com.ecwid.consul.v1.ConsulRawClient;
 import com.ecwid.consul.v1.QueryParams;
 import com.ecwid.consul.v1.Response;
 import com.ecwid.consul.v1.kv.model.GetValue;
 import com.ecwid.consul.v1.kv.model.PutParams;
 import com.ecwid.consul.v1.session.model.NewSession;
 import lombok.SneakyThrows;
+import org.apache.http.HttpStatus;
 import 
org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
 import 
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 import org.awaitility.Awaitility;
@@ -47,6 +50,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
@@ -83,6 +87,12 @@ class ConsulRepositoryTest {
     @Mock
     private List<GetValue> getValueList;
     
+    @Mock
+    private ConsulRawClient consulRawClient;
+    
+    @Mock
+    private HttpResponse httpResponse;
+    
     private long index = 123456L;
     
     @BeforeEach
@@ -140,6 +150,9 @@ class ConsulRepositoryTest {
     
     @Test
     void assertPersistEphemeral() {
+        when(client.getRawClient()).thenReturn(consulRawClient);
+        
when(consulRawClient.makeGetRequest(any(String.class))).thenReturn(httpResponse);
+        when(httpResponse.getStatusCode()).thenReturn(HttpStatus.SC_OK);
         repository.persistEphemeral("key1", "value1");
         verify(client).sessionCreate(any(NewSession.class), 
any(QueryParams.class));
         verify(client).setKVValue(any(String.class), any(String.class), 
any(PutParams.class));
@@ -205,4 +218,20 @@ class ConsulRepositoryTest {
         repository.persist("key1", "value1");
         verify(client).setKVValue(any(String.class), any(String.class));
     }
+    
+    @Test
+    void assertNullResponse() {
+        when(response.getValue()).thenReturn(null);
+        final String key = "/key";
+        assertDoesNotThrow(() -> {
+            repository.getDirectly(key);
+            repository.getChildrenKeys(key);
+        });
+        when(responseGetValueList.getValue()).thenReturn(null);
+        assertDoesNotThrow(() -> {
+            repository.watch(key, event -> {
+            });
+            client.setKVValue(key, "value");
+        });
+    }
 }

Reply via email to