kezhenxu94 commented on a change in pull request #7215:
URL: https://github.com/apache/skywalking/pull/7215#discussion_r663321251



##########
File path: 
oap-server/server-cluster-plugin/cluster-etcd-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/etcd/EtcdCoordinator.java
##########
@@ -97,54 +126,62 @@ public EtcdCoordinator(final ModuleDefineHolder manager, 
final ClusterModuleEtcd
 
     @Override
     public void registerRemote(RemoteInstance remoteInstance) throws 
ServiceRegisterException {
-
         if (needUsingInternalAddr()) {
-            remoteInstance = new RemoteInstance(new 
Address(config.getInternalComHost(), config.getInternalComPort(), true));
+            remoteInstance = new RemoteInstance(
+                new Address(config.getInternalComHost(), 
config.getInternalComPort(), true));
         }
 
         this.selfAddress = remoteInstance.getAddress();
-
-        EtcdEndpoint endpoint = new 
EtcdEndpoint.Builder().serviceName(serviceName)
-                                                          
.host(selfAddress.getHost())
-                                                          
.port(selfAddress.getPort())
-                                                          .build();
+        final EtcdEndpoint endpoint = new 
EtcdEndpoint.Builder().serviceName(serviceName)
+                                                                
.host(selfAddress.getHost())
+                                                                
.port(selfAddress.getPort())
+                                                                .build();
         try {
             initHealthChecker();
-            client.putDir(serviceName).send();
-            String key = buildKey(serviceName, selfAddress, remoteInstance);
-            String json = new Gson().toJson(endpoint);
-            EtcdResponsePromise<EtcdKeysResponse> promise = client.put(key, 
json).ttl(KEY_TTL).send();
-            //check register.
-            promise.get();
-            renew(client, key, json);
+
+            final Lease leaseClient = client.getLeaseClient();
+            final long leaseID = leaseClient.grant(30L).get().getID();
+
+            ByteSequence instance = ByteSequence.from(GSON.toJson(endpoint), 
Charset.defaultCharset());
+            client.getKVClient()
+                  .put(
+                      buildKey(serviceName, selfAddress, remoteInstance),
+                      instance,
+                      PutOption.newBuilder().withLeaseId(leaseID).build()
+                  )
+                  .get();
             healthChecker.health();
+
+            client.getLeaseClient().keepAlive(leaseID, new 
StreamObserver<LeaseKeepAliveResponse>() {
+                @Override
+                public void onNext(final LeaseKeepAliveResponse response) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Refresh lease id = {}, ttl = {}", 
response.getID(), response.getTTL());
+                    }
+                }
+
+                @Override
+                public void onError(final Throwable throwable) {
+                    log.error("", throwable);

Review comment:
       Add some informative message so we can distinguish this in massive logs, 
also, do we need to mark it as unhealthy here?
   
   ```suggestion
                       log.error("Failed to keep alive in Etcd coordinator", 
throwable);
                       healthChecker.unHealth(throwable);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to