This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 3895f4f79 KUDU-3526 [java] Scanner should bind with a tserver in java 
client.
3895f4f79 is described below

commit 3895f4f796579071615addfa12d2abf75753b9a1
Author: 宋家成 <songjiach...@thinkingdata.cn>
AuthorDate: Mon Nov 20 16:49:22 2023 +0800

    KUDU-3526 [java] Scanner should bind with a tserver in java client.
    
    Currently, scan requests sent by the Java client might fail with a
    "scanner not found" error. This is because the scanner is not bound to
    the tserver with which it first communicates.
    
    The code in the scanNextRows method of the Java client still looks up
    the tserver using the replica locations and the selection policy. So if
    the leader changes and the next scan request is sent to the new
    leader, that tserver will respond with the "scanner not found"
    exception.
    
    We should bind the scanner to the tserver, similar to how it is done
    in the C++ client.
    
    Change-Id: I9cf65f4215e99198dd41b43d14e50c8c23b8a9b2
    Reviewed-on: http://gerrit.cloudera.org:8080/20715
    Tested-by: Kudu Jenkins
    Reviewed-by: Yifan Zhang <chinazhangyi...@163.com>
    Reviewed-by: Alexey Serbin <ale...@apache.org>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    | 11 +--
 .../org/apache/kudu/client/AsyncKuduScanner.java   | 12 +++
 .../java/org/apache/kudu/client/RemoteTablet.java  | 11 +++
 .../org/apache/kudu/client/TestKuduScanner.java    | 89 ++++++++++++++++++++++
 4 files changed, 116 insertions(+), 7 deletions(-)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index b9ada6de4..84ffedf97 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -1325,8 +1325,7 @@ public class AsyncKuduClient implements AutoCloseable {
     // Important to increment the attempts before the next if statement since
     // getSleepTimeForRpc() relies on it if the client is null or dead.
     nextRequest.attempt++;
-    final ServerInfo info = 
tablet.getReplicaSelectedServerInfo(nextRequest.getReplicaSelection(),
-                                                                location);
+    final ServerInfo info = tablet.getTabletServerByUuid(scanner.getTsUUID());
     if (info == null) {
       return delayedSendRpcToTablet(nextRequest, new 
RecoverableException(Status.RemoteError(
           String.format("No information on servers hosting tablet %s, will 
retry later",
@@ -1352,8 +1351,7 @@ public class AsyncKuduClient implements AutoCloseable {
       return Deferred.fromResult(null);
     }
     final KuduRpc<AsyncKuduScanner.Response> closeRequest = 
scanner.getCloseRequest();
-    final ServerInfo info = 
tablet.getReplicaSelectedServerInfo(closeRequest.getReplicaSelection(),
-                                                                location);
+    final ServerInfo info = tablet.getTabletServerByUuid(scanner.getTsUUID());
     if (info == null) {
       return Deferred.fromResult(null);
     }
@@ -1381,8 +1379,7 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     final KuduRpc<Void> keepAliveRequest = scanner.getKeepAliveRequest();
-    final ServerInfo info =
-        
tablet.getReplicaSelectedServerInfo(keepAliveRequest.getReplicaSelection(), 
location);
+    final ServerInfo info = tablet.getTabletServerByUuid(scanner.getTsUUID());
     if (info == null) {
       return Deferred.fromResult(null);
     }
@@ -1436,7 +1433,7 @@ public class AsyncKuduClient implements AutoCloseable {
     if (entry != null) {
       RemoteTablet tablet = entry.getTablet();
       ServerInfo info = 
tablet.getReplicaSelectedServerInfo(request.getReplicaSelection(),
-                                                            location);
+                                                            
getLocationString());
       if (info != null) {
         Deferred<R> d = request.getDeferred();
         request.setTablet(tablet);
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index a688224ab..fd3930e33 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -287,6 +287,12 @@ public final class AsyncKuduScanner {
 
   private String queryId;
 
+  /**
+   * UUID of the tserver which the scanner is bound with. The following scans 
of
+   * this scanner will be sent to the tserver.
+   */
+  private String tsUUID;
+
   /**
    * The prefetching result is cached in memory. This atomic reference is used 
to avoid
    * two concurrent prefetchings occur and the latest one overrides the 
previous one.
@@ -565,6 +571,10 @@ public final class AsyncKuduScanner {
     this.rowDataFormat = rowDataFormat;
   }
 
+  public String getTsUUID() {
+    return tsUUID;
+  }
+
   /**
    * Scans a number of rows.
    * <p>
@@ -623,12 +633,14 @@ public final class AsyncKuduScanner {
           }
 
           if (!resp.more || resp.scannerId == null) {
+            tsUUID = resp.data.getTsUUID();
             scanFinished();
             return Deferred.fromResult(resp.data); // there might be data to 
return
           }
           scannerId = resp.scannerId;
           sequenceId++;
           canRequestMore = resp.more;
+          tsUUID = resp.data.getTsUUID();
           if (LOG.isDebugEnabled()) {
             LOG.debug("Scanner {} opened on {}", Bytes.pretty(scannerId), 
tablet);
           }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
index 753093729..2c035df74 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -302,6 +302,17 @@ public class RemoteTablet implements 
Comparable<RemoteTablet> {
     return results;
   }
 
+  /**
+   * Get information on tablet server by its UUID.
+   * @param uuid tablet server uuid.
+   * @return tablet server info by the uuid.
+   */
+  ServerInfo getTabletServerByUuid(String uuid) {
+    synchronized (tabletServers) {
+      return tabletServers.get(uuid);
+    }
+  }
+
   @Override
   public int compareTo(RemoteTablet remoteTablet) {
     if (remoteTablet == null) {
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
index 8ea7a1e08..9b4e86659 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
@@ -17,10 +17,13 @@
 
 package org.apache.kudu.client;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.kudu.client.AsyncKuduScanner.DEFAULT_IS_DELETED_COL_NAME;
+import static org.apache.kudu.test.ClientTestUtil.createManyStringsSchema;
 import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
 import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
 import static org.apache.kudu.test.ClientTestUtil.loadDefaultTable;
+import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -38,6 +41,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.junit.Before;
 import org.junit.Rule;
@@ -54,6 +58,7 @@ import org.apache.kudu.test.CapturingLogAppender;
 import org.apache.kudu.test.KuduTestHarness;
 import org.apache.kudu.test.RandomUtils;
 import org.apache.kudu.test.cluster.KuduBinaryLocator;
+import org.apache.kudu.test.junit.AssertHelpers;
 import org.apache.kudu.util.DataGenerator;
 import org.apache.kudu.util.Pair;
 
@@ -600,4 +605,88 @@ public class TestKuduScanner {
     assertTrue(row.hasIsDeleted());
     assertTrue(row.isDeleted());
   }
+
+  @Test
+  public void testScannerLeaderChanged() throws Exception {
+    // Prepare the table for testing.
+    Schema schema = createManyStringsSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    final int buckets = 2;
+    createOptions.addHashPartitions(ImmutableList.of("key"), buckets);
+    createOptions.setNumReplicas(3);
+    client.createTable(tableName, schema, createOptions);
+
+    KuduSession session = client.newSession();
+    KuduTable table = client.openTable(tableName);
+    final int totalRows = 2000;
+    for (int i = 0; i < totalRows; i++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addString("key", String.format("key_%02d", i));
+      row.addString("c1", "c1_" + i);
+      row.addString("c2", "c2_" + i);
+      assertEquals(session.apply(insert).hasRowError(), false);
+    }
+    AsyncKuduClient asyncClient = harness.getAsyncClient();
+    KuduScanner kuduScanner = new KuduScanner.KuduScannerBuilder(asyncClient, 
table)
+            .replicaSelection(ReplicaSelection.LEADER_ONLY)
+            .batchSizeBytes(100)
+            .build();
+
+    // Open the scanner first.
+    kuduScanner.nextRows();
+    final HostAndPort referenceServerHostPort = harness.findLeaderTabletServer(
+            new LocatedTablet(kuduScanner.currentTablet()));
+    final String referenceTabletId = kuduScanner.currentTablet().getTabletId();
+
+    // Send LeaderStepDown request.
+    KuduBinaryLocator.ExecutableInfo exeInfo = 
KuduBinaryLocator.findBinary("kudu");
+    LOG.info(harness.getMasterAddressesAsString());
+    List<String> commandLine = Lists.newArrayList(exeInfo.exePath(),
+            "tablet",
+            "leader_step_down",
+            harness.findLeaderMasterServer().toString(),
+            kuduScanner.currentTablet().getTabletId());
+    ProcessBuilder processBuilder = new ProcessBuilder(commandLine);
+    processBuilder.environment().putAll(exeInfo.environment());
+    Process stepDownProcess = processBuilder.start();
+    assertEquals(0, stepDownProcess.waitFor());
+
+    // Wait until the leader changes.
+    assertEventuallyTrue(
+        "The leadership should be transferred",
+        new AssertHelpers.BooleanExpression() {
+          @Override
+          public boolean get() throws Exception {
+            asyncClient.emptyTabletsCacheForTable(table.getTableId());
+            List<LocatedTablet> tablets = table.getTabletsLocations(50000);
+            LocatedTablet targetTablet = null;
+            for (LocatedTablet tablet : tablets) {
+              String tabletId = new String(tablet.getTabletId(), UTF_8);
+              if (tabletId.equals(referenceTabletId)) {
+                targetTablet = tablet;
+              }
+            }
+            HostAndPort targetHp = 
harness.findLeaderTabletServer(targetTablet);
+            return !targetHp.equals(referenceServerHostPort);
+          }
+        },
+        10000/*timeoutMillis*/);
+
+    // Simulate that another request(like Batch) has sent to the wrong leader 
tablet server and
+    // the change of leadership has been acknowledged. The response will 
demote the leader.
+    kuduScanner.currentTablet().demoteLeader(
+            kuduScanner.currentTablet().getLeaderServerInfo().getUuid());
+    asyncClient.emptyTabletsCacheForTable(table.getTableId());
+
+    int rowsScannedInNextScans = 0;
+    try {
+      while (kuduScanner.hasMoreRows()) {
+        rowsScannedInNextScans += kuduScanner.nextRows().numRows;
+      }
+    } catch (Exception ex) {
+      assertFalse(ex.getMessage().matches(".*Scanner .* not found.*"));
+    }
+    assertTrue(rowsScannedInNextScans > 0);
+  }
 }

Reply via email to