This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 5dd56a5 Upgrade etcd cluster coordinator and dynamic configuration to
v3.x (#7215)
5dd56a5 is described below
commit 5dd56a5ca45c1bc423af71dc0829aed1221621ac
Author: Daming <[email protected]>
AuthorDate: Mon Jul 5 11:23:11 2021 +0800
Upgrade etcd cluster coordinator and dynamic configuration to v3.x (#7215)
---
CHANGES.md | 1 +
dist-material/release-docs/LICENSE | 4 +-
dist-material/release-docs/NOTICE | 14 ++
docs/en/setup/backend/backend-cluster.md | 10 +-
docs/en/setup/backend/configuration-vocabulary.md | 19 ++-
docs/en/setup/backend/dynamic-config.md | 10 +-
oap-server-bom/pom.xml | 91 ++++++-----
oap-server/pom.xml | 1 -
.../src/main/resources/application.yml | 16 +-
.../cluster-etcd-plugin/pom.xml | 100 ++++--------
.../plugin/etcd/ClusterModuleEtcdConfig.java | 43 +++--
.../plugin/etcd/ClusterModuleEtcdProvider.java | 19 +--
.../cluster/plugin/etcd/EtcdCoordinator.java | 179 +++++++++++++--------
.../oap/server/cluster/plugin/etcd/EtcdUtils.java | 50 ------
.../plugin/etcd/ITClusterEtcdPluginTest.java | 75 +++++----
.../ITClusterModuleEtcdProviderFunctionalTest.java | 47 +++---
.../configuration-etcd/pom.xml | 119 +++-----------
.../etcd/EtcdConfigWatcherRegister.java | 159 +++++-------------
.../etcd/EtcdConfigurationProvider.java | 25 +--
.../configuration/etcd/EtcdServerSettings.java | 47 +++---
.../oap/server/configuration/etcd/EtcdUtils.java | 70 --------
.../etcd/EtcdConfigurationTestProvider.java | 2 +-
.../etcd/ITEtcdConfigurationTest.java | 121 +++++++-------
.../server/configuration/etcd/TestEtcdUtils.java | 61 -------
.../src/test/resources/application.yml | 8 +-
oap-server/server-library/library-client/pom.xml | 12 ++
oap-server/server-library/library-server/pom.xml | 16 ++
.../known-oap-backend-dependencies-es7.txt | 20 +--
.../known-oap-backend-dependencies.txt | 20 +--
29 files changed, 556 insertions(+), 803 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 13e8235..b9b6abc 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -72,6 +72,7 @@ Release Notes.
* Support session expired threshold configurable.
* Fix InfluxDB storage-plugin Metrics#multiGet issue.
* Replace zuul proxy with spring cloud gateway 2.x. in webapp module.
+* Upgrade etcd cluster coordinator and dynamic configuration to v3.x.
#### UI
diff --git a/dist-material/release-docs/LICENSE
b/dist-material/release-docs/LICENSE
index 98a9dd8..ecb5a6f 100755
--- a/dist-material/release-docs/LICENSE
+++ b/dist-material/release-docs/LICENSE
@@ -320,11 +320,9 @@ The text of each license is the standard Apache 2.0
license.
proto files from lyft/protoc-gen-validate:
https://github.com/lyft/protoc-gen-validate Apache 2.0
proto files from gogo/googleapis: https://github.com/gogo/googleapis
Apache 2.0
flatbuffers files from istio/proxy: https://github.com/istio/proxy Apache
2.0
- json-flatter 0.6.0: https://github.com/wnameless/json-flattener Apache 2.0
Apache: commons-text 1.4: https://github.com/apache/commons-text Apache 2.0
sundrio 0.9.2: https://github.com/sundrio/sundrio Apache 2.0
Ctripcorp: apollo 1.8.0: https://github.com/ctripcorp/apollo Apache 2.0
- etcd4j 2.18.0: https://github.com/jurmous/etcd4j Apache 2.0
javaassist 3.25.0-GA: https://github.com/jboss-javassist/javassist Apache
2.0
jackson-module-afterburner 2.12.2:
https://github.com/FasterXML/jackson-modules-base, Apache 2.0
perfmark-api 0.19.0: https://github.com/perfmark/perfmark, Apache 2.0
@@ -343,6 +341,8 @@ The text of each license is the standard Apache 2.0 license.
okio 1.17.2: https://github.com/square/okio Apache 2.0
caffeine 2.6.2: https://github.com/ben-manes/caffeine Apache 2.0
simpleclient_httpserver from prometheus
https://github.com/prometheus/client_java Apache 2.0
+ jetcd 0.5.3, https://github.com/etcd-io/jetcd, Apache 2.0
+ failasfe 2.3.4, https://github.com/jhalterman/failsafe, Apache 2.0
========================================================================
MIT licenses
diff --git a/dist-material/release-docs/NOTICE
b/dist-material/release-docs/NOTICE
index 5b363ad..63036b2 100755
--- a/dist-material/release-docs/NOTICE
+++ b/dist-material/release-docs/NOTICE
@@ -896,3 +896,17 @@ This distribution has a binary dependency on jersey, which
is available under th
License. The source code of jersey can be found at
https://github.com/jersey/jersey/.
========================================================================
+
+------
+
+===========================================================================
+jetcd Notice
+===========================================================================
+
+CoreOS Project
+Copyright 2018 CoreOS, Inc
+
+This product includes software developed at CoreOS, Inc.
+(http://www.coreos.com/).
+
+===========================================================================
diff --git a/docs/en/setup/backend/backend-cluster.md
b/docs/en/setup/backend/backend-cluster.md
index 80247f0..dcd025a 100644
--- a/docs/en/setup/backend/backend-cluster.md
+++ b/docs/en/setup/backend/backend-cluster.md
@@ -89,12 +89,20 @@ The following settings are provided to set the host and
port manually, based on
## Etcd
-Set the **cluster/selector** to **etcd** in the yml to enable it.
+Set the **cluster/selector** to **etcd** in the yml to enable it. The Etcd
client has upgraded to v3 protocol and changed to the CoreOS official library.
**Since 8.7.0, only the v3 protocol is supported for Etcd.**
```yaml
cluster:
selector: ${SW_CLUSTER:etcd}
# other configurations
+ etcd:
+ # etcd cluster nodes, example: 10.0.0.1:2379,10.0.0.2:2379,10.0.0.3:2379
+ endpoints: ${SW_CLUSTER_ETCD_ENDPOINTS:localhost:2379}
+ namespace: ${SW_CLUSTER_ETCD_NAMESPACE:/skywalking}
+ serviceName: ${SW_SCLUSTER_ETCD_ERVICE_NAME:"SkyWalking_OAP_Cluster"}
+ authentication: ${SW_CLUSTER_ETCD_AUTHENTICATION:false}
+ user: ${SW_SCLUSTER_ETCD_USER:}
+ password: ${SW_SCLUSTER_ETCD_PASSWORD:}
```
Same as the Zookeeper coordinator,
diff --git a/docs/en/setup/backend/configuration-vocabulary.md
b/docs/en/setup/backend/configuration-vocabulary.md
index 6969c80..843f95e 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -64,11 +64,12 @@ core|default|role|Option values,
`Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | aclToken| ALC Token of Consul. Empty string means `without ALC
token`.| SW_CLUSTER_CONSUL_ACLTOKEN | - |
| - | - | internalComHost| The hostname registered in the Consul for the
internal communication of OAP cluster.| - | -|
| - | - | internalComPort| The port registered in the Consul for the internal
communication of OAP cluster.| - | -1|
-| - | etcd| serviceName| Service name used for SkyWalking cluster.
|SW_SERVICE_NAME|SkyWalking_OAP_Cluster|
-| - | - | hostPort| hosts and ports used of etcd cluster.|
SW_CLUSTER_ETCD_HOST_PORT|localhost:2379|
-| - | - | isSSL| Open SSL for the connection between SkyWalking and etcd
cluster.| - | - |
-| - | - | internalComHost| The hostname registered in the etcd for the
internal communication of OAP cluster.| - | -|
-| - | - | internalComPort| The port registered in the etcd for the internal
communication of OAP cluster.| - | -1|
+| - | etcd| serviceName| Service name used for SkyWalking cluster.
|SW_CLUSTER_ETCD_SERVICE_NAME|SkyWalking_OAP_Cluster|
+| - | - | endpoints| hosts and ports used of etcd cluster.|
SW_CLUSTER_ETCD_ENDPOINTS|localhost:2379|
+| - | - | namespace | Namespace used for SkyWalking cluster.
|SW_CLUSTER_ETCD_NAMESPACE | /skywalking |
+| - | - | authentication | Whether has authentication. |
SW_CLUSTER_ETCD_AUTHENTICATION | false |
+| - | - | user | Etcd auth username | SW_CLUSTER_ETCD_USER | |
+| - | - | password | Etcd auth password | SW_CLUSTER_ETCD_PASSWORD | |
| - | Nacos| serviceName| Service name used for SkyWalking cluster.
|SW_SERVICE_NAME|SkyWalking_OAP_Cluster|
| - | - | hostPort| hosts and ports used of Nacos cluster.|
SW_CLUSTER_NACOS_HOST_PORT|localhost:8848|
| - | - | namespace| Namespace used by SkyWalking node coordination.|
SW_CLUSTER_NACOS_NAMESPACE|public|
@@ -264,9 +265,11 @@ core|default|role|Option values,
`Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | baseSleepTimeMs|The period of Zookeeper client between two retries.
Unit is ms.|SW_CONFIG_ZK_BASE_SLEEP_TIME_MS|1000|
| - | - | maxRetries| The max retry time of
re-trying.|SW_CONFIG_ZK_MAX_RETRIES|3|
| - | - | period | The period of data sync. Unit is second. |
SW_CONFIG_ZK_PERIOD | 60 |
-| - | etcd| clusterName| Service name used for SkyWalking cluster.
|SW_CONFIG_ETCD_CLUSTER_NAME|default|
-| - | - | serverAddr| hosts and ports used of etcd cluster.|
SW_CONFIG_ETCD_SERVER_ADDR|localhost:2379|
-| - | - | group |Additional prefix of the configuration key|
SW_CONFIG_ETCD_GROUP | skywalking|
+| - | etcd| endpoints | hosts and ports used of etcd cluster(If there are
multiple, separate them with commas). | SW_CONFIG_ETCD_ENDPOINTS |
localhost:2379 |
+| - | - | namespace | Namespace used for SkyWalking cluster.
|SW_CONFIG_ETCD_NAMESPACE | /skywalking |
+| - | - | authentication | Whether has authentication. |
SW_CONFIG_ETCD_AUTHENTICATION | false |
+| - | - | user | Etcd auth username | SW_CONFIG_ETCD_USER | |
+| - | - | password | Etcd auth password | SW_CONFIG_ETCD_PASSWORD | |
| - | - | period | The period of data sync. Unit is second. |
SW_CONFIG_ZK_PERIOD | 60
| - | consul | hostPort| hosts and ports used of Consul cluster.|
SW_CONFIG_CONSUL_HOST_AND_PORTS|localhost:8500|
| - | - | aclToken| ALC Token of Consul. Empty string means `without ALC
token`.| SW_CONFIG_CONSUL_ACL_TOKEN | - |
diff --git a/docs/en/setup/backend/dynamic-config.md
b/docs/en/setup/backend/dynamic-config.md
index 5f94200..44b7f86 100755
--- a/docs/en/setup/backend/dynamic-config.md
+++ b/docs/en/setup/backend/dynamic-config.md
@@ -70,11 +70,15 @@ configuration:
selector: ${SW_CONFIGURATION:etcd}
etcd:
period: ${SW_CONFIG_ETCD_PERIOD:60} # Unit seconds, sync period. Default
fetch every 60 seconds.
- group: ${SW_CONFIG_ETCD_GROUP:skywalking}
- serverAddr: ${SW_CONFIG_ETCD_SERVER_ADDR:localhost:2379}
- clusterName: ${SW_CONFIG_ETCD_CLUSTER_NAME:default}
+ endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:localhost:2379}
+ namespace: ${SW_CONFIG_ETCD_NAMESPACE:/skywalking}
+ authentication: ${SW_CONFIG_ETCD_AUTHENTICATION:false}
+ user: ${SW_CONFIG_ETCD_USER:}
+ password: ${SW_CONFIG_ETCD_password:}
```
+**NOTICE**, only the v3 protocol is supported since 8.7.0.
+
## Dynamic Configuration Consul Implementation
[Consul](https://github.com/rickfast/consul-client) is also supported as
DCC(Dynamic Configuration Center), to use it, please configure as follows:
diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index 69812c0..6fad449 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -73,11 +73,20 @@
<commons-beanutils.version>1.9.4</commons-beanutils.version>
<flatbuffers-java.version>1.12.0</flatbuffers-java.version>
<postgresql.version>42.2.18</postgresql.version>
+ <jetcd.version>0.5.3</jetcd.version>
+ <testcontainers.version>1.15.3</testcontainers.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-bom</artifactId>
+ <version>${netty.version}</version>
+ <scope>import</scope>
+ <type>pom</type>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
@@ -200,6 +209,16 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler-proxy</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
@@ -253,6 +272,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-grpclb</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
@@ -329,65 +353,37 @@
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
</dependency>
+
<dependency>
- <groupId>org.mousio</groupId>
- <artifactId>etcd4j</artifactId>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-core</artifactId>
+ <version>${jetcd.version}</version>
<exclusions>
<exclusion>
- <artifactId>netty-codec-dns</artifactId>
- <groupId>io.netty</groupId>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
</exclusion>
-
<exclusion>
- <artifactId>netty-codec-dns</artifactId>
- <groupId>io.netty</groupId>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
</exclusion>
-
<exclusion>
- <artifactId>netty-codec-http</artifactId>
- <groupId>io.netty</groupId>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
</exclusion>
-
<exclusion>
- <artifactId>netty-handler</artifactId>
- <groupId>io.netty</groupId>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
</exclusion>
-
<exclusion>
- <artifactId>netty-resolver-dns</artifactId>
- <groupId>io.netty</groupId>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-grpclb</artifactId>
</exclusion>
-
<exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-afterburner</artifactId>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
- <version>${etcd4j.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-handler</artifactId>
- <version>${netty.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-resolver-dns</artifactId>
- <version>${netty.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-dns</artifactId>
- <version>${netty.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-http</artifactId>
- <version>${netty.version}</version>
</dependency>
<dependency>
@@ -496,6 +492,13 @@
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 00d2f75..6fa7990 100755
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -54,7 +54,6 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven-docker-plugin.version>0.30.0</maven-docker-plugin.version>
- <etcd.version>v3.2.3</etcd.version>
<zookeeper.image.version>3.5</zookeeper.image.version>
<kafka-clients.version>2.4.1</kafka-clients.version>
<spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml
b/oap-server/server-bootstrap/src/main/resources/application.yml
index d27121b..7866d4c 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -38,9 +38,13 @@ cluster:
hostPort: ${SW_CLUSTER_CONSUL_HOST_PORT:localhost:8500}
aclToken: ${SW_CLUSTER_CONSUL_ACLTOKEN:""}
etcd:
- serviceName: ${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
# etcd cluster nodes, example: 10.0.0.1:2379,10.0.0.2:2379,10.0.0.3:2379
- hostPort: ${SW_CLUSTER_ETCD_HOST_PORT:localhost:2379}
+ endpoints: ${SW_CLUSTER_ETCD_ENDPOINTS:localhost:2379}
+ namespace: ${SW_CLUSTER_ETCD_NAMESPACE:/skywalking}
+ serviceName: ${SW_SCLUSTER_ETCD_ERVICE_NAME:"SkyWalking_OAP_Cluster"}
+ authentication: ${SW_CLUSTER_ETCD_AUTHENTICATION:false}
+ user: ${SW_SCLUSTER_ETCD_USER:}
+ password: ${SW_SCLUSTER_ETCD_PASSWORD:}
nacos:
serviceName: ${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
hostPort: ${SW_CLUSTER_NACOS_HOST_PORT:localhost:8848}
@@ -448,9 +452,11 @@ configuration:
maxRetries: ${SW_CONFIG_ZK_MAX_RETRIES:3} # max number of times to retry
etcd:
period: ${SW_CONFIG_ETCD_PERIOD:60} # Unit seconds, sync period. Default
fetch every 60 seconds.
- group: ${SW_CONFIG_ETCD_GROUP:skywalking}
- serverAddr: ${SW_CONFIG_ETCD_SERVER_ADDR:localhost:2379}
- clusterName: ${SW_CONFIG_ETCD_CLUSTER_NAME:default}
+ endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:localhost:2379}
+ namespace: ${SW_CONFIG_ETCD_NAMESPACE:/skywalking}
+ authentication: ${SW_CONFIG_ETCD_AUTHENTICATION:false}
+ user: ${SW_CONFIG_ETCD_USER:}
+ password: ${SW_CONFIG_ETCD_password:}
consul:
# Consul host and ports, separated by comma, e.g. 1.2.3.4:8500,2.3.4.5:8500
hostAndPorts: ${SW_CONFIG_CONSUL_HOST_AND_PORTS:1.2.3.4:8500}
diff --git a/oap-server/server-cluster-plugin/cluster-etcd-plugin/pom.xml
b/oap-server/server-cluster-plugin/cluster-etcd-plugin/pom.xml
index 47d46c7..311ff9a 100644
--- a/oap-server/server-cluster-plugin/cluster-etcd-plugin/pom.xml
+++ b/oap-server/server-cluster-plugin/cluster-etcd-plugin/pom.xml
@@ -37,35 +37,54 @@
</dependency>
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-dns</artifactId>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-core</artifactId>
</dependency>
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-http</artifactId>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty-handler</artifactId>
+ <artifactId>netty-codec-http2</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty-resolver-dns</artifactId>
+ <artifactId>netty-handler-proxy</artifactId>
</dependency>
<dependency>
- <groupId>org.mousio</groupId>
- <artifactId>etcd4j</artifactId>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-afterburner</artifactId>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-grpclb</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ </dependency>
</dependencies>
<profiles>
@@ -74,69 +93,8 @@
<build>
<plugins>
<plugin>
- <groupId>io.fabric8</groupId>
- <artifactId>docker-maven-plugin</artifactId>
- <configuration>
- <sourceMode>all</sourceMode>
- <logDate>default</logDate>
- <verbose>true</verbose>
- <imagePullPolicy>IfNotPresent</imagePullPolicy>
- </configuration>
- <executions>
- <execution>
- <id>start</id>
- <phase>pre-integration-test</phase>
- <goals>
- <goal>start</goal>
- </goals>
- <configuration>
- <images>
- <image>
-
<name>quayio/coreos-etcd:${etcd.version}</name>
-
<alias>etcd-client-integration-test</alias>
- <run>
- <ports>
-
<port>+etcd.host:etcd.port:2379</port>
- </ports>
- <wait>
- <time>20000</time>
- </wait>
- <entrypoint>
- <!-- exec form -->
- <exec>
-
<arg>/usr/local/bin/etcd</arg>
-
<arg>--advertise-client-urls=http://0.0.0.0:2379</arg>
-
<arg>--listen-client-urls=http://0.0.0.0:2379</arg>
- </exec>
- </entrypoint>
- </run>
- </image>
- </images>
- </configuration>
- </execution>
- <execution>
- <id>remove-it-etcd</id>
- <phase>post-integration-test</phase>
- <goals>
- <goal>stop</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
- <configuration>
- <systemPropertyVariables>
- <etcd.host>
- ${etcd.host}
- </etcd.host>
- <etcd.port>
- ${etcd.port}
- </etcd.port>
- </systemPropertyVariables>
- </configuration>
<executions>
<execution>
<goals>
diff --git
a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdConfig.java
b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdConfig.java
index bba3727..ff98a7c 100644
---
a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdConfig.java
+++
b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdConfig.java
@@ -18,25 +18,38 @@
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
-import lombok.Getter;
-import lombok.Setter;
+import com.google.common.base.Strings;
+import java.util.Arrays;
+import lombok.Data;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+@Data
public class ClusterModuleEtcdConfig extends ModuleConfig {
-
- @Setter
- @Getter
private String serviceName;
- @Setter
- @Getter
- private String hostPort;
- @Setter
- @Getter
- private boolean isSSL;
- @Setter
- @Getter
+ private String endpoints;
+ private String namespace;
+
+ private String authority;
+
+ private boolean authentication;
+ private String user;
+ private String password;
+
private String internalComHost;
- @Setter
- @Getter
private int internalComPort = -1;
+
+ public String getNamespace() {
+ if (Strings.isNullOrEmpty(namespace)) {
+ return null;
+ }
+ if (!namespace.endsWith("/")) {
+ return namespace + "/";
+ }
+ return namespace;
+ }
+
+ public String[] getEndpointArray() {
+ return
Arrays.stream(endpoints.split("\\s*,\\s*")).toArray(String[]::new);
+
+ }
}
diff --git
a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdProvider.java
b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdProvider.java
index 553471d..0ff9b77 100644
---
a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdProvider.java
+++
b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ClusterModuleEtcdProvider.java
@@ -18,9 +18,6 @@
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
-import java.net.URI;
-import java.util.List;
-import mousio.etcd4j.EtcdClient;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
@@ -38,10 +35,7 @@ public class ClusterModuleEtcdProvider extends
ModuleProvider {
private final ClusterModuleEtcdConfig config;
- private EtcdClient client;
-
public ClusterModuleEtcdProvider() {
- super();
this.config = new ClusterModuleEtcdConfig();
}
@@ -62,12 +56,13 @@ public class ClusterModuleEtcdProvider extends
ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException,
ModuleStartException {
- List<URI> uris = EtcdUtils.parse(config);
- //TODO check isSSL
- client = new EtcdClient(uris.toArray(new URI[] {}));
- EtcdCoordinator coordinator = new EtcdCoordinator(getManager(),
config, client);
- this.registerServiceImplementation(ClusterRegister.class, coordinator);
- this.registerServiceImplementation(ClusterNodesQuery.class,
coordinator);
+ try {
+ EtcdCoordinator coordinator = new EtcdCoordinator(getManager(),
config);
+ this.registerServiceImplementation(ClusterRegister.class,
coordinator);
+ this.registerServiceImplementation(ClusterNodesQuery.class,
coordinator);
+ } catch (Exception e) {
+ throw new ModuleStartException("Failed to start ETCD
coordinator.", e);
+ }
}
@Override
diff --git
a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdCoordinator.java
b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdCoordinator.java
index 8d20d47..aceb581 100644
---
a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdCoordinator.java
+++
b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdCoordinator.java
@@ -20,15 +20,21 @@ package
org.apache.skywalking.oap.server.cluster.plugin.etcd;
import com.google.common.base.Strings;
import com.google.gson.Gson;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.ClientBuilder;
+import io.etcd.jetcd.KV;
+import io.etcd.jetcd.Lease;
+import io.etcd.jetcd.kv.GetResponse;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.PutOption;
+import io.grpc.stub.StreamObserver;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import mousio.etcd4j.EtcdClient;
-import mousio.etcd4j.promises.EtcdResponsePromise;
-import mousio.etcd4j.responses.EtcdKeysResponse;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.cluster.ClusterHealthStatus;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
@@ -37,30 +43,48 @@ import
org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@Slf4j
public class EtcdCoordinator implements ClusterRegister, ClusterNodesQuery {
- private static final Logger LOGGER =
LoggerFactory.getLogger(EtcdCoordinator.class);
- private static final Integer KEY_TTL = 45;
-
+ private static final Gson GSON = new Gson().newBuilder().create();
private final ModuleDefineHolder manager;
private final ClusterModuleEtcdConfig config;
- private final EtcdClient client;
- private final String serviceName;
- private final ScheduledExecutorService service =
Executors.newSingleThreadScheduledExecutor();
private volatile Address selfAddress;
private HealthCheckMetrics healthChecker;
- public EtcdCoordinator(final ModuleDefineHolder manager, final
ClusterModuleEtcdConfig config, final EtcdClient client) {
+ private final Client client;
+ private final String serviceName;
+ private final ByteSequence serviceNameBS;
+
+ public EtcdCoordinator(final ModuleDefineHolder manager,
+ final ClusterModuleEtcdConfig config) throws
ModuleStartException {
+ if (Strings.isNullOrEmpty(config.getServiceName())) {
+ throw new ModuleStartException("ServiceName cannot be empty.");
+ }
this.manager = manager;
this.config = config;
- this.client = client;
- this.serviceName = config.getServiceName();
+ if (!config.getServiceName().endsWith("/")) {
+ serviceName = config.getServiceName() + "/";
+ } else {
+ serviceName = config.getServiceName();
+ }
+ this.serviceNameBS = ByteSequence.from(serviceName,
Charset.defaultCharset());
+ ClientBuilder builder = Client.builder()
+ .endpoints(config.getEndpointArray())
+ .authority(config.getAuthority());
+ if (StringUtil.isNotEmpty(config.getNamespace())) {
+ builder.namespace(ByteSequence.from(config.getNamespace(),
Charset.defaultCharset()));
+ }
+ if (config.isAuthentication()) {
+ builder.user(ByteSequence.from(config.getUser(),
Charset.defaultCharset()))
+ .password(ByteSequence.from(config.getPassword(),
Charset.defaultCharset()));
+ }
+ this.client = builder.build();
}
@Override
@@ -68,20 +92,25 @@ public class EtcdCoordinator implements ClusterRegister,
ClusterNodesQuery {
List<RemoteInstance> remoteInstances = new ArrayList<>();
try {
initHealthChecker();
- EtcdKeysResponse response = client.get(serviceName +
"/").send().get();
- List<EtcdKeysResponse.EtcdNode> nodes =
response.getNode().getNodes();
-
- Gson gson = new Gson();
- if (nodes != null) {
- nodes.forEach(node -> {
- EtcdEndpoint endpoint = gson.fromJson(node.getValue(),
EtcdEndpoint.class);
- Address address = new Address(endpoint.getHost(),
endpoint.getPort(), true);
- if (!address.equals(selfAddress)) {
- address.setSelf(false);
- }
- remoteInstances.add(new RemoteInstance(address));
- });
- }
+
+ final KV kvClient = client.getKVClient();
+ final GetResponse response = kvClient.get(
+ serviceNameBS,
+ GetOption.newBuilder().withPrefix(serviceNameBS).build()
+ ).get();
+
+ response.getKvs().forEach(kv -> {
+ EtcdEndpoint endpoint = GSON.fromJson(
+ kv.getValue().toString(Charset.defaultCharset()),
+ EtcdEndpoint.class
+ );
+ Address address = new Address(endpoint.getHost(),
endpoint.getPort(), false);
+ if (address.equals(selfAddress)) {
+ address.setSelf(true);
+ }
+ remoteInstances.add(new RemoteInstance(address));
+ });
+
ClusterHealthStatus healthStatus =
OAPNodeChecker.isHealth(remoteInstances);
if (healthStatus.isHealth()) {
this.healthChecker.health();
@@ -97,54 +126,63 @@ public class EtcdCoordinator implements ClusterRegister,
ClusterNodesQuery {
@Override
public void registerRemote(RemoteInstance remoteInstance) throws
ServiceRegisterException {
-
if (needUsingInternalAddr()) {
- remoteInstance = new RemoteInstance(new
Address(config.getInternalComHost(), config.getInternalComPort(), true));
+ remoteInstance = new RemoteInstance(
+ new Address(config.getInternalComHost(),
config.getInternalComPort(), true));
}
this.selfAddress = remoteInstance.getAddress();
-
- EtcdEndpoint endpoint = new
EtcdEndpoint.Builder().serviceName(serviceName)
-
.host(selfAddress.getHost())
-
.port(selfAddress.getPort())
- .build();
+ final EtcdEndpoint endpoint = new
EtcdEndpoint.Builder().serviceName(serviceName)
+
.host(selfAddress.getHost())
+
.port(selfAddress.getPort())
+ .build();
try {
initHealthChecker();
- client.putDir(serviceName).send();
- String key = buildKey(serviceName, selfAddress, remoteInstance);
- String json = new Gson().toJson(endpoint);
- EtcdResponsePromise<EtcdKeysResponse> promise = client.put(key,
json).ttl(KEY_TTL).send();
- //check register.
- promise.get();
- renew(client, key, json);
+
+ final Lease leaseClient = client.getLeaseClient();
+ final long leaseID = leaseClient.grant(30L).get().getID();
+
+ ByteSequence instance = ByteSequence.from(GSON.toJson(endpoint),
Charset.defaultCharset());
+ client.getKVClient()
+ .put(
+ buildKey(serviceName, selfAddress, remoteInstance),
+ instance,
+ PutOption.newBuilder().withLeaseId(leaseID).build()
+ )
+ .get();
healthChecker.health();
+
+ client.getLeaseClient().keepAlive(leaseID, new
StreamObserver<LeaseKeepAliveResponse>() {
+ @Override
+ public void onNext(final LeaseKeepAliveResponse response) {
+ if (log.isDebugEnabled()) {
+ log.debug("Refresh lease id = {}, ttl = {}",
response.getID(), response.getTTL());
+ }
+ }
+
+ @Override
+ public void onError(final Throwable throwable) {
+ log.error("Failed to keep alive in Etcd coordinator",
throwable);
+ healthChecker.unHealth(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+
+ }
+ });
} catch (Throwable e) {
healthChecker.unHealth(e);
throw new ServiceRegisterException(e.getMessage());
}
-
- }
-
- private void renew(EtcdClient client, String key, String json) {
- service.scheduleAtFixedRate(() -> {
- try {
- client.refresh(key, KEY_TTL).send().get();
- } catch (Exception e) {
- try {
- client.put(key, json).ttl(KEY_TTL).send().get();
- } catch (Exception ee) {
- LOGGER.error(ee.getMessage(), ee);
- }
- }
- }, 5 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);
}
- private String buildKey(String serviceName, Address address,
RemoteInstance instance) {
- return new StringBuilder(serviceName).append("/")
- .append(address.getHost())
- .append("_")
- .append(instance.hashCode())
- .toString();
+ private static ByteSequence buildKey(String serviceName, Address address,
RemoteInstance instance) {
+ String key = new StringBuilder(serviceName).append(address.getHost())
+ .append("_")
+ .append(instance.hashCode())
+ .toString();
+ return ByteSequence.from(key, Charset.defaultCharset());
}
private boolean needUsingInternalAddr() {
@@ -153,8 +191,11 @@ public class EtcdCoordinator implements ClusterRegister,
ClusterNodesQuery {
private void initHealthChecker() {
if (healthChecker == null) {
- MetricsCreator metricCreator =
manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
- healthChecker =
metricCreator.createHealthCheckerGauge("cluster_etcd", MetricsTag.EMPTY_KEY,
MetricsTag.EMPTY_VALUE);
+ MetricsCreator metricCreator = manager.find(TelemetryModule.NAME)
+ .provider()
+
.getService(MetricsCreator.class);
+ healthChecker = metricCreator.createHealthCheckerGauge(
+ "cluster_etcd", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
}
}
}
diff --git
a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdUtils.java
b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdUtils.java
deleted file mode 100644
index ad848e8..0000000
---
a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.cluster.plugin.etcd;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.util.Address;
-import
org.apache.skywalking.oap.server.library.util.ConnectStringParseException;
-import org.apache.skywalking.oap.server.library.util.ConnectUtils;
-
-public class EtcdUtils {
-
- public EtcdUtils() {
- }
-
- public static List<URI> parse(ClusterModuleEtcdConfig config) throws
ModuleStartException {
- List<URI> uris = new ArrayList<>();
- try {
- List<Address> addressList =
ConnectUtils.parse(config.getHostPort());
- for (Address address : addressList) {
- uris.add(URI.create(new
StringBuilder("http://").append(address.getHost())
- .append(":")
-
.append(address.getPort())
- .toString()));
- }
- } catch (ConnectStringParseException e) {
- throw new ModuleStartException(e.getMessage(), e);
- }
-
- return uris;
- }
-}
diff --git
a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ITClusterEtcdPluginTest.java
b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ITClusterEtcdPluginTest.java
index 0490b62..6f1e4e6 100644
---
a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ITClusterEtcdPluginTest.java
+++
b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ITClusterEtcdPluginTest.java
@@ -18,63 +18,68 @@
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
-import java.net.URI;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.kv.GetResponse;
+import io.etcd.jetcd.options.GetOption;
+import java.nio.charset.Charset;
+import java.util.Collections;
import java.util.List;
-import mousio.etcd4j.EtcdClient;
-import mousio.etcd4j.responses.EtcdKeysResponse;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
-import org.junit.After;
import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+@Slf4j
public class ITClusterEtcdPluginTest {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(ITClusterEtcdPluginTest.class);
-
private ClusterModuleEtcdConfig etcdConfig;
- private EtcdClient client;
+ private Client client;
private HealthCheckMetrics healthChecker = mock(HealthCheckMetrics.class);
private EtcdCoordinator coordinator;
- private Address remoteAddress = new Address("10.0.0.1", 1000, false);
- private Address selfRemoteAddress = new Address("10.0.0.2", 1001, true);
-
- private Address internalAddress = new Address("10.0.0.3", 1002, false);
+ private final Address remoteAddress = new Address("10.0.0.1", 1000, false);
+ private final Address selfRemoteAddress = new Address("10.0.0.2", 1001,
true);
+ private final Address internalAddress = new Address("10.0.0.3", 1002,
false);
private static final String SERVICE_NAME = "my-service";
+ @ClassRule
+ public static final GenericContainer CONTAINER =
+ new GenericContainer(DockerImageName.parse("bitnami/etcd:3.5.0"))
+ .waitingFor(Wait.forLogMessage(".*etcd setup finished!.*", 1))
+ .withEnv(Collections.singletonMap("ALLOW_NONE_AUTHENTICATION",
"yes"));
+
@Before
public void before() throws Exception {
- String etcdHost = System.getProperty("etcd.host");
- String port = System.getProperty("etcd.port");
- String baseUrl = "http://" + etcdHost + ":" + port;
- LOGGER.info("etcd baseURL: {}", baseUrl);
+ String baseUrl = "http://127.0.0.1:" + CONTAINER.getMappedPort(2379);
etcdConfig = new ClusterModuleEtcdConfig();
+ etcdConfig.setEndpoints(baseUrl);
+ etcdConfig.setNamespace("skywalking/");
+
etcdConfig.setServiceName(SERVICE_NAME);
- client = new EtcdClient(URI.create(baseUrl));
doNothing().when(healthChecker).health();
+
ModuleDefineHolder manager = mock(ModuleDefineHolder.class);
- coordinator = new EtcdCoordinator(manager, etcdConfig, client);
- Whitebox.setInternalState(coordinator, "healthChecker", healthChecker);
- }
+ coordinator = new EtcdCoordinator(manager, etcdConfig);
- @After
- public void after() throws Exception {
- client.close();
+ client = Whitebox.getInternalState(coordinator, "client");
+ Whitebox.setInternalState(coordinator, "healthChecker", healthChecker);
}
@Test
@@ -128,12 +133,20 @@ public class ITClusterEtcdPluginTest {
}
private void clear() throws Throwable {
- EtcdKeysResponse response = client.get(SERVICE_NAME +
"/").send().get();
- List<EtcdKeysResponse.EtcdNode> nodes = response.getNode().getNodes();
-
- for (EtcdKeysResponse.EtcdNode node : nodes) {
- client.delete(node.getKey()).send().get();
- }
+ ByteSequence prefix = ByteSequence.from(SERVICE_NAME + "/",
Charset.defaultCharset());
+ GetResponse response = client.getKVClient()
+ .get(
+ ByteSequence.EMPTY,
+
GetOption.newBuilder().withPrefix(prefix).build()
+ ).get();
+
+ response.getKvs().forEach(e -> {
+ try {
+ client.getKVClient().delete(e.getKey()).get();
+ } catch (Exception exp) {
+ log.error("", exp);
+ }
+ });
}
private void verifyRegistration(Address remoteAddress, EtcdEndpoint
endpoint) {
diff --git
a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ITClusterModuleEtcdProviderFunctionalTest.java
b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ITClusterModuleEtcdProviderFunctionalTest.java
index c8e95c5..adc841b 100644
---
a/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ITClusterModuleEtcdProviderFunctionalTest.java
+++
b/oap-server/server-cluster-plugin/cluster-etcd-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/ITClusterModuleEtcdProviderFunctionalTest.java
@@ -18,9 +18,9 @@
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
+import io.etcd.jetcd.Client;
import java.util.Collections;
import java.util.List;
-import mousio.etcd4j.EtcdClient;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
@@ -34,9 +34,13 @@ import
org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider;
import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -45,21 +49,20 @@ import static org.mockito.Mockito.mock;
public class ITClusterModuleEtcdProviderFunctionalTest {
- private String etcdAddress;
- private ModuleManager moduleManager = mock(ModuleManager.class);
- private NoneTelemetryProvider telemetryProvider =
mock(NoneTelemetryProvider.class);
+ private static String ENDPOINTS;
+ private static NoneTelemetryProvider TELEMETRY_PROVIDER =
mock(NoneTelemetryProvider.class);
+
+ @ClassRule
+ public static final GenericContainer CONTAINER =
+ new GenericContainer(DockerImageName.parse("bitnami/etcd:3.5.0"))
+ .waitingFor(Wait.forLogMessage(".*etcd setup finished!.*", 1))
+ .withEnv(Collections.singletonMap("ALLOW_NONE_AUTHENTICATION",
"yes"));
@Before
- public void before() {
- Mockito.when(telemetryProvider.getService(MetricsCreator.class))
- .thenReturn(new MetricsCreatorNoop());
- TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
- Whitebox.setInternalState(telemetryModule, "loadedProvider",
telemetryProvider);
-
Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
- String etcdHost = System.getProperty("etcd.host");
- String port = System.getProperty("etcd.port");
- assertTrue(!StringUtil.isEmpty(etcdHost) && !StringUtil.isEmpty(port));
- etcdAddress = etcdHost + ":" + port;
+ public void setup() {
+ Mockito.when(TELEMETRY_PROVIDER.getService(MetricsCreator.class))
+ .thenReturn(new MetricsCreatorNoop());
+ ENDPOINTS = "http://127.0.0.1:" + CONTAINER.getMappedPort(2379);
}
@Test
@@ -160,7 +163,7 @@ public class ITClusterModuleEtcdProviderFunctionalTest {
validateServiceInstance(addressB, addressA, remoteInstancesOfB);
// unregister A
- EtcdClient client = Whitebox.getInternalState(providerA, "client");
+ Client client =
Whitebox.getInternalState(getClusterRegister(providerA), "client");
client.close();
// only B
@@ -176,12 +179,12 @@ public class ITClusterModuleEtcdProviderFunctionalTest {
}
private ClusterModuleEtcdProvider createProvider(String serviceName,
String internalComHost,
- int internalComPort) throws ModuleStartException {
+ int internalComPort)
throws ModuleStartException {
ClusterModuleEtcdProvider provider = new ClusterModuleEtcdProvider();
ClusterModuleEtcdConfig config = (ClusterModuleEtcdConfig)
provider.createConfigBeanIfAbsent();
- config.setHostPort(etcdAddress);
+ config.setEndpoints(ENDPOINTS);
config.setServiceName(serviceName);
if (!StringUtil.isEmpty(internalComHost)) {
@@ -191,7 +194,12 @@ public class ITClusterModuleEtcdProviderFunctionalTest {
if (internalComPort > 0) {
config.setInternalComPort(internalComPort);
}
- provider.setManager(moduleManager);
+ TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
+ Whitebox.setInternalState(telemetryModule, "loadedProvider",
TELEMETRY_PROVIDER);
+ ModuleManager manager = mock(ModuleManager.class);
+
Mockito.when(manager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
+
+ provider.setManager(manager);
provider.prepare();
provider.start();
provider.notifyAfterCompleted();
@@ -211,7 +219,7 @@ public class ITClusterModuleEtcdProviderFunctionalTest {
}
private List<RemoteInstance> queryRemoteNodes(ModuleProvider provider, int
goals,
- int cyclic) throws InterruptedException {
+ int cyclic) throws
InterruptedException {
do {
List<RemoteInstance> instances =
getClusterNodesQuery(provider).queryRemoteNodes();
if (instances.size() == goals) {
@@ -241,4 +249,5 @@ public class ITClusterModuleEtcdProviderFunctionalTest {
assertTrue(selfExist);
assertTrue(otherExist);
}
+
}
diff --git a/oap-server/server-configuration/configuration-etcd/pom.xml
b/oap-server/server-configuration/configuration-etcd/pom.xml
index 7a42a43..fedf770 100644
--- a/oap-server/server-configuration/configuration-etcd/pom.xml
+++ b/oap-server/server-configuration/configuration-etcd/pom.xml
@@ -34,39 +34,54 @@
</dependency>
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-dns</artifactId>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-core</artifactId>
</dependency>
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-http</artifactId>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty-handler</artifactId>
+ <artifactId>netty-codec-http2</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty-resolver-dns</artifactId>
+ <artifactId>netty-handler-proxy</artifactId>
</dependency>
<dependency>
- <groupId>org.mousio</groupId>
- <artifactId>etcd4j</artifactId>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-afterburner</artifactId>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-grpclb</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ </dependency>
</dependencies>
<build>
@@ -88,92 +103,8 @@
<build>
<plugins>
<plugin>
- <groupId>io.fabric8</groupId>
- <artifactId>docker-maven-plugin</artifactId>
- <configuration>
- <sourceMode>all</sourceMode>
- <logDate>default</logDate>
- <verbose>true</verbose>
- <imagePullPolicy>IfNotPresent</imagePullPolicy>
- </configuration>
- <executions>
- <execution>
- <id>start</id>
- <phase>pre-integration-test</phase>
- <goals>
- <goal>start</goal>
- </goals>
- <configuration>
- <images>
- <image>
-
<name>quayio/coreos-etcd:${etcd.version}</name>
-
<alias>etcd-client-integration-test</alias>
- <run>
- <ports>
- <port>etcd.port:2379</port>
- </ports>
- <wait>
- <time>5000</time>
- </wait>
- <entrypoint>
- <!-- exec form -->
- <exec>
-
<arg>/usr/local/bin/etcd</arg>
-
<arg>--advertise-client-urls=http://0.0.0.0:2379</arg>
-
<arg>--listen-client-urls=http://0.0.0.0:2379</arg>
- </exec>
- </entrypoint>
- </run>
- </image>
- </images>
- </configuration>
- </execution>
- <execution>
- <id>remove-it-etcd</id>
- <phase>post-integration-test</phase>
- <goals>
- <goal>stop</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.codehaus.gmaven</groupId>
- <artifactId>gmaven-plugin</artifactId>
- <version>1.5</version>
- <executions>
- <execution>
- <id>add-default-properties</id>
- <phase>initialize</phase>
- <goals>
- <goal>execute</goal>
- </goals>
- <configuration>
- <providerSelection>2.0</providerSelection>
- <source>
-
project.properties.setProperty('etcd.host', 'localhost')
-
- log.info("Etcd host is " +
project.properties['etcd.host'])
- </source>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
- <configuration>
- <systemPropertyVariables>
- <etcd.host>
- ${etcd.host}
- </etcd.host>
- <etcd.port>
- ${etcd.port}
- </etcd.port>
- </systemPropertyVariables>
- </configuration>
<executions>
<execution>
<goals>
diff --git
a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java
b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java
index 6123295..44df608 100644
---
a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java
+++
b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java
@@ -13,144 +13,65 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.skywalking.oap.server.configuration.etcd;
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Map;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.ClientBuilder;
+import io.etcd.jetcd.KV;
+import io.etcd.jetcd.kv.GetResponse;
+import java.nio.charset.Charset;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import mousio.client.promises.ResponsePromise;
-import mousio.etcd4j.EtcdClient;
-import mousio.etcd4j.promises.EtcdResponsePromise;
-import mousio.etcd4j.responses.EtcdErrorCode;
-import mousio.etcd4j.responses.EtcdException;
-import mousio.etcd4j.responses.EtcdKeysResponse;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import
org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@Slf4j
public class EtcdConfigWatcherRegister extends ConfigWatcherRegister {
- private static final Logger LOGGER =
LoggerFactory.getLogger(EtcdConfigWatcherRegister.class);
-
- /**
- * server settings for Etcd configuration
- */
- private EtcdServerSettings settings;
-
- /**
- * etcd client.
- */
- private final EtcdClient client;
-
- private final Map<String, ResponsePromise.IsSimplePromiseResponseHandler>
listenersByKey;
-
- private final Map<String, Optional<String>> configItemKeyedByName;
-
- private final Map<String, EtcdResponsePromise<EtcdKeysResponse>>
responsePromiseByKey;
-
- public EtcdConfigWatcherRegister(EtcdServerSettings settings) {
- super(settings.getPeriod());
- this.settings = settings;
- this.configItemKeyedByName = new ConcurrentHashMap<>();
- this.client = new EtcdClient(EtcdUtils.parse(settings).toArray(new
URI[] {}));
- this.listenersByKey = new ConcurrentHashMap<>();
- responsePromiseByKey = new ConcurrentHashMap<>();
- }
-
- @Override
- public Optional<ConfigTable> readConfig(Set<String> keys) {
- removeUninterestedKeys(keys);
- registerKeyListeners(keys);
- final ConfigTable table = new ConfigTable();
+ private final KV client;
- for (Map.Entry<String, Optional<String>> entry :
configItemKeyedByName.entrySet()) {
- final String key = entry.getKey();
- final Optional<String> value = entry.getValue();
+ public EtcdConfigWatcherRegister(EtcdServerSettings setting) {
+ super(setting.getPeriod());
+ ClientBuilder builder = Client.builder()
+ .authority(setting.getAuthority())
+ .endpoints(setting.getEndpointArray());
- if (value.isPresent()) {
- table.add(new ConfigTable.ConfigItem(key, value.get()));
- } else {
- table.add(new ConfigTable.ConfigItem(key, null));
- }
+ if (StringUtil.isNotEmpty(setting.getNamespace())) {
+ builder.namespace(ByteSequence.from(setting.getNamespace(),
Charset.defaultCharset()));
}
-
- return Optional.of(table);
- }
-
- private void registerKeyListeners(final Set<String> keys) {
- for (final String key : keys) {
- String dataId = "/" + settings.getGroup() + "/" + key;
- if (listenersByKey.containsKey(dataId)) {
- continue;
- }
-
- listenersByKey.putIfAbsent(dataId, p -> {
- onDataValueChanged(p, dataId);
- });
-
- try {
- EtcdResponsePromise<EtcdKeysResponse> responsePromise =
client.get(dataId).waitForChange().send();
- responsePromise.addListener(listenersByKey.get(dataId));
- responsePromiseByKey.putIfAbsent(dataId, responsePromise);
-
- // the key is newly added, read the config for the first time
- EtcdResponsePromise<EtcdKeysResponse> promise =
client.get(dataId).send();
- onDataValueChanged(promise, dataId);
- } catch (Exception e) {
- throw new EtcdConfigException("wait for etcd value change
fail", e);
- }
+ if (setting.isAuthentication()) {
+ builder.user(ByteSequence.from(setting.getUser(),
Charset.defaultCharset()))
+ .password(ByteSequence.from(setting.getPassword(),
Charset.defaultCharset()));
}
+ client = builder.build().getKVClient();
}
- private void removeUninterestedKeys(final Set<String> interestedKeys) {
- final Set<String> uninterestedKeys = new
HashSet<>(listenersByKey.keySet());
- uninterestedKeys.removeAll(interestedKeys);
-
- uninterestedKeys.forEach(k -> {
- final ResponsePromise.IsSimplePromiseResponseHandler listener =
listenersByKey.remove(k);
- if (listener != null) {
- responsePromiseByKey.remove(k).removeListener(listener);
- }
- });
- }
-
- private void onDataValueChanged(ResponsePromise<EtcdKeysResponse> promise,
String dataId) {
- String key = getRealKey(dataId, settings.getGroup());
- try {
- EtcdKeysResponse.EtcdNode node = promise.get().getNode();
- String value = node.getValue();
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Etcd config changed: {}: {}", key,
node.getValue());
- }
-
- configItemKeyedByName.put(key, Optional.ofNullable(value));
- } catch (Exception e) {
- if (e instanceof EtcdException) {
- if (EtcdErrorCode.KeyNotFound == ((EtcdException)
e).errorCode) {
- configItemKeyedByName.put(key, Optional.empty());
- return;
+ @Override
+ public Optional<ConfigTable> readConfig(final Set<String> keys) {
+ ConfigTable table = new ConfigTable();
+ keys.forEach(e -> {
+ try {
+ GetResponse response = client.get(ByteSequence.from(e,
Charset.defaultCharset())).get();
+
+ if (0 == response.getCount()) {
+ table.add(new ConfigTable.ConfigItem(e, null));
+ } else {
+ response.getKvs().forEach(kv -> table.add(new
ConfigTable.ConfigItem(
+
kv.getKey().toString(Charset.defaultCharset()),
+
kv.getValue().toString(Charset.defaultCharset())
+ )
+ ));
}
+ } catch (Exception exp) {
+ throw new EtcdConfigException("Failed to read configuration",
exp);
}
- throw new EtcdConfigException("wait for value changed fail", e);
- }
+ });
+ return Optional.of(table);
}
- /**
- * get real key in etcd cluster which is removed "/${group}" from the key
retrive from etcd.
- */
- private String getRealKey(String key, String group) {
- int index = key.indexOf(group);
- if (index <= 0) {
- throw new RuntimeException("the group doesn't match");
- }
- String realKey = key.substring(index + group.length() + 1);
- return realKey;
- }
}
diff --git
a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationProvider.java
b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationProvider.java
index fef0c4e..77f2b1e 100644
---
a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationProvider.java
+++
b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationProvider.java
@@ -13,40 +13,29 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.skywalking.oap.server.configuration.etcd;
-import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.util.StringUtil;
import
org.apache.skywalking.oap.server.configuration.api.AbstractConfigurationProvider;
import
org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-/**
- * Get Configuration from etcd.
- */
+@Slf4j
public class EtcdConfigurationProvider extends AbstractConfigurationProvider {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(EtcdConfigurationProvider.class);
-
- private EtcdServerSettings settings;
+ private final EtcdServerSettings settings;
public EtcdConfigurationProvider() {
- settings = new EtcdServerSettings();
+ this.settings = new EtcdServerSettings();
}
@Override
protected ConfigWatcherRegister initConfigReader() throws
ModuleStartException {
- LOGGER.info("settings: {}", settings);
- if (Strings.isNullOrEmpty(settings.getServerAddr())) {
- throw new ModuleStartException("Etcd serverAddr cannot be null or
empty.");
- }
- if (Strings.isNullOrEmpty(settings.getGroup())) {
- throw new ModuleStartException("Etcd group cannot be null or
empty.");
+ if (StringUtil.isEmpty(settings.getEndpoints())) {
+ throw new ModuleStartException("Etcd endpoints cannot be null or
empty.");
}
try {
diff --git
a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdServerSettings.java
b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdServerSettings.java
index 17fb82a..053ce82 100644
---
a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdServerSettings.java
+++
b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdServerSettings.java
@@ -13,38 +13,41 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.skywalking.oap.server.configuration.etcd;
-import lombok.Getter;
-import lombok.Setter;
+import com.google.common.base.Strings;
+import java.util.Arrays;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-/**
- * entity wrapps the etcd cluster configuration.
- */
+@Data
@ToString
-@Getter
-@Setter
+@EqualsAndHashCode(callSuper = true)
public class EtcdServerSettings extends ModuleConfig {
+ private int period;
+ private String endpoints;
+ private String namespace;
+ private String authority;
+ private String user;
+ private String password;
- private String clusterName = "default";
- /**
- * etcd cluster address, like "10.10.10.1:2379,
10.10.10.2:2379,10.10.10.3.2379".
- */
- private String serverAddr;
-
- /**
- * directory for configuration
- */
- private String group;
+ private boolean authentication;
- /**
- * sec for interval refresh config data.
- */
- private int period = 60;
+ public String getNamespace() {
+ if (Strings.isNullOrEmpty(namespace)) {
+ return null;
+ }
+ if (!namespace.endsWith("/")) {
+ return namespace + "/";
+ }
+ return namespace;
+ }
+ public String[] getEndpointArray() {
+ return
Arrays.stream(endpoints.split("\\s*,\\s*")).toArray(String[]::new);
+ }
}
diff --git
a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdUtils.java
b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdUtils.java
deleted file mode 100644
index 808b823..0000000
---
a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdUtils.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.configuration.etcd;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import org.apache.skywalking.oap.server.library.util.Address;
-import org.apache.skywalking.oap.server.library.util.ConnectUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * a util for etcd serverAddr parse.
- */
-public class EtcdUtils {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(EtcdUtils.class);
-
- public EtcdUtils() {
- }
-
- public static List<URI> parse(EtcdServerSettings settings) {
- List<URI> uris = new ArrayList<>();
- try {
- LOGGER.info("etcd settings is {}", settings);
- List<Address> addressList =
ConnectUtils.parse(settings.getServerAddr());
- for (Address address : addressList) {
- uris.add(new URI("http", null, address.getHost(),
address.getPort(), null, null, null));
- }
- } catch (Exception e) {
- throw new EtcdConfigException(e.getMessage(), e);
- }
-
- return uris;
- }
-
- public static List<URI> parseProp(Properties properties) {
- List<URI> uris = new ArrayList<>();
- try {
- LOGGER.info("etcd server addr is {}", properties);
- List<Address> addressList =
ConnectUtils.parse(properties.getProperty("serverAddr"));
- for (Address address : addressList) {
- uris.add(new URI("http", null, address.getHost(),
address.getPort(), null, null, null));
- }
- } catch (Exception e) {
- throw new EtcdConfigException(e.getMessage(), e);
- }
-
- return uris;
- }
-
-}
diff --git
a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationTestProvider.java
b/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationTestProvider.java
index c87f84f..b938c5d 100644
---
a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationTestProvider.java
+++
b/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationTestProvider.java
@@ -57,7 +57,7 @@ public class EtcdConfigurationTestProvider extends
ModuleProvider {
private volatile String testValue;
@Override
- public void notify(ConfigChangeWatcher.ConfigChangeEvent value) {
+ public void notify(ConfigChangeEvent value) {
LOGGER.info("ConfigChangeWatcher.ConfigChangeEvent: {}",
value);
if (EventType.DELETE.equals(value.getEventType())) {
testValue = null;
diff --git
a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/ITEtcdConfigurationTest.java
b/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/ITEtcdConfigurationTest.java
index fa638a8..b207209 100644
---
a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/ITEtcdConfigurationTest.java
+++
b/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/ITEtcdConfigurationTest.java
@@ -18,99 +18,114 @@
package org.apache.skywalking.oap.server.configuration.etcd;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.KV;
import java.io.FileNotFoundException;
import java.io.Reader;
-import java.net.URI;
-import java.util.List;
+import java.nio.charset.Charset;
+import java.util.Collections;
import java.util.Map;
import java.util.Properties;
-import mousio.etcd4j.EtcdClient;
-import mousio.etcd4j.promises.EtcdResponsePromise;
-import mousio.etcd4j.responses.EtcdKeysResponse;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.PropertyPlaceholderHelper;
import
org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
+import org.apache.skywalking.oap.server.library.module.ModuleConfigException;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleNotFoundException;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
import org.yaml.snakeyaml.Yaml;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+@Slf4j
public class ITEtcdConfigurationTest {
+ @ClassRule
+ public static final GenericContainer CONTAINER =
+ new GenericContainer(DockerImageName.parse("bitnami/etcd:3.5.0"))
+ .waitingFor(Wait.forLogMessage(".*etcd setup finished!.*", 1))
+ .withEnv(Collections.singletonMap("ALLOW_NONE_AUTHENTICATION",
"yes"));
- private static final Logger LOGGER =
LoggerFactory.getLogger(ITEtcdConfigurationTest.class);
+ private static EtcdConfigurationTestProvider PROVIDER;
- private final Yaml yaml = new Yaml();
+ private static final String TEST_VALUE = "value";
- private EtcdServerSettings settings;
+ @BeforeClass
+ public static void beforeClass() throws FileNotFoundException,
ModuleConfigException, ModuleNotFoundException, ModuleStartException {
+ System.setProperty("etcd.endpoint", "http://127.0.0.1:" +
CONTAINER.getMappedPort(2379));
- private EtcdConfigurationTestProvider provider;
-
- private EtcdClient client;
-
- @Before
- public void setUp() throws Exception {
final ApplicationConfiguration applicationConfiguration = new
ApplicationConfiguration();
loadConfig(applicationConfiguration);
final ModuleManager moduleManager = new ModuleManager();
moduleManager.init(applicationConfiguration);
- final String etcdHost = System.getProperty("etcd.host");
- final String etcdPort = System.getProperty("etcd.port");
- LOGGER.info("etcdHost: {}, etcdPort: {}", etcdHost, etcdPort);
- Properties properties = new Properties();
- properties.setProperty("serverAddr", etcdHost + ":" + etcdPort);
-
- List<URI> uris = EtcdUtils.parseProp(properties);
- client = new EtcdClient(uris.toArray(new URI[] {}));
-
- provider = (EtcdConfigurationTestProvider)
moduleManager.find(EtcdConfigurationTestModule.NAME).provider();
+ PROVIDER = (EtcdConfigurationTestProvider)
moduleManager.find(EtcdConfigurationTestModule.NAME).provider();
- assertNotNull(provider);
+ assertNotNull(PROVIDER);
}
@Test(timeout = 20000)
public void shouldReadUpdated() throws Exception {
- assertNull(provider.watcher.value());
-
- assertTrue(publishConfig("test-module.default.testKey", "skywalking",
"500"));
-
- for (String v = provider.watcher.value(); v == null; v =
provider.watcher.value()) {
- LOGGER.info("value is : {}", provider.watcher.value());
+ assertNull(PROVIDER.watcher.value());
+
+ KV client = Client.builder()
+ .endpoints("http://localhost:" +
CONTAINER.getMappedPort(2379))
+ .namespace(ByteSequence.from("/skywalking/",
Charset.defaultCharset()))
+ .build()
+ .getKVClient();
+
+ client.put(
+ ByteSequence.from("test-module.default.testKey",
Charset.defaultCharset()),
+ ByteSequence.from(TEST_VALUE, Charset.defaultCharset())
+ ).get();
+
+ for (String v = PROVIDER.watcher.value(); v == null; v =
PROVIDER.watcher.value()) {
+ log.info("value is : {}", PROVIDER.watcher.value());
+ TimeUnit.MILLISECONDS.sleep(200L);
}
- assertEquals("500", provider.watcher.value());
+ assertEquals(TEST_VALUE, PROVIDER.watcher.value());
- assertTrue(removeConfig("test-module.default.testKey", "skywalking"));
+ client.delete(ByteSequence.from("test-module.default.testKey",
Charset.defaultCharset())).get();
- for (String v = provider.watcher.value(); v != null; v =
provider.watcher.value()) {
+ for (String v = PROVIDER.watcher.value(); v != null; v =
PROVIDER.watcher.value()) {
+ TimeUnit.MILLISECONDS.sleep(200L);
}
- assertNull(provider.watcher.value());
+ assertNull(PROVIDER.watcher.value());
}
@SuppressWarnings("unchecked")
- private void loadConfig(ApplicationConfiguration configuration) throws
FileNotFoundException {
+ private static void loadConfig(ApplicationConfiguration configuration)
throws FileNotFoundException {
+ final Yaml yaml = new Yaml();
+
Reader applicationReader = ResourceUtils.read("application.yml");
Map<String, Map<String, Map<String, ?>>> moduleConfig =
yaml.loadAs(applicationReader, Map.class);
if (CollectionUtils.isNotEmpty(moduleConfig)) {
moduleConfig.forEach((moduleName, providerConfig) -> {
if (providerConfig.size() > 0) {
- ApplicationConfiguration.ModuleConfiguration
moduleConfiguration = configuration.addModule(moduleName);
+ ApplicationConfiguration.ModuleConfiguration
moduleConfiguration = configuration.addModule(
+ moduleName);
providerConfig.forEach((name, propertiesConfig) -> {
Properties properties = new Properties();
if (propertiesConfig != null) {
propertiesConfig.forEach((key, value) -> {
properties.put(key, value);
- final Object replaceValue =
yaml.load(PropertyPlaceholderHelper.INSTANCE.replacePlaceholders(value + "",
properties));
+ final Object replaceValue = yaml.load(
+
PropertyPlaceholderHelper.INSTANCE.replacePlaceholders(value + "", properties));
if (replaceValue != null) {
properties.replace(key, replaceValue);
}
@@ -123,24 +138,8 @@ public class ITEtcdConfigurationTest {
}
}
- private boolean publishConfig(String key, String group, String value) {
- try {
- client.putDir(group).send().get();
- EtcdResponsePromise<EtcdKeysResponse> promise =
client.put(generateKey(key, group), value).send();
- promise.get();
- return true;
- } catch (Exception e) {
- return false;
- }
- }
-
- private boolean removeConfig(String key, String group) throws Exception {
- client.delete(generateKey(key, group)).send().get();
- return true;
+ @AfterClass
+ public static void teardown() {
+ CONTAINER.close();
}
-
- private String generateKey(String key, String group) {
- return new
StringBuilder("/").append(group).append("/").append(key).toString();
- }
-
}
diff --git
a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/TestEtcdUtils.java
b/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/TestEtcdUtils.java
deleted file mode 100644
index feea4cd..0000000
---
a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/TestEtcdUtils.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.configuration.etcd;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Properties;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestEtcdUtils {
-
- private EtcdServerSettings settings;
-
- private Properties properties;
-
- @Before
- public void setUp() {
- settings = new EtcdServerSettings();
- settings.setServerAddr("localhost:2379");
- properties = new Properties();
- properties.setProperty("serverAddr", "localhost:2379");
- }
-
- @Test
- public void testParse() {
- List<URI> list = EtcdUtils.parse(settings);
- Assert.assertEquals(1, list.size());
- URI uri = list.get(0);
- Assert.assertEquals("http", uri.getScheme());
- Assert.assertEquals("localhost", uri.getHost());
- Assert.assertEquals(2379, uri.getPort());
- }
-
- @Test
- public void testProp() {
- List<URI> list = EtcdUtils.parseProp(properties);
- Assert.assertEquals(1, list.size());
- URI uri = list.get(0);
- Assert.assertEquals("http", uri.getScheme());
- Assert.assertEquals("localhost", uri.getHost());
- Assert.assertEquals(2379, uri.getPort());
- }
-}
diff --git
a/oap-server/server-configuration/configuration-etcd/src/test/resources/application.yml
b/oap-server/server-configuration/configuration-etcd/src/test/resources/application.yml
index e4e1981..8cbe532 100755
---
a/oap-server/server-configuration/configuration-etcd/src/test/resources/application.yml
+++
b/oap-server/server-configuration/configuration-etcd/src/test/resources/application.yml
@@ -22,12 +22,8 @@ test-module:
configuration:
etcd:
# Etcd Server Host
- serverAddr: ${etcd.host}:${etcd.port}
- # Etcd Server Port
- port: ${etcd.port}
+ endpoints: ${etcd.endpoint}
# Etcd Configuration Group
- group: 'skywalking'
+ namespace: /skywalking/
# Unit seconds, sync period. Default fetch every 60 seconds.
period: 1
- # the name of current cluster, set the name if you want to upstream system
known.
- clusterName: "default"
diff --git a/oap-server/server-library/library-client/pom.xml
b/oap-server/server-library/library-client/pom.xml
index a712bf6..b39ba7b 100755
--- a/oap-server/server-library/library-client/pom.xml
+++ b/oap-server/server-library/library-client/pom.xml
@@ -48,6 +48,18 @@
<artifactId>grpc-netty</artifactId>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler-proxy</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
diff --git a/oap-server/server-library/library-server/pom.xml
b/oap-server/server-library/library-server/pom.xml
index 25a3446..a67e47d 100644
--- a/oap-server/server-library/library-server/pom.xml
+++ b/oap-server/server-library/library-server/pom.xml
@@ -59,5 +59,21 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/tools/dependencies/known-oap-backend-dependencies-es7.txt
b/tools/dependencies/known-oap-backend-dependencies-es7.txt
index d412541..8e4b677 100755
--- a/tools/dependencies/known-oap-backend-dependencies-es7.txt
+++ b/tools/dependencies/known-oap-backend-dependencies-es7.txt
@@ -41,7 +41,7 @@ elasticsearch-rest-high-level-client-7.10.2.jar
elasticsearch-secure-sm-7.10.2.jar
elasticsearch-x-content-7.10.2.jar
error_prone_annotations-2.3.2.jar
-etcd4j-2.18.0.jar
+failsafe-2.3.4.jar
failureaccess-1.0.1.jar
flatbuffers-java-1.12.0.jar
freemarker-2.3.28.jar
@@ -51,6 +51,7 @@ groovy-3.0.3.jar
grpc-api-1.32.1.jar
grpc-context-1.32.1.jar
grpc-core-1.32.1.jar
+grpc-grpclb-1.32.1.jar
grpc-netty-1.32.1.jar
grpc-protobuf-1.32.1.jar
grpc-protobuf-lite-1.32.1.jar
@@ -81,6 +82,9 @@ javassist-3.25.0-GA.jar
javax.inject-1.jar
javax.servlet-api-3.1.0.jar
jcl-over-slf4j-1.7.30.jar
+jetcd-common-0.5.3.jar
+jetcd-core-0.5.3.jar
+jetcd-resolver-0.5.3.jar
jetty-http-9.4.40.v20210413.jar
jetty-io-9.4.40.v20210413.jar
jetty-security-9.4.40.v20210413.jar
@@ -92,7 +96,6 @@ jna-5.5.0.jar
joda-time-2.10.5.jar
jopt-simple-4.6.jar
jose4j-0.7.6.jar
-json-flattener-0.6.0.jar
jsr305-3.0.2.jar
kafka-clients-2.4.1.jar
kotlin-reflect-1.1.1.jar
@@ -120,7 +123,6 @@ lucene-spatial3d-8.7.0.jar
lucene-suggest-8.7.0.jar
lz4-java-1.6.0.jar
mapper-extras-client-7.10.2.jar
-minimal-json-0.9.5.jar
moshi-1.5.0.jar
msgpack-core-0.8.16.jar
mvel2-2.4.8.Final.jar
@@ -129,19 +131,17 @@ nacos-client-1.4.2.jar
nacos-common-1.4.2.jar
netty-buffer-4.1.65.Final.jar
netty-codec-4.1.65.Final.jar
-netty-codec-dns-4.1.65.Final.jar
netty-codec-http-4.1.65.Final.jar
-netty-codec-http2-4.1.51.Final.jar
-netty-codec-socks-4.1.51.Final.jar
+netty-codec-http2-4.1.65.Final.jar
+netty-codec-socks-4.1.65.Final.jar
netty-common-4.1.65.Final.jar
netty-handler-4.1.65.Final.jar
-netty-handler-proxy-4.1.51.Final.jar
+netty-handler-proxy-4.1.65.Final.jar
netty-resolver-4.1.65.Final.jar
-netty-resolver-dns-4.1.65.Final.jar
netty-tcnative-boringssl-static-2.0.39.Final.jar
netty-transport-4.1.65.Final.jar
-netty-transport-native-epoll-4.1.45.Final.jar
-netty-transport-native-unix-common-4.1.45.Final.jar
+netty-transport-native-epoll-4.1.65.Final.jar
+netty-transport-native-unix-common-4.1.65.Final.jar
okhttp-3.14.9.jar
okio-1.17.2.jar
parent-join-client-7.10.2.jar
diff --git a/tools/dependencies/known-oap-backend-dependencies.txt
b/tools/dependencies/known-oap-backend-dependencies.txt
index a8b0cbf..16d38cd 100755
--- a/tools/dependencies/known-oap-backend-dependencies.txt
+++ b/tools/dependencies/known-oap-backend-dependencies.txt
@@ -39,7 +39,7 @@ elasticsearch-rest-high-level-client-6.3.2.jar
elasticsearch-secure-sm-6.3.2.jar
elasticsearch-x-content-6.3.2.jar
error_prone_annotations-2.3.2.jar
-etcd4j-2.18.0.jar
+failsafe-2.3.4.jar
failureaccess-1.0.1.jar
flatbuffers-java-1.12.0.jar
freemarker-2.3.28.jar
@@ -49,6 +49,7 @@ groovy-3.0.3.jar
grpc-api-1.32.1.jar
grpc-context-1.32.1.jar
grpc-core-1.32.1.jar
+grpc-grpclb-1.32.1.jar
grpc-netty-1.32.1.jar
grpc-protobuf-1.32.1.jar
grpc-protobuf-lite-1.32.1.jar
@@ -79,6 +80,9 @@ javassist-3.25.0-GA.jar
javax.inject-1.jar
javax.servlet-api-3.1.0.jar
jcl-over-slf4j-1.7.30.jar
+jetcd-common-0.5.3.jar
+jetcd-core-0.5.3.jar
+jetcd-resolver-0.5.3.jar
jetty-http-9.4.40.v20210413.jar
jetty-io-9.4.40.v20210413.jar
jetty-security-9.4.40.v20210413.jar
@@ -90,7 +94,6 @@ jna-4.5.1.jar
joda-time-2.10.5.jar
jopt-simple-4.6.jar
jose4j-0.7.6.jar
-json-flattener-0.6.0.jar
jsr305-3.0.2.jar
kafka-clients-2.4.1.jar
kotlin-reflect-1.1.1.jar
@@ -117,7 +120,6 @@ lucene-spatial-extras-7.3.1.jar
lucene-spatial3d-7.3.1.jar
lucene-suggest-7.3.1.jar
lz4-java-1.6.0.jar
-minimal-json-0.9.5.jar
moshi-1.5.0.jar
msgpack-core-0.8.16.jar
mvel2-2.4.8.Final.jar
@@ -126,19 +128,17 @@ nacos-client-1.4.2.jar
nacos-common-1.4.2.jar
netty-buffer-4.1.65.Final.jar
netty-codec-4.1.65.Final.jar
-netty-codec-dns-4.1.65.Final.jar
netty-codec-http-4.1.65.Final.jar
-netty-codec-http2-4.1.51.Final.jar
-netty-codec-socks-4.1.51.Final.jar
+netty-codec-http2-4.1.65.Final.jar
+netty-codec-socks-4.1.65.Final.jar
netty-common-4.1.65.Final.jar
netty-handler-4.1.65.Final.jar
-netty-handler-proxy-4.1.51.Final.jar
+netty-handler-proxy-4.1.65.Final.jar
netty-resolver-4.1.65.Final.jar
-netty-resolver-dns-4.1.65.Final.jar
netty-tcnative-boringssl-static-2.0.39.Final.jar
netty-transport-4.1.65.Final.jar
-netty-transport-native-epoll-4.1.45.Final.jar
-netty-transport-native-unix-common-4.1.45.Final.jar
+netty-transport-native-epoll-4.1.65.Final.jar
+netty-transport-native-unix-common-4.1.65.Final.jar
okhttp-3.14.9.jar
okio-1.17.2.jar
parent-join-client-6.3.2.jar