This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 149b65ce24 Improves scan server resolution in batch scanner code. (#3273) 149b65ce24 is described below commit 149b65ce242901ed9a579a8d2b0badc53c24ae04 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Apr 5 16:46:59 2023 -0400 Improves scan server resolution in batch scanner code. (#3273) Refactors the batch scanner code to make two improvements. First, mapping ranges to scan servers or tservers was moved under one method. Second, the process of mapping ranges to scan servers using the tablet location cache was improved. The tablet location cache used to organize data in a way that was useful for immediate scans, so eventual scans had to reorganize the data. A new method was added to the tablet location cache that passes the raw data to a consumer that can organize it in any way. This new method is used for scan servers. --- .../core/clientImpl/RootTabletLocator.java | 8 +- .../core/clientImpl/SyncingTabletLocator.java | 8 ++ .../accumulo/core/clientImpl/TabletLocator.java | 23 +++- .../core/clientImpl/TabletLocatorImpl.java | 32 ++--- .../TabletServerBatchReaderIterator.java | 147 ++++++++++++++------- .../accumulo/server/client/BulkImporterTest.java | 5 +- 6 files changed, 149 insertions(+), 74 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/RootTabletLocator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/RootTabletLocator.java index a24a236998..f29a27b591 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/RootTabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/RootTabletLocator.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.TabletLocatorImpl.TabletServerLockChecker; @@ -67,14 +68,13 
@@ public class RootTabletLocator extends TabletLocator { } @Override - public List<Range> binRanges(ClientContext context, List<Range> ranges, - Map<String,Map<KeyExtent,List<Range>>> binnedRanges) { + public List<Range> locateTablets(ClientContext context, List<Range> ranges, + BiConsumer<TabletLocation,Range> rangeConsumer) { TabletLocation rootTabletLocation = getRootTabletLocation(context); if (rootTabletLocation != null) { for (Range range : ranges) { - TabletLocatorImpl.addRange(binnedRanges, rootTabletLocation.getTserverLocation(), - RootTable.EXTENT, range); + rangeConsumer.accept(rootTabletLocation, range); } return Collections.emptyList(); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingTabletLocator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingTabletLocator.java index df40a21ce1..dc38d18f8a 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingTabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingTabletLocator.java @@ -21,6 +21,7 @@ package org.apache.accumulo.core.clientImpl; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.Supplier; import org.apache.accumulo.core.client.AccumuloException; @@ -76,6 +77,13 @@ public class SyncingTabletLocator extends TabletLocator { syncLocator().binMutations(context, mutations, binnedMutations, failures); } + @Override + public List<Range> locateTablets(ClientContext context, List<Range> ranges, + BiConsumer<TabletLocation,Range> rangeConsumer) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + return syncLocator().locateTablets(context, ranges, rangeConsumer); + } + @Override public List<Range> binRanges(ClientContext context, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java index fd30d620b9..a60e63d213 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.BiConsumer; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -64,10 +65,28 @@ public abstract class TabletLocator { Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures) throws AccumuloException, AccumuloSecurityException, TableNotFoundException; - public abstract List<Range> binRanges(ClientContext context, List<Range> ranges, - Map<String,Map<KeyExtent,List<Range>>> binnedRanges) + /** + * This method finds what tablets overlap a given set of ranges, passing each range and its + * associated tablet to the range consumer. If a range overlaps multiple tablets then it can be + * passed to the range consumer multiple times. + */ + public abstract List<Range> locateTablets(ClientContext context, List<Range> ranges, + BiConsumer<TabletLocation,Range> rangeConsumer) throws AccumuloException, AccumuloSecurityException, TableNotFoundException; + /** + * The behavior of this method is similar to + * {@link #locateTablets(ClientContext, List, BiConsumer)}, except it bins ranges to the passed in + * binnedRanges map instead of passing them to a consumer. 
+ * + */ + public List<Range> binRanges(ClientContext context, List<Range> ranges, + Map<String,Map<KeyExtent,List<Range>>> binnedRanges) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + return locateTablets(context, ranges, + ((cachedTablet, range) -> TabletLocatorImpl.addRange(binnedRanges, cachedTablet, range))); + } + public abstract void invalidateCache(KeyExtent failedExtent); public abstract void invalidateCache(Collection<KeyExtent> keySet); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocatorImpl.java index 69345af8cd..a0b4335ff6 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocatorImpl.java @@ -38,6 +38,7 @@ import java.util.TreeSet; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiConsumer; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -293,8 +294,8 @@ public class TabletLocatorImpl extends TabletLocator { return true; } - private List<Range> binRanges(ClientContext context, List<Range> ranges, - Map<String,Map<KeyExtent,List<Range>>> binnedRanges, boolean useCache, + private List<Range> locateTablets(ClientContext context, List<Range> ranges, + BiConsumer<TabletLocation,Range> rangeConsumer, boolean useCache, LockCheckerSession lcSession) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { List<Range> failures = new ArrayList<>(); @@ -359,8 +360,7 @@ public class TabletLocatorImpl extends TabletLocator { // then after that merges and splits happen. 
if (isContiguous(tabletLocations)) { for (TabletLocation tl2 : tabletLocations) { - TabletLocatorImpl.addRange(binnedRanges, tl2.getTserverLocation(), tl2.getExtent(), - range); + rangeConsumer.accept(tl2, range); } } else { failures.add(range); @@ -375,8 +375,8 @@ public class TabletLocatorImpl extends TabletLocator { } @Override - public List<Range> binRanges(ClientContext context, List<Range> ranges, - Map<String,Map<KeyExtent,List<Range>>> binnedRanges) + public List<Range> locateTablets(ClientContext context, List<Range> ranges, + BiConsumer<TabletLocation,Range> rangeConsumer) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { /* @@ -404,7 +404,7 @@ public class TabletLocatorImpl extends TabletLocator { // sort ranges... therefore try binning ranges using only the cache // and sort whatever fails and retry - failures = binRanges(context, ranges, binnedRanges, true, lcSession); + failures = locateTablets(context, ranges, rangeConsumer, true, lcSession); } finally { rLock.unlock(); } @@ -416,7 +416,7 @@ public class TabletLocatorImpl extends TabletLocator { // try lookups again wLock.lock(); try { - failures = binRanges(context, failures, binnedRanges, false, lcSession); + failures = locateTablets(context, failures, rangeConsumer, false, lcSession); } finally { wLock.unlock(); } @@ -424,9 +424,8 @@ public class TabletLocatorImpl extends TabletLocator { if (timer != null) { timer.stop(); - log.trace("tid={} Binned {} ranges for table {} to {} tservers in {}", - Thread.currentThread().getId(), ranges.size(), tableId, binnedRanges.size(), - String.format("%.3f secs", timer.scale(SECONDS))); + log.trace("tid={} Binned {} ranges for table {} in {}", Thread.currentThread().getId(), + ranges.size(), tableId, String.format("%.3f secs", timer.scale(SECONDS))); } return failures; @@ -839,7 +838,8 @@ public class TabletLocatorImpl extends TabletLocator { Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>(); - 
parent.binRanges(context, lookups, binnedRanges); + parent.locateTablets(context, lookups, + (cachedTablet, range) -> addRange(binnedRanges, cachedTablet, range)); // randomize server order ArrayList<String> tabletServers = new ArrayList<>(binnedRanges.keySet()); @@ -861,10 +861,10 @@ public class TabletLocatorImpl extends TabletLocator { } } - protected static void addRange(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, - String location, KeyExtent ke, Range range) { - binnedRanges.computeIfAbsent(location, k -> new HashMap<>()) - .computeIfAbsent(ke, k -> new ArrayList<>()).add(range); + static void addRange(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, TabletLocation ct, + Range range) { + binnedRanges.computeIfAbsent(ct.getTserverLocation(), k -> new HashMap<>()) + .computeIfAbsent(ct.getExtent(), k -> new ArrayList<>()).add(range); } } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index 4dbfbffab8..0c37ac125e 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@ -34,6 +34,7 @@ import java.util.ListIterator; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; @@ -238,21 +239,36 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>(); - binRanges(locator, ranges, binnedRanges); + var ssd = binRanges(locator, ranges, binnedRanges); - doLookups(binnedRanges, receiver, columns); + doLookups(binnedRanges, receiver, columns, ssd); } - private void binRanges(TabletLocator 
tabletLocator, List<Range> ranges, + private ScanServerData binRanges(TabletLocator tabletLocator, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { int lastFailureSize = Integer.MAX_VALUE; + ScanServerData ssd; + + long startTime = System.currentTimeMillis(); + while (true) { binnedRanges.clear(); - List<Range> failures = tabletLocator.binRanges(context, ranges, binnedRanges); + + List<Range> failures; + + if (options.getConsistencyLevel().equals(ConsistencyLevel.IMMEDIATE)) { + failures = tabletLocator.binRanges(context, ranges, binnedRanges); + ssd = new ScanServerData(); + } else if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) { + ssd = binRangesForScanServers(tabletLocator, ranges, binnedRanges); + failures = ssd.failures; + } else { + throw new IllegalStateException(); + } if (failures.isEmpty()) { break; @@ -275,6 +291,12 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value failures.size()); } + if (System.currentTimeMillis() - startTime > retryTimeout) { + // TODO exception used for timeout is inconsistent + throw new TimedOutException( + "Failed to find servers to process scans before timeout was exceeded."); + } + try { Thread.sleep(100); } catch (InterruptedException e) { @@ -302,6 +324,8 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value binnedRanges.clear(); binnedRanges.putAll(binnedRanges2); + + return ssd; } private void processFailures(Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, @@ -337,9 +361,10 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value // since the first call to binRanges clipped the ranges to within a tablet, we should not get // only // bin to the set of failed tablets - binRanges(locator, allRanges, binnedRanges); - doLookups(binnedRanges, receiver, columns); + var ssd = binRanges(locator, allRanges, 
binnedRanges); + + doLookups(binnedRanges, receiver, columns, ssd); } private String getTableInfo() { @@ -494,21 +519,11 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value } private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, - final ResultReceiver receiver, List<Column> columns) { + final ResultReceiver receiver, List<Column> columns, ScanServerData ssd) { int maxTabletsPerRequest = Integer.MAX_VALUE; - long busyTimeout = 0; - Duration scanServerSelectorDelay = null; - Map<String,ScanServerAttemptReporter> reporters = Map.of(); - - if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) { - var scanServerData = rebinToScanServers(binnedRanges); - busyTimeout = scanServerData.actions.getBusyTimeout().toMillis(); - reporters = scanServerData.reporters; - scanServerSelectorDelay = scanServerData.actions.getDelay(); - binnedRanges = scanServerData.binnedRanges; - } else { + if (options.getConsistencyLevel().equals(ConsistencyLevel.IMMEDIATE)) { // when there are lots of threads and a few tablet servers // it is good to break request to tablet servers up, the // following code determines if this is the case @@ -558,16 +573,16 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation); if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) { QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns, - busyTimeout, reporters.getOrDefault(tsLocation, r -> {}), scanServerSelectorDelay); + ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, r -> {}), ssd.getDelay()); queryTasks.add(queryTask); } else { HashMap<KeyExtent,List<Range>> tabletSubset = new HashMap<>(); for (Entry<KeyExtent,List<Range>> entry : tabletsRanges.entrySet()) { tabletSubset.put(entry.getKey(), entry.getValue()); if (tabletSubset.size() >= maxTabletsPerRequest) { - 
QueryTask queryTask = - new QueryTask(tsLocation, tabletSubset, failures, receiver, columns, busyTimeout, - reporters.getOrDefault(tsLocation, r -> {}), scanServerSelectorDelay); + QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, failures, receiver, + columns, ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, r -> {}), + ssd.getDelay()); queryTasks.add(queryTask); tabletSubset = new HashMap<>(); } @@ -575,7 +590,8 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value if (!tabletSubset.isEmpty()) { QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, failures, receiver, columns, - busyTimeout, reporters.getOrDefault(tsLocation, r -> {}), scanServerSelectorDelay); + ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, r -> {}), + ssd.getDelay()); queryTasks.add(queryTask); } } @@ -591,17 +607,59 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value } private static class ScanServerData { - Map<String,Map<KeyExtent,List<Range>>> binnedRanges; - ScanServerSelections actions; - Map<String,ScanServerAttemptReporter> reporters; + final List<Range> failures; + final ScanServerSelections actions; + final Map<String,ScanServerAttemptReporter> reporters; + + public ScanServerData(List<Range> failures) { + this.failures = failures; + this.actions = null; + this.reporters = Map.of(); + } + + public ScanServerData(ScanServerSelections actions, + Map<String,ScanServerAttemptReporter> reporters) { + this.actions = actions; + this.reporters = reporters; + this.failures = List.of(); + } + + public ScanServerData() { + this.failures = List.of(); + this.actions = null; + this.reporters = Map.of(); + } + + public long getBusyTimeout() { + return actions == null ? 0L : actions.getBusyTimeout().toMillis(); + } + + public Duration getDelay() { + return actions == null ? 
null : actions.getDelay(); + } } - private ScanServerData rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges) { + private ScanServerData binRangesForScanServers(TabletLocator tabletLocator, List<Range> ranges, + Map<String,Map<KeyExtent,List<Range>>> binnedRanges) + throws AccumuloException, TableNotFoundException, AccumuloSecurityException { + ScanServerSelector ecsm = context.getScanServerSelector(); - List<TabletIdImpl> tabletIds = - binnedRanges.values().stream().flatMap(extentMap -> extentMap.keySet().stream()) - .map(TabletIdImpl::new).collect(Collectors.toList()); + Map<KeyExtent,String> extentToTserverMap = new HashMap<>(); + Map<KeyExtent,List<Range>> extentToRangesMap = new HashMap<>(); + + Set<TabletIdImpl> tabletIds = new HashSet<>(); + + List<Range> failures = tabletLocator.locateTablets(context, ranges, (cachedTablet, range) -> { + extentToTserverMap.put(cachedTablet.getExtent(), cachedTablet.getTserverLocation()); + extentToRangesMap.computeIfAbsent(cachedTablet.getExtent(), k -> new ArrayList<>()) + .add(range); + tabletIds.add(new TabletIdImpl(cachedTablet.getExtent())); + }); + + if (!failures.isEmpty()) { + return new ScanServerData(failures); + } // get a snapshot of this once,not each time the plugin request it var scanAttemptsSnapshot = scanAttempts.snapshot(); @@ -625,18 +683,6 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value var actions = ecsm.selectServers(params); - Map<KeyExtent,String> extentToTserverMap = new HashMap<>(); - Map<KeyExtent,List<Range>> extentToRangesMap = new HashMap<>(); - - binnedRanges.forEach((server, extentMap) -> { - extentMap.forEach((extent, ranges) -> { - extentToTserverMap.put(extent, server); - extentToRangesMap.put(extent, ranges); - }); - }); - - Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>(); - Map<String,ScanServerAttemptReporter> reporters = new HashMap<>(); for (TabletIdImpl tabletId : tabletIds) { @@ -644,25 +690,26 @@ public 
class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value String serverToUse = actions.getScanServer(tabletId); if (serverToUse == null) { // no scan server was given so use the tablet server - serverToUse = extentToTserverMap.get(extent); + serverToUse = Objects.requireNonNull(extentToTserverMap.get(extent)); log.trace("For tablet {} scan server selector chose tablet_server", tabletId); } else { log.trace("For tablet {} scan server selector chose scan_server:{}", tabletId, serverToUse); } - var rangeMap = binnedRanges2.computeIfAbsent(serverToUse, k -> new HashMap<>()); - List<Range> ranges = extentToRangesMap.get(extent); - rangeMap.put(extent, ranges); + var rangeMap = binnedRanges.computeIfAbsent(serverToUse, k -> new HashMap<>()); + List<Range> extentRanges = extentToRangesMap.get(extent); + rangeMap.put(extent, extentRanges); var server = serverToUse; reporters.computeIfAbsent(serverToUse, k -> scanAttempts.createReporter(server, tabletId)); } - ScanServerData ssd = new ScanServerData(); + if (!failures.isEmpty()) { + return new ScanServerData(failures); + } + + ScanServerData ssd = new ScanServerData(actions, reporters); - ssd.binnedRanges = binnedRanges2; - ssd.actions = actions; - ssd.reporters = reporters; log.trace("Scan server selector chose delay:{} busyTimeout:{}", actions.getDelay(), actions.getBusyTimeout()); return ssd; diff --git a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java index e37783e826..d72e8331cd 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; +import java.util.function.BiConsumer; import org.apache.accumulo.core.clientImpl.ClientContext; import 
org.apache.accumulo.core.clientImpl.TabletLocator; @@ -83,8 +84,8 @@ public class BulkImporterTest { } @Override - public List<Range> binRanges(ClientContext context, List<Range> ranges, - Map<String,Map<KeyExtent,List<Range>>> binnedRanges) { + public List<Range> locateTablets(ClientContext context, List<Range> ranges, + BiConsumer<TabletLocation,Range> rangeConsumer) { throw new UnsupportedOperationException(); }