This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new b3d32efc87 Resolves server to use for scan in a single place (#3272) b3d32efc87 is described below commit b3d32efc873b6511ab49e15287e99f3218c9954b Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Apr 5 16:45:42 2023 -0400 Resolves server to use for scan in a single place (#3272) The code for resolving which tserver or sserver to use for a scan was spread out across multiple methods responsible for executing a scan. Pulled the code to resolve which server to use into a single place in the code that executes a scan. Also introduced a new class to represent the server and server type (sserver or tserver) used to process a scan. These changes clean up two problems in the code. First, the tablet server location class was being used to represent a scan server, with a special string placed in the tserver session field. Second, the decision to use a scan server was made deeper in the scan code than the error reporting code, and this resulted in the need for an odd instance variable to remember, for error reporting, that a scan server was used. Removing these two problems makes the code easier to modify and maintain. 
--- .../accumulo/core/clientImpl/ThriftScanner.java | 419 ++++++++++++--------- 1 file changed, 243 insertions(+), 176 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java index 663b3748c1..acb2e9b2bd 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java @@ -30,6 +30,7 @@ import java.util.EnumMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; @@ -80,6 +81,7 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; import io.opentelemetry.api.trace.Span; @@ -171,6 +173,44 @@ public class ThriftScanner { throw new AccumuloException("getBatchFromServer: failed"); } + enum ServerType { + TSERVER, SSERVER + } + + static class ScanAddress { + final String serverAddress; + final ServerType serverType; + final TabletLocation tabletInfo; + + public ScanAddress(String serverAddress, ServerType serverType, TabletLocation tabletInfo) { + this.serverAddress = Objects.requireNonNull(serverAddress); + this.serverType = Objects.requireNonNull(serverType); + this.tabletInfo = Objects.requireNonNull(tabletInfo); + } + + public KeyExtent getExtent() { + return tabletInfo.getExtent(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ScanAddress that = (ScanAddress) o; + return serverAddress.equals(that.serverAddress) && serverType == that.serverType + && getExtent().equals(that.getExtent()); + } + + @Override + public int hashCode() { + return Objects.hash(serverAddress, serverType, 
tabletInfo); + } + } + public static class ScanState { boolean isolated; @@ -189,7 +229,7 @@ public class ThriftScanner { Authorizations authorizations; List<Column> columns; - TabletLocation prevLoc; + ScanAddress prevLoc; Long scanID; String classLoaderContext; @@ -207,10 +247,6 @@ public class ThriftScanner { Duration busyTimeout; - TabletLocation getErrorLocation() { - return prevLoc; - } - public ScanState(ClientContext context, TableId tableId, Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns, int size, List<IterInfo> serverSideIteratorList, @@ -280,10 +316,169 @@ public class ThriftScanner { return (long) (Math.min(millis * 2, maxSleep) * (.9 + random.nextDouble() / 5)); } + private static ScanAddress getScanServerAddress(ClientContext context, ScanState scanState, + TabletLocation loc) { + Preconditions.checkArgument(scanState.runOnScanServer); + + ScanAddress addr = null; + + if (scanState.scanID != null && scanState.prevLoc != null + && scanState.prevLoc.serverType == ServerType.SSERVER + && scanState.prevLoc.getExtent().equals(loc.getExtent())) { + // this is the case of continuing a scan on a scan server for the same tablet, so lets not + // call the scan server selector and just go back to the previous scan server + addr = scanState.prevLoc; + log.trace( + "For tablet {} continuing scan on scan server {} without consulting scan server selector, using busyTimeout {}", + loc.getExtent(), addr.serverAddress, scanState.busyTimeout); + } else { + var tabletId = new TabletIdImpl(loc.getExtent()); + // obtain a snapshot once and only expose this snapshot to the plugin for consistency + var attempts = scanState.scanAttempts.snapshot(); + + var params = new ScanServerSelector.SelectorParameters() { + + @Override + public List<TabletId> getTablets() { + return List.of(tabletId); + } + + @Override + public Collection<? 
extends ScanServerAttempt> getAttempts(TabletId tabletId) { + return attempts.getOrDefault(tabletId, Set.of()); + } + + @Override + public Map<String,String> getHints() { + if (scanState.executionHints == null) { + return Map.of(); + } + return scanState.executionHints; + } + }; + + ScanServerSelections actions = context.getScanServerSelector().selectServers(params); + + Duration delay = null; + + String scanServer = actions.getScanServer(tabletId); + if (scanServer != null) { + addr = new ScanAddress(scanServer, ServerType.SSERVER, loc); + delay = actions.getDelay(); + scanState.busyTimeout = actions.getBusyTimeout(); + log.trace("For tablet {} scan server selector chose scan_server:{} delay:{} busyTimeout:{}", + loc.getExtent(), scanServer, delay, scanState.busyTimeout); + } else { + addr = new ScanAddress(loc.getTserverLocation(), ServerType.TSERVER, loc); + delay = actions.getDelay(); + scanState.busyTimeout = Duration.ZERO; + log.trace("For tablet {} scan server selector chose tablet_server", loc.getExtent()); + } + + if (!delay.isZero()) { + try { + Thread.sleep(delay.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } + + return addr; + } + + static ScanAddress getNextScanAddress(ClientContext context, ScanState scanState, long timeOut, + long startTime, long maxSleepTime) throws TableNotFoundException, AccumuloSecurityException, + AccumuloServerException, InterruptedException, ScanTimedOutException { + + String lastError = null; + String error = null; + long sleepMillis = 100; + + ScanAddress addr = null; + + while (addr == null) { + long currentTime = System.currentTimeMillis(); + if ((currentTime - startTime) / 1000.0 > timeOut) { + throw new ScanTimedOutException(); + } + + TabletLocation loc = null; + + Span child1 = TraceUtil.startSpan(ThriftScanner.class, "scan::locateTablet"); + try (Scope locateSpan = child1.makeCurrent()) { + loc = TabletLocator.getLocator(context, 
scanState.tableId).locateTablet(context, + scanState.startRow, scanState.skipStartRow, false); + + if (loc == null) { + context.requireNotDeleted(scanState.tableId); + context.requireNotOffline(scanState.tableId, null); + + error = "Failed to locate tablet for table : " + scanState.tableId + " row : " + + scanState.startRow; + if (!error.equals(lastError)) { + log.debug("{}", error); + } else if (log.isTraceEnabled()) { + log.trace("{}", error); + } + lastError = error; + sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); + } else { + // when a tablet splits we do want to continue scanning the low child + // of the split if we are already passed it + Range dataRange = loc.getExtent().toDataRange(); + + if (scanState.range.getStartKey() != null + && dataRange.afterEndKey(scanState.range.getStartKey())) { + // go to the next tablet + scanState.startRow = loc.getExtent().endRow(); + scanState.skipStartRow = true; + // force another lookup + loc = null; + } else if (scanState.range.getEndKey() != null + && dataRange.beforeStartKey(scanState.range.getEndKey())) { + // should not happen + throw new RuntimeException("Unexpected tablet, extent : " + loc.getExtent() + + " range : " + scanState.range + " startRow : " + scanState.startRow); + } + } + } catch (AccumuloServerException e) { + TraceUtil.setException(child1, e, true); + log.debug("Scan failed, server side exception : {}", e.getMessage()); + throw e; + } catch (AccumuloException e) { + error = "exception from tablet loc " + e.getMessage(); + if (!error.equals(lastError)) { + log.debug("{}", error); + } else if (log.isTraceEnabled()) { + log.trace("{}", error); + } + + TraceUtil.setException(child1, e, false); + + lastError = error; + sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); + } finally { + child1.end(); + } + + if (loc != null) { + if (scanState.runOnScanServer) { + addr = getScanServerAddress(context, scanState, loc); + } else { + addr = new 
ScanAddress(loc.getTserverLocation(), ServerType.TSERVER, loc); + } + } + } + + return addr; + } + public static List<KeyValue> scan(ClientContext context, ScanState scanState, long timeOut) throws ScanTimedOutException, AccumuloException, AccumuloSecurityException, TableNotFoundException { - TabletLocation loc = null; + long startTime = System.currentTimeMillis(); String lastError = null; String error = null; @@ -305,73 +500,12 @@ public class ThriftScanner { throw new ScanTimedOutException(); } - while (loc == null) { - long currentTime = System.currentTimeMillis(); - if ((currentTime - startTime) / 1000.0 > timeOut) { - throw new ScanTimedOutException(); - } - - Span child1 = TraceUtil.startSpan(ThriftScanner.class, "scan::locateTablet"); - try (Scope locateSpan = child1.makeCurrent()) { - loc = TabletLocator.getLocator(context, scanState.tableId).locateTablet(context, - scanState.startRow, scanState.skipStartRow, false); - - if (loc == null) { - context.requireNotDeleted(scanState.tableId); - context.requireNotOffline(scanState.tableId, null); - - error = "Failed to locate tablet for table : " + scanState.tableId + " row : " - + scanState.startRow; - if (!error.equals(lastError)) { - log.debug("{}", error); - } else if (log.isTraceEnabled()) { - log.trace("{}", error); - } - lastError = error; - sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); - } else { - // when a tablet splits we do want to continue scanning the low child - // of the split if we are already passed it - Range dataRange = loc.getExtent().toDataRange(); - - if (scanState.range.getStartKey() != null - && dataRange.afterEndKey(scanState.range.getStartKey())) { - // go to the next tablet - scanState.startRow = loc.getExtent().endRow(); - scanState.skipStartRow = true; - loc = null; - } else if (scanState.range.getEndKey() != null - && dataRange.beforeStartKey(scanState.range.getEndKey())) { - // should not happen - throw new RuntimeException("Unexpected tablet, extent : " 
+ loc.getExtent() - + " range : " + scanState.range + " startRow : " + scanState.startRow); - } - } - } catch (AccumuloServerException e) { - TraceUtil.setException(child1, e, true); - log.debug("Scan failed, server side exception : {}", e.getMessage()); - throw e; - } catch (AccumuloException e) { - error = "exception from tablet loc " + e.getMessage(); - if (!error.equals(lastError)) { - log.debug("{}", error); - } else if (log.isTraceEnabled()) { - log.trace("{}", error); - } - - TraceUtil.setException(child1, e, false); - - lastError = error; - sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); - } finally { - child1.end(); - } - } + ScanAddress addr = getNextScanAddress(context, scanState, timeOut, startTime, maxSleepTime); Span child2 = TraceUtil.startSpan(ThriftScanner.class, "scan::location", - Map.of("tserver", loc.getTserverLocation())); + Map.of("tserver", addr.serverAddress)); try (Scope scanLocation = child2.makeCurrent()) { - results = scan(loc, scanState, context); + results = scan(addr, scanState, context); } catch (AccumuloSecurityException e) { context.clearTableListCache(); context.requireNotDeleted(scanState.tableId); @@ -380,14 +514,14 @@ public class ThriftScanner { throw e; } catch (TApplicationException tae) { TraceUtil.setException(child2, tae, true); - throw new AccumuloServerException(scanState.getErrorLocation().getTserverLocation(), tae); + throw new AccumuloServerException(addr.serverAddress, tae); } catch (TSampleNotPresentException tsnpe) { String message = "Table " + context.getPrintableTableInfoFromId(scanState.tableId) + " does not have sampling configured or built"; TraceUtil.setException(child2, tsnpe, true); throw new SampleNotPresentException(message, tsnpe); } catch (NotServingTabletException e) { - error = "Scan failed, not serving tablet " + scanState.getErrorLocation(); + error = "Scan failed, not serving tablet " + addr.serverAddress; if (!error.equals(lastError)) { log.debug("{}", error); } else 
if (log.isTraceEnabled()) { @@ -395,8 +529,7 @@ public class ThriftScanner { } lastError = error; - TabletLocator.getLocator(context, scanState.tableId).invalidateCache(loc.getExtent()); - loc = null; + TabletLocator.getLocator(context, scanState.tableId).invalidateCache(addr.getExtent()); // no need to try the current scan id somewhere else scanState.scanID = null; @@ -409,7 +542,7 @@ public class ThriftScanner { TraceUtil.setException(child2, e, false); sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); } catch (ScanServerBusyException e) { - error = "Scan failed, scan server was busy " + scanState.getErrorLocation(); + error = "Scan failed, scan server was busy " + addr.serverAddress; if (!error.equals(lastError)) { log.debug("{}", error); } else if (log.isTraceEnabled()) { @@ -425,8 +558,7 @@ public class ThriftScanner { TraceUtil.setException(child2, e, false); scanState.scanID = null; } catch (NoSuchScanIDException e) { - error = "Scan failed, no such scan id " + scanState.scanID + " " - + scanState.getErrorLocation(); + error = "Scan failed, no such scan id " + scanState.scanID + " " + addr.serverAddress; if (!error.equals(lastError)) { log.debug("{}", error); } else if (log.isTraceEnabled()) { @@ -442,7 +574,7 @@ public class ThriftScanner { TraceUtil.setException(child2, e, false); scanState.scanID = null; } catch (TooManyFilesException e) { - error = "Tablet has too many files " + scanState.getErrorLocation() + " retrying..."; + error = "Tablet has too many files " + addr.serverAddress + " retrying..."; if (error.equals(lastError)) { tooManyFilesCount++; if (tooManyFilesCount == 300) { @@ -469,17 +601,20 @@ public class ThriftScanner { TraceUtil.setException(child2, e, false); sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); } catch (TException e) { - TabletLocator.getLocator(context, scanState.tableId).invalidateCache(context, - loc.getTserverLocation()); + if (addr.serverType == ServerType.TSERVER) { + 
// only tsever locations are in cache, invalidating a scan server would not find + // anything the cache + TabletLocator.getLocator(context, scanState.tableId).invalidateCache(context, + addr.serverAddress); + } error = "Scan failed, thrift error " + e.getClass().getName() + " " + e.getMessage() - + " " + scanState.getErrorLocation(); + + " " + addr.serverAddress; if (!error.equals(lastError)) { log.debug("{}", error); } else if (log.isTraceEnabled()) { log.trace("{}", error); } lastError = error; - loc = null; // do not want to continue using the same scan id, if a timeout occurred could cause a // batch to be skipped @@ -511,99 +646,33 @@ public class ThriftScanner { } } - private static List<KeyValue> scan(TabletLocation loc, ScanState scanState, ClientContext context) + private static List<KeyValue> scan(ScanAddress addr, ScanState scanState, ClientContext context) throws AccumuloSecurityException, NotServingTabletException, TException, NoSuchScanIDException, TooManyFilesException, TSampleNotPresentException { if (scanState.finished) { return null; } - if (scanState.runOnScanServer) { - - TabletLocation newLoc; - - var tabletId = new TabletIdImpl(loc.getExtent()); - - if (scanState.scanID != null && scanState.prevLoc != null - && scanState.prevLoc.getTserverSession().equals("scan_server") - && scanState.prevLoc.getExtent().equals(loc.getExtent())) { - // this is the case of continuing a scan on a scan server for the same tablet, so lets not - // call the scan server selector and just go back to the previous scan server - newLoc = scanState.prevLoc; - log.trace( - "For tablet {} continuing scan on scan server {} without consulting scan server selector, using busyTimeout {}", - loc.getExtent(), newLoc.getTserverLocation(), scanState.busyTimeout); - } else { - // obtain a snapshot once and only expose this snapshot to the plugin for consistency - var attempts = scanState.scanAttempts.snapshot(); - - var params = new ScanServerSelector.SelectorParameters() { - - 
@Override - public List<TabletId> getTablets() { - return List.of(tabletId); - } - - @Override - public Collection<? extends ScanServerAttempt> getAttempts(TabletId tabletId) { - return attempts.getOrDefault(tabletId, Set.of()); - } - - @Override - public Map<String,String> getHints() { - if (scanState.executionHints == null) { - return Map.of(); - } - return scanState.executionHints; - } - }; - - ScanServerSelections actions = context.getScanServerSelector().selectServers(params); - - Duration delay = null; - - String scanServer = actions.getScanServer(tabletId); - if (scanServer != null) { - newLoc = new TabletLocation(loc.getExtent(), scanServer, "scan_server"); - delay = actions.getDelay(); - scanState.busyTimeout = actions.getBusyTimeout(); - log.trace( - "For tablet {} scan server selector chose scan_server:{} delay:{} busyTimeout:{}", - loc.getExtent(), scanServer, delay, scanState.busyTimeout); - } else { - newLoc = loc; - delay = actions.getDelay(); - scanState.busyTimeout = Duration.ZERO; - log.trace("For tablet {} scan server selector chose tablet_server", loc.getExtent()); - } - - if (!delay.isZero()) { - try { - Thread.sleep(delay.toMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - } - - var reporter = scanState.scanAttempts.createReporter(newLoc.getTserverLocation(), tabletId); - + if (addr.serverType == ServerType.SSERVER) { try { - return scanRpc(newLoc, scanState, context, scanState.busyTimeout.toMillis()); + return scanRpc(addr, scanState, context, scanState.busyTimeout.toMillis()); } catch (ScanServerBusyException ssbe) { + var reporter = scanState.scanAttempts.createReporter(addr.serverAddress, + new TabletIdImpl(addr.getExtent())); reporter.report(ScanServerAttempt.Result.BUSY); throw ssbe; } catch (Exception e) { + var reporter = scanState.scanAttempts.createReporter(addr.serverAddress, + new TabletIdImpl(addr.getExtent())); 
reporter.report(ScanServerAttempt.Result.ERROR); throw e; } } else { - return scanRpc(loc, scanState, context, 0L); + return scanRpc(addr, scanState, context, 0L); } } - private static List<KeyValue> scanRpc(TabletLocation loc, ScanState scanState, + private static List<KeyValue> scanRpc(ScanAddress addr, ScanState scanState, ClientContext context, long busyTimeout) throws AccumuloSecurityException, NotServingTabletException, TException, NoSuchScanIDException, TooManyFilesException, TSampleNotPresentException, ScanServerBusyException { @@ -612,7 +681,7 @@ public class ThriftScanner { final TInfo tinfo = TraceUtil.traceInfo(); - final HostAndPort parsedLocation = HostAndPort.fromString(loc.getTserverLocation()); + final HostAndPort parsedLocation = HostAndPort.fromString(addr.serverAddress); TabletScanClientService.Client client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedLocation, context); @@ -620,31 +689,29 @@ public class ThriftScanner { try { ScanResult sr; - if (scanState.prevLoc != null && !scanState.prevLoc.equals(loc)) { + if (scanState.prevLoc != null && !scanState.prevLoc.equals(addr)) { scanState.scanID = null; } - scanState.prevLoc = loc; + scanState.prevLoc = addr; if (scanState.scanID == null) { - Thread.currentThread().setName("Starting scan tserver=" + loc.getTserverLocation() - + " tableId=" + loc.getExtent().tableId()); + Thread.currentThread().setName("Starting scan tserver=" + addr.serverAddress + " tableId=" + + addr.getExtent().tableId()); if (log.isTraceEnabled()) { - String msg = "Starting scan tserver=" + loc.getTserverLocation() + " tablet=" - + loc.getExtent() + " range=" + scanState.range + " ssil=" - + scanState.serverSideIteratorList + " ssio=" + scanState.serverSideIteratorOptions - + " context=" + scanState.classLoaderContext; + String msg = "Starting scan server=" + addr.serverAddress + " tablet=" + addr.getExtent() + + " range=" + scanState.range + " ssil=" + scanState.serverSideIteratorList + " ssio=" + + 
scanState.serverSideIteratorOptions + " context=" + scanState.classLoaderContext; log.trace("tid={} {}", Thread.currentThread().getId(), msg); timer = new OpTimer().start(); } - TabletType ttype = TabletType.type(loc.getExtent()); - boolean waitForWrites = - !serversWaitedForWrites.get(ttype).contains(loc.getTserverLocation()); + TabletType ttype = TabletType.type(addr.getExtent()); + boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(addr.serverAddress); InitialScan is = client.startScan(tinfo, scanState.context.rpcCreds(), - loc.getExtent().toThrift(), scanState.range.toThrift(), + addr.getExtent().toThrift(), scanState.range.toThrift(), scanState.columns.stream().map(Column::toThrift).collect(Collectors.toList()), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions, scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, @@ -652,7 +719,7 @@ public class ThriftScanner { SamplerConfigurationImpl.toThrift(scanState.samplerConfig), scanState.batchTimeOut, scanState.classLoaderContext, scanState.executionHints, busyTimeout); if (waitForWrites) { - serversWaitedForWrites.get(ttype).add(loc.getTserverLocation()); + serversWaitedForWrites.get(ttype).add(addr.serverAddress); } sr = is.result; @@ -666,7 +733,7 @@ public class ThriftScanner { } else { // log.debug("Calling continue scan : "+scanState.range+" loc = "+loc); String msg = - "Continuing scan tserver=" + loc.getTserverLocation() + " scanid=" + scanState.scanID; + "Continuing scan tserver=" + addr.serverAddress + " scanid=" + scanState.scanID; Thread.currentThread().setName(msg); if (log.isTraceEnabled()) { @@ -691,7 +758,7 @@ public class ThriftScanner { } else { // log.debug("No more : tab end row = "+loc.tablet_extent.getEndRow()+" range = // "+scanState.range); - if (loc.getExtent().endRow() == null) { + if (addr.getExtent().endRow() == null) { scanState.finished = true; if (timer != null) { @@ -702,8 +769,8 @@ public class 
ThriftScanner { } } else if (scanState.range.getEndKey() == null || !scanState.range - .afterEndKey(new Key(loc.getExtent().endRow()).followingKey(PartialKey.ROW))) { - scanState.startRow = loc.getExtent().endRow(); + .afterEndKey(new Key(addr.getExtent().endRow()).followingKey(PartialKey.ROW))) { + scanState.startRow = addr.getExtent().endRow(); scanState.skipStartRow = true; if (timer != null) { @@ -750,7 +817,7 @@ public class ThriftScanner { TInfo tinfo = TraceUtil.traceInfo(); log.debug("Closing active scan {} {}", scanState.prevLoc, scanState.scanID); - HostAndPort parsedLocation = HostAndPort.fromString(scanState.prevLoc.getTserverLocation()); + HostAndPort parsedLocation = HostAndPort.fromString(scanState.prevLoc.serverAddress); TabletScanClientService.Client client = null; try { client =