This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 3895f4f79 KUDU-3526 [java] Scanner should bind with a tserver in java client. 3895f4f79 is described below commit 3895f4f796579071615addfa12d2abf75753b9a1 Author: 宋家成 <songjiach...@thinkingdata.cn> AuthorDate: Mon Nov 20 16:49:22 2023 +0800 KUDU-3526 [java] Scanner should bind with a tserver in java client. Currently, scan requests sent by the Java client might fail with a "scanner not found" error. This is because the scanner is not bound to the tserver with which it first communicates: the code in the scanNextRows method of the Java client still looks up the tserver using the replica locations and the selection policy. So if the leader changes and the next scan request is sent to the new leader, that tserver will respond with the "scanner not found" error. We should bind the scanner to the tserver, similar to how it is done in the C++ client. Change-Id: I9cf65f4215e99198dd41b43d14e50c8c23b8a9b2 Reviewed-on: http://gerrit.cloudera.org:8080/20715 Tested-by: Kudu Jenkins Reviewed-by: Yifan Zhang <chinazhangyi...@163.com> Reviewed-by: Alexey Serbin <ale...@apache.org> --- .../org/apache/kudu/client/AsyncKuduClient.java | 11 +-- .../org/apache/kudu/client/AsyncKuduScanner.java | 12 +++ .../java/org/apache/kudu/client/RemoteTablet.java | 11 +++ .../org/apache/kudu/client/TestKuduScanner.java | 89 ++++++++++++++++++++++ 4 files changed, 116 insertions(+), 7 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index b9ada6de4..84ffedf97 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -1325,8 +1325,7 @@ public class AsyncKuduClient implements AutoCloseable { // Important to increment the attempts before the next if statement since // getSleepTimeForRpc() relies 
on it if the client is null or dead. nextRequest.attempt++; - final ServerInfo info = tablet.getReplicaSelectedServerInfo(nextRequest.getReplicaSelection(), - location); + final ServerInfo info = tablet.getTabletServerByUuid(scanner.getTsUUID()); if (info == null) { return delayedSendRpcToTablet(nextRequest, new RecoverableException(Status.RemoteError( String.format("No information on servers hosting tablet %s, will retry later", @@ -1352,8 +1351,7 @@ public class AsyncKuduClient implements AutoCloseable { return Deferred.fromResult(null); } final KuduRpc<AsyncKuduScanner.Response> closeRequest = scanner.getCloseRequest(); - final ServerInfo info = tablet.getReplicaSelectedServerInfo(closeRequest.getReplicaSelection(), - location); + final ServerInfo info = tablet.getTabletServerByUuid(scanner.getTsUUID()); if (info == null) { return Deferred.fromResult(null); } @@ -1381,8 +1379,7 @@ public class AsyncKuduClient implements AutoCloseable { } final KuduRpc<Void> keepAliveRequest = scanner.getKeepAliveRequest(); - final ServerInfo info = - tablet.getReplicaSelectedServerInfo(keepAliveRequest.getReplicaSelection(), location); + final ServerInfo info = tablet.getTabletServerByUuid(scanner.getTsUUID()); if (info == null) { return Deferred.fromResult(null); } @@ -1436,7 +1433,7 @@ public class AsyncKuduClient implements AutoCloseable { if (entry != null) { RemoteTablet tablet = entry.getTablet(); ServerInfo info = tablet.getReplicaSelectedServerInfo(request.getReplicaSelection(), - location); + getLocationString()); if (info != null) { Deferred<R> d = request.getDeferred(); request.setTablet(tablet); diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java index a688224ab..fd3930e33 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java @@ -287,6 
+287,12 @@ public final class AsyncKuduScanner { private String queryId; + /** + * UUID of the tserver which the scanner is bound with. The following scans of + * this scanner will be sent to the tserver. + */ + private String tsUUID; + /** * The prefetching result is cached in memory. This atomic reference is used to avoid * two concurrent prefetchings occur and the latest one overrides the previous one. @@ -565,6 +571,10 @@ public final class AsyncKuduScanner { this.rowDataFormat = rowDataFormat; } + public String getTsUUID() { + return tsUUID; + } + /** * Scans a number of rows. * <p> @@ -623,12 +633,14 @@ public final class AsyncKuduScanner { } if (!resp.more || resp.scannerId == null) { + tsUUID = resp.data.getTsUUID(); scanFinished(); return Deferred.fromResult(resp.data); // there might be data to return } scannerId = resp.scannerId; sequenceId++; canRequestMore = resp.more; + tsUUID = resp.data.getTsUUID(); if (LOG.isDebugEnabled()) { LOG.debug("Scanner {} opened on {}", Bytes.pretty(scannerId), tablet); } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java index 753093729..2c035df74 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java @@ -302,6 +302,17 @@ public class RemoteTablet implements Comparable<RemoteTablet> { return results; } + /** + * Get information on tablet server by its UUID. + * @param uuid tablet server uuid. + * @return tablet server info by the uuid. 
+ */ + ServerInfo getTabletServerByUuid(String uuid) { + synchronized (tabletServers) { + return tabletServers.get(uuid); + } + } + @Override public int compareTo(RemoteTablet remoteTablet) { if (remoteTablet == null) { diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java index 8ea7a1e08..9b4e86659 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java @@ -17,10 +17,13 @@ package org.apache.kudu.client; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.kudu.client.AsyncKuduScanner.DEFAULT_IS_DELETED_COL_NAME; +import static org.apache.kudu.test.ClientTestUtil.createManyStringsSchema; import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions; import static org.apache.kudu.test.ClientTestUtil.getBasicSchema; import static org.apache.kudu.test.ClientTestUtil.loadDefaultTable; +import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -38,6 +41,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.junit.Before; import org.junit.Rule; @@ -54,6 +58,7 @@ import org.apache.kudu.test.CapturingLogAppender; import org.apache.kudu.test.KuduTestHarness; import org.apache.kudu.test.RandomUtils; import org.apache.kudu.test.cluster.KuduBinaryLocator; +import org.apache.kudu.test.junit.AssertHelpers; import org.apache.kudu.util.DataGenerator; import org.apache.kudu.util.Pair; @@ -600,4 +605,88 @@ public class TestKuduScanner { assertTrue(row.hasIsDeleted()); assertTrue(row.isDeleted()); } + + @Test + public void testScannerLeaderChanged() 
throws Exception { + // Prepare the table for testing. + Schema schema = createManyStringsSchema(); + CreateTableOptions createOptions = new CreateTableOptions(); + final int buckets = 2; + createOptions.addHashPartitions(ImmutableList.of("key"), buckets); + createOptions.setNumReplicas(3); + client.createTable(tableName, schema, createOptions); + + KuduSession session = client.newSession(); + KuduTable table = client.openTable(tableName); + final int totalRows = 2000; + for (int i = 0; i < totalRows; i++) { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addString("key", String.format("key_%02d", i)); + row.addString("c1", "c1_" + i); + row.addString("c2", "c2_" + i); + assertEquals(session.apply(insert).hasRowError(), false); + } + AsyncKuduClient asyncClient = harness.getAsyncClient(); + KuduScanner kuduScanner = new KuduScanner.KuduScannerBuilder(asyncClient, table) + .replicaSelection(ReplicaSelection.LEADER_ONLY) + .batchSizeBytes(100) + .build(); + + // Open the scanner first. + kuduScanner.nextRows(); + final HostAndPort referenceServerHostPort = harness.findLeaderTabletServer( + new LocatedTablet(kuduScanner.currentTablet())); + final String referenceTabletId = kuduScanner.currentTablet().getTabletId(); + + // Send LeaderStepDown request. + KuduBinaryLocator.ExecutableInfo exeInfo = KuduBinaryLocator.findBinary("kudu"); + LOG.info(harness.getMasterAddressesAsString()); + List<String> commandLine = Lists.newArrayList(exeInfo.exePath(), + "tablet", + "leader_step_down", + harness.findLeaderMasterServer().toString(), + kuduScanner.currentTablet().getTabletId()); + ProcessBuilder processBuilder = new ProcessBuilder(commandLine); + processBuilder.environment().putAll(exeInfo.environment()); + Process stepDownProcess = processBuilder.start(); + assertEquals(0, stepDownProcess.waitFor()); + + // Wait until the leader changes. 
+ assertEventuallyTrue( + "The leadership should be transferred", + new AssertHelpers.BooleanExpression() { + @Override + public boolean get() throws Exception { + asyncClient.emptyTabletsCacheForTable(table.getTableId()); + List<LocatedTablet> tablets = table.getTabletsLocations(50000); + LocatedTablet targetTablet = null; + for (LocatedTablet tablet : tablets) { + String tabletId = new String(tablet.getTabletId(), UTF_8); + if (tabletId.equals(referenceTabletId)) { + targetTablet = tablet; + } + } + HostAndPort targetHp = harness.findLeaderTabletServer(targetTablet); + return !targetHp.equals(referenceServerHostPort); + } + }, + 10000/*timeoutMillis*/); + + // Simulate that another request(like Batch) has sent to the wrong leader tablet server and + // the change of leadership has been acknowledged. The response will demote the leader. + kuduScanner.currentTablet().demoteLeader( + kuduScanner.currentTablet().getLeaderServerInfo().getUuid()); + asyncClient.emptyTabletsCacheForTable(table.getTableId()); + + int rowsScannedInNextScans = 0; + try { + while (kuduScanner.hasMoreRows()) { + rowsScannedInNextScans += kuduScanner.nextRows().numRows; + } + } catch (Exception ex) { + assertFalse(ex.getMessage().matches(".*Scanner .* not found.*")); + } + assertTrue(rowsScannedInNextScans > 0); + } }