This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new dcb37841283 Allow `ConsulRawClient` of
`cluster-mode-repository-consul` to be configured on ports other than `8500`
(#29621)
dcb37841283 is described below
commit dcb37841283b10d399997795b7177eccc5f8e750
Author: Ling Hengqian <[email protected]>
AuthorDate: Tue Jan 2 00:55:54 2024 +0800
Allow `ConsulRawClient` of `cluster-mode-repository-consul` to be
configured on ports other than `8500` (#29621)
---
.../builtin-algorithm/metadata-repository.cn.md | 10 +++
.../builtin-algorithm/metadata-repository.en.md | 10 +++
.../cluster/consul/ConsulRepository.java | 72 +++++++++++++++++++---
.../cluster/consul/ConsulRepositoryTest.java | 29 +++++++++
4 files changed, 111 insertions(+), 10 deletions(-)
diff --git
a/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.cn.md
b/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.cn.md
index 5181f984e9c..a2a290a9575 100644
---
a/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.cn.md
+++
b/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.cn.md
@@ -56,6 +56,16 @@ Apache ShardingSphere 为不同的运行模式提供了不同的元数据持久
### Consul 持久化
+受 `com.ecwid.consul:consul-api:1.4.5` 的 Maven 模块的限制,使用者无法通过 gRPC 端口来连接到
Consul Agent。
+
+`Consul` 实现的 `serverLists` 属性受设计使然,仅可通过 HTTP 端点连接到单个 Consul Agent。
+`serverLists` 使用了宽松的 URL 匹配原则。
+1. 当 `serverLists` 为空时,将解析到 `http://localhost:8500` 的 Consul Agent 实例。
+2. 当 `serverLists` 为 `hostname` 时,将解析到 `http://hostname:8500` 的 Consul Agent
实例。
+3. 当 `serverLists` 为 `hostname:port` 时,将解析到 `http://hostname:port` 的 Consul
Agent 实例。
+4. 当 `serverLists` 为 `http://hostName:port` 时,将解析到 `http://hostName:port` 的
Consul Agent 实例。
+5. 当 `serverLists` 为 `https://hostName:port` 时,将解析到 `https://hostName:port` 的
Consul Agent 实例。
+
类型:Consul
适用模式:Cluster
diff --git
a/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.en.md
b/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.en.md
index d2ad3e30aba..8e7de1c8130 100644
---
a/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.en.md
+++
b/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.en.md
@@ -56,6 +56,16 @@ Attributes:
### Consul Repository
+Due to a limitation of the `com.ecwid.consul:consul-api:1.4.5` Maven module,
users cannot connect to the Consul Agent through the gRPC port.
+
+By design, the `serverLists` property of the `Consul` implementation can only
connect to a single Consul Agent via an HTTP endpoint.
+`serverLists` uses lenient URL matching rules.
+1. When `serverLists` is empty, it resolves to the Consul Agent instance at
`http://localhost:8500`.
+2. When `serverLists` is `hostname`, it resolves to the Consul Agent
instance at `http://hostname:8500`.
+3. When `serverLists` is `hostname:port`, it resolves to the Consul
Agent instance at `http://hostname:port`.
+4. When `serverLists` is `http://hostName:port`, it resolves to the Consul
Agent instance at `http://hostName:port`.
+5. When `serverLists` is `https://hostName:port`, it resolves to the Consul
Agent instance at `https://hostName:port`.
+
Type: Consul
Mode: Cluster
diff --git
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
index 3c0f363a123..d87f4d3903d 100644
---
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
+++
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.mode.repository.cluster.consul;
+import com.ecwid.consul.transport.HttpResponse;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.QueryParams;
@@ -27,14 +28,17 @@ import com.ecwid.consul.v1.session.model.NewSession;
import com.ecwid.consul.v1.session.model.Session;
import com.google.common.base.Strings;
import lombok.Getter;
+import org.apache.http.HttpStatus;
+import org.apache.shardingsphere.mode.event.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
import
org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
-import org.apache.shardingsphere.mode.event.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -63,9 +67,9 @@ public final class ConsulRepository implements
ClusterPersistRepository {
@Override
public void init(final ClusterPersistRepositoryConfiguration config) {
- ConsulRawClient rawClient =
Strings.isNullOrEmpty(config.getServerLists()) ? new ConsulRawClient() : new
ConsulRawClient(config.getServerLists());
- consulClient = new ShardingSphereConsulClient(rawClient);
consulProps = new ConsulProperties(config.getProps());
+ ConsulRawClient rawClient =
createConsulRawClient(config.getServerLists());
+ consulClient = new ShardingSphereConsulClient(rawClient);
distributedLockHolder = new DistributedLockHolder(getType(),
consulClient, consulProps);
watchKeyMap = new HashMap<>(6, 1F);
}
@@ -73,13 +77,21 @@ public final class ConsulRepository implements
ClusterPersistRepository {
@Override
public String getDirectly(final String key) {
Response<GetValue> response = consulClient.getKVValue(key);
- return null == response ? null : response.getValue().getValue();
+ if (null == response) {
+ return null;
+ }
+ GetValue value = response.getValue();
+ return null == value ? null : value.getValue();
}
@Override
public List<String> getChildrenKeys(final String key) {
Response<List<String>> response = consulClient.getKVKeysOnly(key);
- return null == response ? Collections.emptyList() :
response.getValue();
+ if (null == response) {
+ return Collections.emptyList();
+ }
+ List<String> value = response.getValue();
+ return null == value ? Collections.emptyList() : value;
}
@Override
@@ -102,9 +114,15 @@ public final class ConsulRepository implements
ClusterPersistRepository {
consulClient.deleteKVValue(key);
}
+ /**
+ * {@link ConsulRawClient} is a wrapper of blocking HTTP client and does
not have a close method.
+ * Using such a Client does not necessarily conform to the implementation
of the relevant SPI. ShardingSphere needs to
+ * consider solutions similar to <a
href="https://github.com/spring-cloud/spring-cloud-consul/issues/475">spring-cloud/spring-cloud-consul#475</a>.
+ *
+ * @see ConsulRawClient
+ */
@Override
public void close() {
- // TODO
}
@Override
@@ -115,6 +133,24 @@ public final class ConsulRepository implements
ClusterPersistRepository {
putParams.setAcquireSession(sessionId);
consulClient.setKVValue(key, value, putParams);
generatorFlushSessionTtlTask(consulClient, sessionId);
+ verifyConsulAgentRunning();
+ }
+
+ @SuppressWarnings("HttpUrlsUsage")
+ private ConsulRawClient createConsulRawClient(final String serverLists) {
+ if (Strings.isNullOrEmpty(serverLists)) {
+ return new ConsulRawClient();
+ }
+ URL serverUrl;
+ try {
+ serverUrl = new URL(!serverLists.startsWith("https://") &&
!serverLists.startsWith("http://") ? "http://" + serverLists : serverLists);
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ if (-1 == serverUrl.getPort()) {
+ return new ConsulRawClient(serverUrl.getHost());
+ }
+ return new ConsulRawClient(serverUrl.getHost(), serverUrl.getPort());
}
private NewSession createNewSession(final String key) {
@@ -142,6 +178,10 @@ public final class ConsulRepository implements
ClusterPersistRepository {
long currentIndex = 0;
while (running.get()) {
Response<List<GetValue>> response = consulClient.getKVValues(key,
new
QueryParams(consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS),
currentIndex));
+ List<GetValue> value = response.getValue();
+ if (null == value) {
+ continue;
+ }
Long index = response.getConsulIndex();
if (null != index && 0 == currentIndex) {
currentIndex = index;
@@ -149,16 +189,16 @@ public final class ConsulRepository implements
ClusterPersistRepository {
watchKeyMap.put(key, new HashSet<>());
}
Collection<String> watchKeys = watchKeyMap.get(key);
- for (GetValue each : response.getValue()) {
+ for (GetValue each : value) {
watchKeys.add(each.getKey());
}
continue;
}
if (null != index && index > currentIndex) {
currentIndex = index;
- Collection<String> newKeys = new
HashSet<>(response.getValue().size(), 1F);
+ Collection<String> newKeys = new HashSet<>(value.size(), 1F);
Collection<String> watchKeys = watchKeyMap.get(key);
- for (GetValue each : response.getValue()) {
+ for (GetValue each : value) {
newKeys.add(each.getKey());
if (!watchKeys.contains(each.getKey())) {
watchKeys.add(each.getKey());
@@ -189,12 +229,24 @@ public final class ConsulRepository implements
ClusterPersistRepository {
* Flush session by update TTL.
*
* @param consulClient consul client
- * @param sessionId session id
+ * @param sessionId session id
*/
public void generatorFlushSessionTtlTask(final ConsulClient consulClient,
final String sessionId) {
SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() ->
consulClient.renewSession(sessionId, QueryParams.DEFAULT), 1L, 10L,
TimeUnit.SECONDS);
}
+ /**
+ * See <a
href="https://developer.hashicorp.com/consul/api-docs/v1.17.x/status">Status
HTTP API</a> .
+ *
+ * @throws RuntimeException Unable to connect to Consul Agent.
+ */
+ private void verifyConsulAgentRunning() {
+ HttpResponse httpResponse =
consulClient.getRawClient().makeGetRequest("/v1/status/leader");
+ if (HttpStatus.SC_OK != httpResponse.getStatusCode()) {
+ throw new RuntimeException("Unable to connect to Consul Agent and
StatusCode is " + httpResponse.getStatusCode() + ".");
+ }
+ }
+
@Override
public String getType() {
return "Consul";
diff --git
a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
index bf70e7c11ef..e4fa64e4487 100644
---
a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
+++
b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
@@ -17,12 +17,15 @@
package org.apache.shardingsphere.mode.repository.cluster.consul;
+import com.ecwid.consul.transport.HttpResponse;
+import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;
import com.ecwid.consul.v1.kv.model.PutParams;
import com.ecwid.consul.v1.session.model.NewSession;
import lombok.SneakyThrows;
+import org.apache.http.HttpStatus;
import
org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
import
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import org.awaitility.Awaitility;
@@ -47,6 +50,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
@@ -83,6 +87,12 @@ class ConsulRepositoryTest {
@Mock
private List<GetValue> getValueList;
+ @Mock
+ private ConsulRawClient consulRawClient;
+
+ @Mock
+ private HttpResponse httpResponse;
+
private long index = 123456L;
@BeforeEach
@@ -140,6 +150,9 @@ class ConsulRepositoryTest {
@Test
void assertPersistEphemeral() {
+ when(client.getRawClient()).thenReturn(consulRawClient);
+
when(consulRawClient.makeGetRequest(any(String.class))).thenReturn(httpResponse);
+ when(httpResponse.getStatusCode()).thenReturn(HttpStatus.SC_OK);
repository.persistEphemeral("key1", "value1");
verify(client).sessionCreate(any(NewSession.class),
any(QueryParams.class));
verify(client).setKVValue(any(String.class), any(String.class),
any(PutParams.class));
@@ -205,4 +218,20 @@ class ConsulRepositoryTest {
repository.persist("key1", "value1");
verify(client).setKVValue(any(String.class), any(String.class));
}
+
+ @Test
+ void assertNullResponse() {
+ when(response.getValue()).thenReturn(null);
+ final String key = "/key";
+ assertDoesNotThrow(() -> {
+ repository.getDirectly(key);
+ repository.getChildrenKeys(key);
+ });
+ when(responseGetValueList.getValue()).thenReturn(null);
+ assertDoesNotThrow(() -> {
+ repository.watch(key, event -> {
+ });
+ client.setKVValue(key, "value");
+ });
+ }
}