This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new b3d32efc87 Resolves server to use for scan in a single place (#3272)
b3d32efc87 is described below

commit b3d32efc873b6511ab49e15287e99f3218c9954b
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Apr 5 16:45:42 2023 -0400

    Resolves server to use for scan in a single place (#3272)
    
    The code for resolving which tserver or sserver to use for a scan was
    spread out across multiple methods responsible for executing a scan.
    Pulled the code to resolve which server to use into a single place in
    the code that executes a scan.
    
    Also introduced a new class to represent the server and server type
    (sserver or tserver) used to process a scan.
    
    These changes clean up two problems in the code. First the tablet
    server location class was being used to represent a scan server
    with a special string placed in the tserver session field. Second
    the decision to use a scan server was deeper in the scan code than
    error reporting code and the resulted in the need for an odd
    instance variable to remember that a scan server was used for error
    reporting.  Removing these two problems makes the code easier to
    modify and maintain.
---
 .../accumulo/core/clientImpl/ThriftScanner.java    | 419 ++++++++++++---------
 1 file changed, 243 insertions(+), 176 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index 663b3748c1..acb2e9b2bd 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -30,6 +30,7 @@ import java.util.EnumMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
@@ -80,6 +81,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.net.HostAndPort;
 
 import io.opentelemetry.api.trace.Span;
@@ -171,6 +173,44 @@ public class ThriftScanner {
     throw new AccumuloException("getBatchFromServer: failed");
   }
 
+  enum ServerType {
+    TSERVER, SSERVER
+  }
+
+  static class ScanAddress {
+    final String serverAddress;
+    final ServerType serverType;
+    final TabletLocation tabletInfo;
+
+    public ScanAddress(String serverAddress, ServerType serverType, 
TabletLocation tabletInfo) {
+      this.serverAddress = Objects.requireNonNull(serverAddress);
+      this.serverType = Objects.requireNonNull(serverType);
+      this.tabletInfo = Objects.requireNonNull(tabletInfo);
+    }
+
+    public KeyExtent getExtent() {
+      return tabletInfo.getExtent();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ScanAddress that = (ScanAddress) o;
+      return serverAddress.equals(that.serverAddress) && serverType == 
that.serverType
+          && getExtent().equals(that.getExtent());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(serverAddress, serverType, tabletInfo);
+    }
+  }
+
   public static class ScanState {
 
     boolean isolated;
@@ -189,7 +229,7 @@ public class ThriftScanner {
     Authorizations authorizations;
     List<Column> columns;
 
-    TabletLocation prevLoc;
+    ScanAddress prevLoc;
     Long scanID;
 
     String classLoaderContext;
@@ -207,10 +247,6 @@ public class ThriftScanner {
 
     Duration busyTimeout;
 
-    TabletLocation getErrorLocation() {
-      return prevLoc;
-    }
-
     public ScanState(ClientContext context, TableId tableId, Authorizations 
authorizations,
         Range range, SortedSet<Column> fetchedColumns, int size,
         List<IterInfo> serverSideIteratorList,
@@ -280,10 +316,169 @@ public class ThriftScanner {
     return (long) (Math.min(millis * 2, maxSleep) * (.9 + random.nextDouble() 
/ 5));
   }
 
+  private static ScanAddress getScanServerAddress(ClientContext context, 
ScanState scanState,
+      TabletLocation loc) {
+    Preconditions.checkArgument(scanState.runOnScanServer);
+
+    ScanAddress addr = null;
+
+    if (scanState.scanID != null && scanState.prevLoc != null
+        && scanState.prevLoc.serverType == ServerType.SSERVER
+        && scanState.prevLoc.getExtent().equals(loc.getExtent())) {
+      // this is the case of continuing a scan on a scan server for the same 
tablet, so lets not
+      // call the scan server selector and just go back to the previous scan 
server
+      addr = scanState.prevLoc;
+      log.trace(
+          "For tablet {} continuing scan on scan server {} without consulting 
scan server selector, using busyTimeout {}",
+          loc.getExtent(), addr.serverAddress, scanState.busyTimeout);
+    } else {
+      var tabletId = new TabletIdImpl(loc.getExtent());
+      // obtain a snapshot once and only expose this snapshot to the plugin 
for consistency
+      var attempts = scanState.scanAttempts.snapshot();
+
+      var params = new ScanServerSelector.SelectorParameters() {
+
+        @Override
+        public List<TabletId> getTablets() {
+          return List.of(tabletId);
+        }
+
+        @Override
+        public Collection<? extends ScanServerAttempt> getAttempts(TabletId 
tabletId) {
+          return attempts.getOrDefault(tabletId, Set.of());
+        }
+
+        @Override
+        public Map<String,String> getHints() {
+          if (scanState.executionHints == null) {
+            return Map.of();
+          }
+          return scanState.executionHints;
+        }
+      };
+
+      ScanServerSelections actions = 
context.getScanServerSelector().selectServers(params);
+
+      Duration delay = null;
+
+      String scanServer = actions.getScanServer(tabletId);
+      if (scanServer != null) {
+        addr = new ScanAddress(scanServer, ServerType.SSERVER, loc);
+        delay = actions.getDelay();
+        scanState.busyTimeout = actions.getBusyTimeout();
+        log.trace("For tablet {} scan server selector chose scan_server:{} 
delay:{} busyTimeout:{}",
+            loc.getExtent(), scanServer, delay, scanState.busyTimeout);
+      } else {
+        addr = new ScanAddress(loc.getTserverLocation(), ServerType.TSERVER, 
loc);
+        delay = actions.getDelay();
+        scanState.busyTimeout = Duration.ZERO;
+        log.trace("For tablet {} scan server selector chose tablet_server", 
loc.getExtent());
+      }
+
+      if (!delay.isZero()) {
+        try {
+          Thread.sleep(delay.toMillis());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    return addr;
+  }
+
+  static ScanAddress getNextScanAddress(ClientContext context, ScanState 
scanState, long timeOut,
+      long startTime, long maxSleepTime) throws TableNotFoundException, 
AccumuloSecurityException,
+      AccumuloServerException, InterruptedException, ScanTimedOutException {
+
+    String lastError = null;
+    String error = null;
+    long sleepMillis = 100;
+
+    ScanAddress addr = null;
+
+    while (addr == null) {
+      long currentTime = System.currentTimeMillis();
+      if ((currentTime - startTime) / 1000.0 > timeOut) {
+        throw new ScanTimedOutException();
+      }
+
+      TabletLocation loc = null;
+
+      Span child1 = TraceUtil.startSpan(ThriftScanner.class, 
"scan::locateTablet");
+      try (Scope locateSpan = child1.makeCurrent()) {
+        loc = TabletLocator.getLocator(context, 
scanState.tableId).locateTablet(context,
+            scanState.startRow, scanState.skipStartRow, false);
+
+        if (loc == null) {
+          context.requireNotDeleted(scanState.tableId);
+          context.requireNotOffline(scanState.tableId, null);
+
+          error = "Failed to locate tablet for table : " + scanState.tableId + 
" row : "
+              + scanState.startRow;
+          if (!error.equals(lastError)) {
+            log.debug("{}", error);
+          } else if (log.isTraceEnabled()) {
+            log.trace("{}", error);
+          }
+          lastError = error;
+          sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
+        } else {
+          // when a tablet splits we do want to continue scanning the low child
+          // of the split if we are already passed it
+          Range dataRange = loc.getExtent().toDataRange();
+
+          if (scanState.range.getStartKey() != null
+              && dataRange.afterEndKey(scanState.range.getStartKey())) {
+            // go to the next tablet
+            scanState.startRow = loc.getExtent().endRow();
+            scanState.skipStartRow = true;
+            // force another lookup
+            loc = null;
+          } else if (scanState.range.getEndKey() != null
+              && dataRange.beforeStartKey(scanState.range.getEndKey())) {
+            // should not happen
+            throw new RuntimeException("Unexpected tablet, extent : " + 
loc.getExtent()
+                + "  range : " + scanState.range + " startRow : " + 
scanState.startRow);
+          }
+        }
+      } catch (AccumuloServerException e) {
+        TraceUtil.setException(child1, e, true);
+        log.debug("Scan failed, server side exception : {}", e.getMessage());
+        throw e;
+      } catch (AccumuloException e) {
+        error = "exception from tablet loc " + e.getMessage();
+        if (!error.equals(lastError)) {
+          log.debug("{}", error);
+        } else if (log.isTraceEnabled()) {
+          log.trace("{}", error);
+        }
+
+        TraceUtil.setException(child1, e, false);
+
+        lastError = error;
+        sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
+      } finally {
+        child1.end();
+      }
+
+      if (loc != null) {
+        if (scanState.runOnScanServer) {
+          addr = getScanServerAddress(context, scanState, loc);
+        } else {
+          addr = new ScanAddress(loc.getTserverLocation(), ServerType.TSERVER, 
loc);
+        }
+      }
+    }
+
+    return addr;
+  }
+
   public static List<KeyValue> scan(ClientContext context, ScanState 
scanState, long timeOut)
       throws ScanTimedOutException, AccumuloException, 
AccumuloSecurityException,
       TableNotFoundException {
-    TabletLocation loc = null;
+
     long startTime = System.currentTimeMillis();
     String lastError = null;
     String error = null;
@@ -305,73 +500,12 @@ public class ThriftScanner {
           throw new ScanTimedOutException();
         }
 
-        while (loc == null) {
-          long currentTime = System.currentTimeMillis();
-          if ((currentTime - startTime) / 1000.0 > timeOut) {
-            throw new ScanTimedOutException();
-          }
-
-          Span child1 = TraceUtil.startSpan(ThriftScanner.class, 
"scan::locateTablet");
-          try (Scope locateSpan = child1.makeCurrent()) {
-            loc = TabletLocator.getLocator(context, 
scanState.tableId).locateTablet(context,
-                scanState.startRow, scanState.skipStartRow, false);
-
-            if (loc == null) {
-              context.requireNotDeleted(scanState.tableId);
-              context.requireNotOffline(scanState.tableId, null);
-
-              error = "Failed to locate tablet for table : " + 
scanState.tableId + " row : "
-                  + scanState.startRow;
-              if (!error.equals(lastError)) {
-                log.debug("{}", error);
-              } else if (log.isTraceEnabled()) {
-                log.trace("{}", error);
-              }
-              lastError = error;
-              sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
-            } else {
-              // when a tablet splits we do want to continue scanning the low 
child
-              // of the split if we are already passed it
-              Range dataRange = loc.getExtent().toDataRange();
-
-              if (scanState.range.getStartKey() != null
-                  && dataRange.afterEndKey(scanState.range.getStartKey())) {
-                // go to the next tablet
-                scanState.startRow = loc.getExtent().endRow();
-                scanState.skipStartRow = true;
-                loc = null;
-              } else if (scanState.range.getEndKey() != null
-                  && dataRange.beforeStartKey(scanState.range.getEndKey())) {
-                // should not happen
-                throw new RuntimeException("Unexpected tablet, extent : " + 
loc.getExtent()
-                    + "  range : " + scanState.range + " startRow : " + 
scanState.startRow);
-              }
-            }
-          } catch (AccumuloServerException e) {
-            TraceUtil.setException(child1, e, true);
-            log.debug("Scan failed, server side exception : {}", 
e.getMessage());
-            throw e;
-          } catch (AccumuloException e) {
-            error = "exception from tablet loc " + e.getMessage();
-            if (!error.equals(lastError)) {
-              log.debug("{}", error);
-            } else if (log.isTraceEnabled()) {
-              log.trace("{}", error);
-            }
-
-            TraceUtil.setException(child1, e, false);
-
-            lastError = error;
-            sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
-          } finally {
-            child1.end();
-          }
-        }
+        ScanAddress addr = getNextScanAddress(context, scanState, timeOut, 
startTime, maxSleepTime);
 
         Span child2 = TraceUtil.startSpan(ThriftScanner.class, 
"scan::location",
-            Map.of("tserver", loc.getTserverLocation()));
+            Map.of("tserver", addr.serverAddress));
         try (Scope scanLocation = child2.makeCurrent()) {
-          results = scan(loc, scanState, context);
+          results = scan(addr, scanState, context);
         } catch (AccumuloSecurityException e) {
           context.clearTableListCache();
           context.requireNotDeleted(scanState.tableId);
@@ -380,14 +514,14 @@ public class ThriftScanner {
           throw e;
         } catch (TApplicationException tae) {
           TraceUtil.setException(child2, tae, true);
-          throw new 
AccumuloServerException(scanState.getErrorLocation().getTserverLocation(), tae);
+          throw new AccumuloServerException(addr.serverAddress, tae);
         } catch (TSampleNotPresentException tsnpe) {
           String message = "Table " + 
context.getPrintableTableInfoFromId(scanState.tableId)
               + " does not have sampling configured or built";
           TraceUtil.setException(child2, tsnpe, true);
           throw new SampleNotPresentException(message, tsnpe);
         } catch (NotServingTabletException e) {
-          error = "Scan failed, not serving tablet " + 
scanState.getErrorLocation();
+          error = "Scan failed, not serving tablet " + addr.serverAddress;
           if (!error.equals(lastError)) {
             log.debug("{}", error);
           } else if (log.isTraceEnabled()) {
@@ -395,8 +529,7 @@ public class ThriftScanner {
           }
           lastError = error;
 
-          TabletLocator.getLocator(context, 
scanState.tableId).invalidateCache(loc.getExtent());
-          loc = null;
+          TabletLocator.getLocator(context, 
scanState.tableId).invalidateCache(addr.getExtent());
 
           // no need to try the current scan id somewhere else
           scanState.scanID = null;
@@ -409,7 +542,7 @@ public class ThriftScanner {
           TraceUtil.setException(child2, e, false);
           sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
         } catch (ScanServerBusyException e) {
-          error = "Scan failed, scan server was busy " + 
scanState.getErrorLocation();
+          error = "Scan failed, scan server was busy " + addr.serverAddress;
           if (!error.equals(lastError)) {
             log.debug("{}", error);
           } else if (log.isTraceEnabled()) {
@@ -425,8 +558,7 @@ public class ThriftScanner {
           TraceUtil.setException(child2, e, false);
           scanState.scanID = null;
         } catch (NoSuchScanIDException e) {
-          error = "Scan failed, no such scan id " + scanState.scanID + " "
-              + scanState.getErrorLocation();
+          error = "Scan failed, no such scan id " + scanState.scanID + " " + 
addr.serverAddress;
           if (!error.equals(lastError)) {
             log.debug("{}", error);
           } else if (log.isTraceEnabled()) {
@@ -442,7 +574,7 @@ public class ThriftScanner {
           TraceUtil.setException(child2, e, false);
           scanState.scanID = null;
         } catch (TooManyFilesException e) {
-          error = "Tablet has too many files " + scanState.getErrorLocation() 
+ " retrying...";
+          error = "Tablet has too many files " + addr.serverAddress + " 
retrying...";
           if (error.equals(lastError)) {
             tooManyFilesCount++;
             if (tooManyFilesCount == 300) {
@@ -469,17 +601,20 @@ public class ThriftScanner {
           TraceUtil.setException(child2, e, false);
           sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
         } catch (TException e) {
-          TabletLocator.getLocator(context, 
scanState.tableId).invalidateCache(context,
-              loc.getTserverLocation());
+          if (addr.serverType == ServerType.TSERVER) {
+            // only tsever locations are in cache, invalidating a scan server 
would not find
+            // anything the cache
+            TabletLocator.getLocator(context, 
scanState.tableId).invalidateCache(context,
+                addr.serverAddress);
+          }
           error = "Scan failed, thrift error " + e.getClass().getName() + "  " 
+ e.getMessage()
-              + " " + scanState.getErrorLocation();
+              + " " + addr.serverAddress;
           if (!error.equals(lastError)) {
             log.debug("{}", error);
           } else if (log.isTraceEnabled()) {
             log.trace("{}", error);
           }
           lastError = error;
-          loc = null;
 
           // do not want to continue using the same scan id, if a timeout 
occurred could cause a
           // batch to be skipped
@@ -511,99 +646,33 @@ public class ThriftScanner {
     }
   }
 
-  private static List<KeyValue> scan(TabletLocation loc, ScanState scanState, 
ClientContext context)
+  private static List<KeyValue> scan(ScanAddress addr, ScanState scanState, 
ClientContext context)
       throws AccumuloSecurityException, NotServingTabletException, TException,
       NoSuchScanIDException, TooManyFilesException, TSampleNotPresentException 
{
     if (scanState.finished) {
       return null;
     }
 
-    if (scanState.runOnScanServer) {
-
-      TabletLocation newLoc;
-
-      var tabletId = new TabletIdImpl(loc.getExtent());
-
-      if (scanState.scanID != null && scanState.prevLoc != null
-          && scanState.prevLoc.getTserverSession().equals("scan_server")
-          && scanState.prevLoc.getExtent().equals(loc.getExtent())) {
-        // this is the case of continuing a scan on a scan server for the same 
tablet, so lets not
-        // call the scan server selector and just go back to the previous scan 
server
-        newLoc = scanState.prevLoc;
-        log.trace(
-            "For tablet {} continuing scan on scan server {} without 
consulting scan server selector, using busyTimeout {}",
-            loc.getExtent(), newLoc.getTserverLocation(), 
scanState.busyTimeout);
-      } else {
-        // obtain a snapshot once and only expose this snapshot to the plugin 
for consistency
-        var attempts = scanState.scanAttempts.snapshot();
-
-        var params = new ScanServerSelector.SelectorParameters() {
-
-          @Override
-          public List<TabletId> getTablets() {
-            return List.of(tabletId);
-          }
-
-          @Override
-          public Collection<? extends ScanServerAttempt> getAttempts(TabletId 
tabletId) {
-            return attempts.getOrDefault(tabletId, Set.of());
-          }
-
-          @Override
-          public Map<String,String> getHints() {
-            if (scanState.executionHints == null) {
-              return Map.of();
-            }
-            return scanState.executionHints;
-          }
-        };
-
-        ScanServerSelections actions = 
context.getScanServerSelector().selectServers(params);
-
-        Duration delay = null;
-
-        String scanServer = actions.getScanServer(tabletId);
-        if (scanServer != null) {
-          newLoc = new TabletLocation(loc.getExtent(), scanServer, 
"scan_server");
-          delay = actions.getDelay();
-          scanState.busyTimeout = actions.getBusyTimeout();
-          log.trace(
-              "For tablet {} scan server selector chose scan_server:{} 
delay:{} busyTimeout:{}",
-              loc.getExtent(), scanServer, delay, scanState.busyTimeout);
-        } else {
-          newLoc = loc;
-          delay = actions.getDelay();
-          scanState.busyTimeout = Duration.ZERO;
-          log.trace("For tablet {} scan server selector chose tablet_server", 
loc.getExtent());
-        }
-
-        if (!delay.isZero()) {
-          try {
-            Thread.sleep(delay.toMillis());
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-          }
-        }
-      }
-
-      var reporter = 
scanState.scanAttempts.createReporter(newLoc.getTserverLocation(), tabletId);
-
+    if (addr.serverType == ServerType.SSERVER) {
       try {
-        return scanRpc(newLoc, scanState, context, 
scanState.busyTimeout.toMillis());
+        return scanRpc(addr, scanState, context, 
scanState.busyTimeout.toMillis());
       } catch (ScanServerBusyException ssbe) {
+        var reporter = 
scanState.scanAttempts.createReporter(addr.serverAddress,
+            new TabletIdImpl(addr.getExtent()));
         reporter.report(ScanServerAttempt.Result.BUSY);
         throw ssbe;
       } catch (Exception e) {
+        var reporter = 
scanState.scanAttempts.createReporter(addr.serverAddress,
+            new TabletIdImpl(addr.getExtent()));
         reporter.report(ScanServerAttempt.Result.ERROR);
         throw e;
       }
     } else {
-      return scanRpc(loc, scanState, context, 0L);
+      return scanRpc(addr, scanState, context, 0L);
     }
   }
 
-  private static List<KeyValue> scanRpc(TabletLocation loc, ScanState 
scanState,
+  private static List<KeyValue> scanRpc(ScanAddress addr, ScanState scanState,
       ClientContext context, long busyTimeout) throws 
AccumuloSecurityException,
       NotServingTabletException, TException, NoSuchScanIDException, 
TooManyFilesException,
       TSampleNotPresentException, ScanServerBusyException {
@@ -612,7 +681,7 @@ public class ThriftScanner {
 
     final TInfo tinfo = TraceUtil.traceInfo();
 
-    final HostAndPort parsedLocation = 
HostAndPort.fromString(loc.getTserverLocation());
+    final HostAndPort parsedLocation = 
HostAndPort.fromString(addr.serverAddress);
     TabletScanClientService.Client client =
         ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedLocation, 
context);
 
@@ -620,31 +689,29 @@ public class ThriftScanner {
     try {
       ScanResult sr;
 
-      if (scanState.prevLoc != null && !scanState.prevLoc.equals(loc)) {
+      if (scanState.prevLoc != null && !scanState.prevLoc.equals(addr)) {
         scanState.scanID = null;
       }
 
-      scanState.prevLoc = loc;
+      scanState.prevLoc = addr;
 
       if (scanState.scanID == null) {
-        Thread.currentThread().setName("Starting scan tserver=" + 
loc.getTserverLocation()
-            + " tableId=" + loc.getExtent().tableId());
+        Thread.currentThread().setName("Starting scan tserver=" + 
addr.serverAddress + " tableId="
+            + addr.getExtent().tableId());
 
         if (log.isTraceEnabled()) {
-          String msg = "Starting scan tserver=" + loc.getTserverLocation() + " 
tablet="
-              + loc.getExtent() + " range=" + scanState.range + " ssil="
-              + scanState.serverSideIteratorList + " ssio=" + 
scanState.serverSideIteratorOptions
-              + " context=" + scanState.classLoaderContext;
+          String msg = "Starting scan server=" + addr.serverAddress + " 
tablet=" + addr.getExtent()
+              + " range=" + scanState.range + " ssil=" + 
scanState.serverSideIteratorList + " ssio="
+              + scanState.serverSideIteratorOptions + " context=" + 
scanState.classLoaderContext;
           log.trace("tid={} {}", Thread.currentThread().getId(), msg);
           timer = new OpTimer().start();
         }
 
-        TabletType ttype = TabletType.type(loc.getExtent());
-        boolean waitForWrites =
-            
!serversWaitedForWrites.get(ttype).contains(loc.getTserverLocation());
+        TabletType ttype = TabletType.type(addr.getExtent());
+        boolean waitForWrites = 
!serversWaitedForWrites.get(ttype).contains(addr.serverAddress);
 
         InitialScan is = client.startScan(tinfo, scanState.context.rpcCreds(),
-            loc.getExtent().toThrift(), scanState.range.toThrift(),
+            addr.getExtent().toThrift(), scanState.range.toThrift(),
             
scanState.columns.stream().map(Column::toThrift).collect(Collectors.toList()),
             scanState.size, scanState.serverSideIteratorList, 
scanState.serverSideIteratorOptions,
             scanState.authorizations.getAuthorizationsBB(), waitForWrites, 
scanState.isolated,
@@ -652,7 +719,7 @@ public class ThriftScanner {
             SamplerConfigurationImpl.toThrift(scanState.samplerConfig), 
scanState.batchTimeOut,
             scanState.classLoaderContext, scanState.executionHints, 
busyTimeout);
         if (waitForWrites) {
-          serversWaitedForWrites.get(ttype).add(loc.getTserverLocation());
+          serversWaitedForWrites.get(ttype).add(addr.serverAddress);
         }
 
         sr = is.result;
@@ -666,7 +733,7 @@ public class ThriftScanner {
       } else {
         // log.debug("Calling continue scan : "+scanState.range+" loc = "+loc);
         String msg =
-            "Continuing scan tserver=" + loc.getTserverLocation() + " scanid=" 
+ scanState.scanID;
+            "Continuing scan tserver=" + addr.serverAddress + " scanid=" + 
scanState.scanID;
         Thread.currentThread().setName(msg);
 
         if (log.isTraceEnabled()) {
@@ -691,7 +758,7 @@ public class ThriftScanner {
       } else {
         // log.debug("No more : tab end row = 
"+loc.tablet_extent.getEndRow()+" range =
         // "+scanState.range);
-        if (loc.getExtent().endRow() == null) {
+        if (addr.getExtent().endRow() == null) {
           scanState.finished = true;
 
           if (timer != null) {
@@ -702,8 +769,8 @@ public class ThriftScanner {
           }
 
         } else if (scanState.range.getEndKey() == null || !scanState.range
-            .afterEndKey(new 
Key(loc.getExtent().endRow()).followingKey(PartialKey.ROW))) {
-          scanState.startRow = loc.getExtent().endRow();
+            .afterEndKey(new 
Key(addr.getExtent().endRow()).followingKey(PartialKey.ROW))) {
+          scanState.startRow = addr.getExtent().endRow();
           scanState.skipStartRow = true;
 
           if (timer != null) {
@@ -750,7 +817,7 @@ public class ThriftScanner {
       TInfo tinfo = TraceUtil.traceInfo();
 
       log.debug("Closing active scan {} {}", scanState.prevLoc, 
scanState.scanID);
-      HostAndPort parsedLocation = 
HostAndPort.fromString(scanState.prevLoc.getTserverLocation());
+      HostAndPort parsedLocation = 
HostAndPort.fromString(scanState.prevLoc.serverAddress);
       TabletScanClientService.Client client = null;
       try {
         client =

Reply via email to