CRZbulabula commented on code in PR #16647:
URL: https://github.com/apache/iotdb/pull/16647#discussion_r2531004539


##########
integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java:
##########
@@ -84,7 +84,7 @@ public void initClusterEnvironment() {
     }
     clientManager =
         new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
-            .createClientManager(new 
ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+            .createClientManager(new 
ConfigNodeClientManager.ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());

Review Comment:
   Revert this change.



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/iotv2/container/IoTV2GlobalComponentContainer.java:
##########
@@ -20,8 +20,7 @@
 package org.apache.iotdb.commons.consensus.iotv2.container;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import 
org.apache.iotdb.commons.client.ClientPoolFactory.AsyncPipeConsensusServiceClientPoolFactory;
-import 
org.apache.iotdb.commons.client.ClientPoolFactory.SyncPipeConsensusServiceClientPoolFactory;
+import org.apache.iotdb.commons.client.ClientPoolFactory;

Review Comment:
   Revert changes in this file.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java:
##########
@@ -649,6 +644,48 @@ public TSStatus operatePermission(final TAuthorizerReq 
req) {
             req.getNewUsername()));
   }
 
+  @Override
+  public TGetAINodeLocationResp getAINodeLocation() throws TException {
+    final TGetAINodeLocationResp resp = new TGetAINodeLocationResp();
+    final TSStatus status = new TSStatus();
+    try {
+      final List<TAINodeConfiguration> registeredAINodes =
+          configManager.getNodeManager().getRegisteredAINodes();
+
+      if (registeredAINodes == null || registeredAINodes.isEmpty()) {
+        
status.setCode(TSStatusCode.NO_REGISTERED_AI_NODE_ERROR.getStatusCode());
+        status.setMessage("No registered AINode found");
+        resp.setStatus(status);
+        return resp;
+      }
+
+      final TAINodeConfiguration cfg = registeredAINodes.get(0);
+      final TAINodeLocation loc = (cfg == null) ? null : cfg.getLocation();
+
+      boolean hasEndpoint = false;

Review Comment:
   No need to further judge here. The location is absolutely exist.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java:
##########
@@ -374,6 +376,9 @@
 
 public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
 
+  // NOTE: AINode location is now maintained globally inside AINodeClient.
+  // We only resolve via ConfigNode when needed, then publish it back to 
AINodeClient.
+

Review Comment:
   Remove them.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java:
##########
@@ -3606,20 +3611,19 @@ public SettableFuture<ConfigTaskResult> dropModel(final 
String modelId) {
   @Override
   public SettableFuture<ConfigTaskResult> showModels(final String modelId) {
     final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    try (final ConfigNodeClient client =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final TShowModelReq req = new TShowModelReq();
+    final TEndPoint ep = AINodeClient.getCurrentEndpoint();
+    try (final AINodeClient ai = 
AINodeClientManager.getInstance().borrowClient(ep)) {

Review Comment:
   It is still not a perfect implementation. Refer to 
CONFIGNODE_CLIENT_MANAGER, you can put a constant in AINodeClientManager, such 
that it is more convenient for others to borrow a AINode Client like:
   ```
   
AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID)
   ```



##########
integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java:
##########
@@ -181,7 +181,7 @@ protected void initEnvironment(
 
     clientManager =
         new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
-            .createClientManager(new 
ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+            .createClientManager(new 
ConfigNodeClientManager.ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());

Review Comment:
   Revert this change.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClient.java:
##########
@@ -86,18 +93,99 @@ public class AINodeClient implements AutoCloseable, 
ThriftClient {
 
   public static final String MSG_CONNECTION_FAIL =
       "Fail to connect to AINode. Please check status of AINode";
+  private static final int MAX_RETRY = 3;
+
+  @FunctionalInterface
+  private interface RemoteCall<R> {
+    R apply(IAINodeRPCService.Client c) throws TException;
+  }
 
   private final TsBlockSerde tsBlockSerde = new TsBlockSerde();
 
   ClientManager<TEndPoint, AINodeClient> clientManager;
 
+  private static final IClientManager<ConfigRegionId, ConfigNodeClient> 
CONFIG_NODE_CLIENT_MANAGER =
+      ConfigNodeClientManager.getInstance();
+
+  private static final 
java.util.concurrent.atomic.AtomicReference<TAINodeLocation>
+      CURRENT_LOCATION = new java.util.concurrent.atomic.AtomicReference<>();
+
+  public static TEndPoint getCurrentEndpoint() {
+    TAINodeLocation loc = CURRENT_LOCATION.get();
+    if (loc == null) {
+      loc = refreshFromConfigNode();
+    }
+    return (loc == null) ? null : pickEndpointFrom(loc);
+  }
+
+  public static void updateGlobalAINodeLocation(final TAINodeLocation loc) {
+    if (loc != null) {
+      CURRENT_LOCATION.set(loc);
+    }
+  }
+
+  private <R> R executeRemoteCallWithRetry(RemoteCall<R> call) throws 
TException {
+    TException last = null;
+    for (int attempt = 1; attempt <= MAX_RETRY; attempt++) {
+      try {
+        if (transport == null || !transport.isOpen()) {
+          final TEndPoint ep = getCurrentEndpoint();
+          if (ep == null) {
+            throw new TException("AINode endpoint unavailable");
+          }
+          this.endPoint = ep;
+          init();
+        }
+        return call.apply(client);
+      } catch (TException e) {
+        last = e;
+        invalidate();
+        final TAINodeLocation loc = refreshFromConfigNode();
+        if (loc != null) {
+          this.endPoint = pickEndpointFrom(loc);
+        }
+        try {
+          Thread.sleep(100L * attempt);

Review Comment:
   Better to sleep longer
   ```suggestion
             Thread.sleep(1000L * attempt);
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClient.java:
##########
@@ -86,18 +93,99 @@ public class AINodeClient implements AutoCloseable, 
ThriftClient {
 
   public static final String MSG_CONNECTION_FAIL =
       "Fail to connect to AINode. Please check status of AINode";
+  private static final int MAX_RETRY = 3;
+
+  @FunctionalInterface
+  private interface RemoteCall<R> {
+    R apply(IAINodeRPCService.Client c) throws TException;
+  }
 
   private final TsBlockSerde tsBlockSerde = new TsBlockSerde();
 
   ClientManager<TEndPoint, AINodeClient> clientManager;
 
+  private static final IClientManager<ConfigRegionId, ConfigNodeClient> 
CONFIG_NODE_CLIENT_MANAGER =
+      ConfigNodeClientManager.getInstance();
+
+  private static final 
java.util.concurrent.atomic.AtomicReference<TAINodeLocation>
+      CURRENT_LOCATION = new java.util.concurrent.atomic.AtomicReference<>();

Review Comment:
   ```suggestion
     private static final AtomicReference<TAINodeLocation>
         CURRENT_LOCATION = AtomicReference<>();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to