This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new efc9311 Use concurrent map instead of cache for bulkImported map (#1356) efc9311 is described below commit efc931102180562d1f17f9493bb7d4cd55f3f4a1 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Sep 11 13:08:27 2019 -0400 Use concurrent map instead of cache for bulkImported map (#1356) While analyzing some tablet code I noticed a Guava cache was being used like a map. When I first saw the code I was concerned the cache may evict entries (which would be a bug). However the cache was created with default settings, which according to the javadoc does not automatically evict. This commit replaces the cache with a concurrent map so no one has to do this analysis again. --- .../tserver/tablet/BulkImportCacheCleaner.java | 2 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 34 ++++++++++++---------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java index c908840..03f33e2 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java @@ -40,7 +40,7 @@ public class BulkImportCacheCleaner implements Runnable { // gather the list of transactions the tablets have cached final Set<Long> tids = new HashSet<>(); for (Tablet tablet : server.getOnlineTablets().values()) { - tids.addAll(tablet.getBulkIngestedFiles().keySet()); + tids.addAll(tablet.getBulkIngestedTxIds()); } try { // get the current transactions from ZooKeeper diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index bbc70a4..1e89624 100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -38,7 +38,7 @@ import java.util.PriorityQueue; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -152,7 +152,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableSet; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -254,8 +253,13 @@ public class Tablet { // tablet lock. private final Set<FileRef> bulkImporting = new HashSet<>(); - // Files that were successfully bulk imported. - private final Cache<Long,List<FileRef>> bulkImported = CacheBuilder.newBuilder().build(); + // Files that were successfully bulk imported. Using a concurrent map supports non-locking + // operations on the key set which is useful for the periodic task that cleans up completed bulk + // imports for all tablets. However the values of this map are ArrayList which do not support + // concurrency. This is ok because all operations on the values are done while the tablet lock is + // held. 
+ private final ConcurrentHashMap<Long,List<FileRef>> bulkImported = + new ConcurrentHashMap<Long,List<FileRef>>(); private final int logId; @@ -344,7 +348,7 @@ public class Tablet { this.location = locationPath; this.tabletDirectory = tabletPaths.dir; for (Entry<Long,List<FileRef>> entry : data.getBulkImported().entrySet()) { - this.bulkImported.put(entry.getKey(), new CopyOnWriteArrayList<>(entry.getValue())); + this.bulkImported.put(entry.getKey(), new ArrayList<>(entry.getValue())); } final List<LogEntry> logEntries = tabletPaths.logEntries; @@ -2246,17 +2250,17 @@ public class Tablet { MetadataTableUtil.splitTablet(high, extent.getPrevEndRow(), splitRatio, getTabletServer().getContext(), getTabletServer().getLock()); MasterMetadataUtil.addNewTablet(getTabletServer().getContext(), low, lowDirectory, - getTabletServer().getTabletSession(), lowDatafileSizes, getBulkIngestedFiles(), time, - lastFlushID, lastCompactID, getTabletServer().getLock()); + getTabletServer().getTabletSession(), lowDatafileSizes, bulkImported, time, lastFlushID, + lastCompactID, getTabletServer().getLock()); MetadataTableUtil.finishSplit(high, highDatafileSizes, highDatafilesToRemove, getTabletServer().getContext(), getTabletServer().getLock()); log.debug("TABLET_HIST {} split {} {}", extent, low, high); newTablets.put(high, new TabletData(tabletDirectory, highDatafileSizes, time, lastFlushID, - lastCompactID, lastLocation, getBulkIngestedFiles())); + lastCompactID, lastLocation, bulkImported)); newTablets.put(low, new TabletData(lowDirectory, lowDatafileSizes, time, lastFlushID, - lastCompactID, lastLocation, getBulkIngestedFiles())); + lastCompactID, lastLocation, bulkImported)); long t2 = System.currentTimeMillis(); @@ -2338,7 +2342,7 @@ public class Tablet { "Timeout waiting " + (lockWait / 1000.) 
+ " seconds to get tablet lock for " + extent); } - List<FileRef> alreadyImported = bulkImported.getIfPresent(tid); + List<FileRef> alreadyImported = bulkImported.get(tid); if (alreadyImported != null) { for (FileRef entry : alreadyImported) { if (fileMap.remove(entry) != null) { @@ -2385,7 +2389,7 @@ public class Tablet { } try { - bulkImported.get(tid, ArrayList::new).addAll(fileMap.keySet()); + bulkImported.computeIfAbsent(tid, k -> new ArrayList<>()).addAll(fileMap.keySet()); } catch (Exception ex) { log.info(ex.toString(), ex); } @@ -2848,14 +2852,12 @@ public class Tablet { } } - public Map<Long,List<FileRef>> getBulkIngestedFiles() { - return new HashMap<>(bulkImported.asMap()); + public Set<Long> getBulkIngestedTxIds() { + return bulkImported.keySet(); } public void cleanupBulkLoadedFiles(Set<Long> tids) { - for (Long tid : tids) { - bulkImported.invalidate(tid); - } + bulkImported.keySet().removeAll(tids); } }