Caideyipi commented on code in PR #12275:
URL: https://github.com/apache/iotdb/pull/12275#discussion_r1549391991


##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java:
##########
@@ -71,12 +87,23 @@ public String getConsumerGroupId() {
   /////////////////////////////// ctor ///////////////////////////////
 
   protected SubscriptionConsumer(Builder builder) {
-    this.defaultEndPoint = new TEndPoint(builder.host, builder.port);
+    this.initialEndpoints = new ArrayList<>();
+    // From org.apache.iotdb.session.Session.getNodeUrls
+    // Priority is given to `host:port` over `nodeUrls`.
+    if (Objects.nonNull(builder.host)) {
+      initialEndpoints.add(new TEndPoint(builder.host, builder.port));
+    } else {
+      
initialEndpoints.addAll(SessionUtils.parseSeedNodeUrls(builder.nodeUrls));
+    }
+
     this.username = builder.username;
     this.password = builder.password;
 
     this.consumerId = builder.consumerId;
     this.consumerGroupId = builder.consumerGroupId;
+
+    this.heartbeatInterval = builder.heartbeatInterval;
+    this.endpointsSyncInterval = builder.endpointsSyncInterval;
   }
 
   protected SubscriptionConsumer(Builder builder, Properties config) {

Review Comment:
   Seemingly the Properties is only named as "config" in the subscription 
module, in other modules they are all named "properties"..



##########
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java:
##########
@@ -25,12 +25,24 @@ public class ConsumerConstant {
 
   public static final String HOST_KEY = "host";
   public static final String PORT_KEY = "port";
+  public static final String NODE_URLS_KEY = "node-urls";
+
   public static final String USERNAME_KEY = "username";
   public static final String PASSWORD_KEY = "password";
 
   public static final String CONSUMER_ID_KEY = "consumer-id";
   public static final String CONSUMER_GROUP_ID_KEY = "group-id";
 
+  /////////////////////////////// hidden ///////////////////////////////
+
+  public static final String HEARTBEAT_INTERVAL_KEY = "heartbeat-interval"; // 
unit: ms

Review Comment:
   Better add unit "ms" to the variables and strings to be consistent of other 
plugin constants.



##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionEndpointsSyncer.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class SubscriptionEndpointsSyncer implements Runnable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionEndpointsSyncer.class);
+
+  private final SubscriptionConsumer consumer;
+
+  public SubscriptionEndpointsSyncer(SubscriptionConsumer consumer) {
+    this.consumer = consumer;
+  }
+
+  @Override
+  public void run() {
+    if (consumer.isClosed()) {
+      return;
+    }
+
+    consumer.acquireWriteLock();
+    try {
+      syncInternal();
+    } finally {
+      consumer.releaseWriteLock();
+    }
+  }
+
+  private void syncInternal() {
+    if (consumer.hasNoProviders()) {
+      try {
+        consumer.openProviders();
+      } catch (final IoTDBConnectionException e) {
+        LOGGER.warn("something unexpected happened when syncing subscription 
endpoints...", e);
+        return;
+      }
+    }
+
+    final Map<Integer, TEndPoint> allEndPoints;
+    try {
+      allEndPoints = consumer.fetchAllEndPointsWithRedirection();
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Failed to fetch all endpoints, exception: {}, will retry later...", 
e.getMessage());
+      return; // retry later
+    }
+
+    // add new providers or handshake existed providers
+    for (final Map.Entry<Integer, TEndPoint> entry : allEndPoints.entrySet()) {
+      final SubscriptionProvider provider = 
consumer.getProvider(entry.getKey());
+      if (Objects.isNull(provider)) {
+        // new provider
+        final SubscriptionProvider newProvider = 
consumer.constructProvider(entry.getValue());
+        try {
+          newProvider.handshake();
+        } catch (final Exception e) {
+          LOGGER.warn(
+              "Failed to create connection with subscription provider {}, 
exception: {}, will retry later...",
+              newProvider,
+              e.getMessage());
+          continue; // retry later
+        }
+        consumer.addProvider(entry.getKey(), newProvider);
+      } else {
+        // existed provider

Review Comment:
   Existing is more precise



##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java:
##########
@@ -199,34 +269,216 @@ private void launchHeartbeatWorker() {
               return t;
             });
     heartbeatWorkerExecutor.scheduleAtFixedRate(
-        new ConsumerHeartbeatWorker(this), 0, HEARTBEAT_INTERVAL, 
TimeUnit.MILLISECONDS);
+        new ConsumerHeartbeatWorker(this), 0, heartbeatInterval, 
TimeUnit.MILLISECONDS);
   }
 
   private void shutdownHeartbeatWorker() {
     heartbeatWorkerExecutor.shutdown();
     heartbeatWorkerExecutor = null;
   }
 
-  boolean isClosed() {
-    return isClosed.get();
+  /////////////////////////////// endpoints syncer 
///////////////////////////////
+
+  @SuppressWarnings("unsafeThreadSchedule")
+  private void launchEndpointsSyncer() {
+    endpointsSyncerExecutor =
+        Executors.newSingleThreadScheduledExecutor(
+            r -> {
+              Thread t =
+                  new Thread(
+                      Thread.currentThread().getThreadGroup(), r, 
"SubscriptionEndpointsSyncer", 0);
+              if (!t.isDaemon()) {
+                t.setDaemon(true);
+              }
+              if (t.getPriority() != Thread.NORM_PRIORITY) {
+                t.setPriority(Thread.NORM_PRIORITY);
+              }
+              return t;
+            });
+    endpointsSyncerExecutor.scheduleAtFixedRate(
+        new SubscriptionEndpointsSyncer(this), 0, endpointsSyncInterval, 
TimeUnit.MILLISECONDS);
   }
 
-  /////////////////////////////// utility ///////////////////////////////
+  private void shutdownEndpointsSyncer() {
+    endpointsSyncerExecutor.shutdown();
+    endpointsSyncerExecutor = null;
+  }
 
-  private SubscriptionSessionConnection getDefaultSessionConnection() {
-    return defaultSubscriptionProvider.getSessionConnection();
+  /////////////////////////////// subscription provider 
///////////////////////////////
+
+  SubscriptionProvider constructProvider(final TEndPoint endPoint) {
+    return new SubscriptionProvider(
+        endPoint, this.username, this.password, this.consumerId, 
this.consumerGroupId);
+  }
+
+  /** Caller should ensure that the method is called in the lock {@link 
#acquireWriteLock()}. */
+  void openProviders() throws IoTDBConnectionException {
+    // close stale providers
+    closeProviders();
+
+    for (final TEndPoint endPoint : initialEndpoints) {
+      final SubscriptionProvider defaultProvider;
+      final int defaultDataNodeId;
+
+      try {
+        defaultProvider = constructProvider(endPoint);
+        defaultDataNodeId = defaultProvider.handshake();
+      } catch (final Exception e) {
+        LOGGER.warn("Failed to create connection with {}, exception: {}", 
endPoint, e.getMessage());
+        continue; // try next endpoint
+      }
+      addProvider(defaultDataNodeId, defaultProvider);
+
+      final Map<Integer, TEndPoint> allEndPoints;
+      try {
+        allEndPoints = 
defaultProvider.getSessionConnection().fetchAllEndPoints();
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to fetch all endpoints from {}, exception: {}, will retry 
later...",
+            endPoint,
+            e.getMessage());
+        break; // retry later
+      }
+
+      for (final Map.Entry<Integer, TEndPoint> entry : 
allEndPoints.entrySet()) {
+        if (defaultDataNodeId == entry.getKey()) {
+          continue;
+        }
+
+        final SubscriptionProvider provider;
+        try {
+          provider = constructProvider(entry.getValue());
+          provider.handshake();
+        } catch (final Exception e) {
+          LOGGER.warn(
+              "Failed to create connection with {}, exception: {}, will retry 
later...",
+              entry.getValue(),
+              e.getMessage());
+          continue; // retry later
+        }
+        addProvider(entry.getKey(), provider);
+      }
+
+      break;
+    }
+
+    if (hasNoProviders()) {
+      throw NO_PROVIDERS_EXCEPTION;
+    }
+  }
+
+  /** Caller should ensure that the method is called in the lock {@link 
#acquireWriteLock()}. */
+  private void closeProviders() throws IoTDBConnectionException {
+    for (final SubscriptionProvider provider : getAllProviders()) {
+      provider.close();
+    }
+    subscriptionProviders.clear();
+  }
+
+  /** Caller should ensure that the method is called in the lock {@link 
#acquireWriteLock()}. */
+  void addProvider(final int dataNodeId, final SubscriptionProvider provider) {
+    // the subscription provider is opened
+    LOGGER.info("add new subscription provider {}", provider);
+    subscriptionProviders.put(dataNodeId, provider);
+  }
+
+  /** Caller should ensure that the method is called in the lock {@link 
#acquireWriteLock()}. */
+  void closeAndRemoveProvider(final int dataNodeId) throws 
IoTDBConnectionException {
+    if (!containsProvider(dataNodeId)) {
+      return;
+    }
+    final SubscriptionProvider provider = 
subscriptionProviders.get(dataNodeId);
+    try {
+      provider.close();
+    } finally {
+      LOGGER.info("close and remove stale subscription provider {}", provider);
+      subscriptionProviders.remove(dataNodeId);
+    }
+  }
+
+  /** Caller should ensure that the method is called in the lock {@link 
#acquireReadLock()}. */
+  boolean hasNoProviders() {
+    return subscriptionProviders.isEmpty();
+  }
+
+  /** Caller should ensure that the method is called in the lock {@link 
#acquireReadLock()}. */
+  boolean containsProvider(final int dataNodeId) {
+    return subscriptionProviders.containsKey(dataNodeId);
   }
 
-  protected List<SubscriptionSessionConnection> getSessionConnections() {
+  /** Caller should ensure that the method is called in the lock {@link 
#acquireReadLock()}. */
+  List<SubscriptionProvider> getAllAvailableProviders() {
     return subscriptionProviders.values().stream()
-        .map(SubscriptionProvider::getSessionConnection)
+        .filter(SubscriptionProvider::isAvailable)
         .collect(Collectors.toList());
   }
 
-  protected SubscriptionSessionConnection getSessionConnection(int dataNodeId) 
{
-    return subscriptionProviders
-        .getOrDefault(dataNodeId, defaultSubscriptionProvider)
-        .getSessionConnection();
+  /** Caller should ensure that the method is called in the lock {@link 
#acquireReadLock()}. */
+  List<SubscriptionProvider> getAllProviders() {
+    return new ArrayList<>(subscriptionProviders.values());
+  }
+
+  /** Caller should ensure that the method is called in the lock {@link 
#acquireReadLock()}. */
+  SubscriptionProvider getProvider(final int dataNodeId) {
+    return containsProvider(dataNodeId) ? 
subscriptionProviders.get(dataNodeId) : null;
+  }
+
+  /////////////////////////////// redirection ///////////////////////////////
+
+  /** Caller should ensure that the method is called in the lock {@link 
#acquireReadLock()}. */
+  public void subscribeWithRedirection(final Set<String> topicNames)
+      throws IoTDBConnectionException {
+    for (final SubscriptionProvider provider : getAllAvailableProviders()) {
+      try {
+        provider.getSessionConnection().subscribe(topicNames);
+        return;
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to subscribe topics {} from subscription provider {}, 
exception: {}, try next subscription provider...",
+            topicNames,
+            provider,
+            e.getMessage());
+      }
+    }
+    throw NO_PROVIDERS_EXCEPTION;
+  }
+
+  /** Caller should ensure that the method is called in the lock {@link 
#acquireReadLock()}. */
+  public void unsubscribeWithRedirection(final Set<String> topicNames)
+      throws IoTDBConnectionException {
+    for (final SubscriptionProvider provider : getAllAvailableProviders()) {
+      try {
+        provider.getSessionConnection().unsubscribe(topicNames);
+        return;
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to unsubscribe topics {} from subscription provider {}, 
exception: {}, try next subscription provider...",
+            topicNames,
+            provider,
+            e.getMessage());
+      }
+    }
+    throw NO_PROVIDERS_EXCEPTION;
+  }
+
+  /** Caller should ensure that the method is called in the lock {@link 
#acquireReadLock()}. */
+  public Map<Integer, TEndPoint> fetchAllEndPointsWithRedirection()
+      throws IoTDBConnectionException {
+    Map<Integer, TEndPoint> endPoints = null;
+    for (final SubscriptionProvider provider : getAllAvailableProviders()) {
+      try {
+        endPoints = provider.getSessionConnection().fetchAllEndPoints();

Review Comment:
   Shall the first fetched "endPoints" break the loop directly?



##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java:
##########
@@ -55,15 +56,17 @@ public SubscriptionSession(String host, int port, String 
username, String passwo
             .username(username)
             .password(password)
             // disable auto fetch
-            .enableAutoFetch(false));
+            .enableAutoFetch(false)
+            // disable redirection
+            .enableRedirection(false));
   }
 
   @Override
   public SessionConnection constructSessionConnection(
       Session session, TEndPoint endpoint, ZoneId zoneId) throws 
IoTDBConnectionException {
-    if (endpoint == null) {
-      return new SubscriptionSessionConnection(
-          session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs);
+    if (Objects.isNull(endpoint)) {
+      throw new IllegalStateException(
+          "subscription session must be configured with an endpoint...");

Review Comment:
   May be one "." is enough? And PipeParameterNotValidException is maybe better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to