This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new efc9311  Use concurrent map instead of cache for bulkImported map 
(#1356)
efc9311 is described below

commit efc931102180562d1f17f9493bb7d4cd55f3f4a1
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Sep 11 13:08:27 2019 -0400

    Use concurrent map instead of cache for bulkImported map (#1356)
    
    While analyzing some tablet code I noticed a Guava cache was being used
    like a map.  When I first saw the code I was concerned the cache might
    evict entries (which would be a bug).  However, the cache was created
    with default settings, which according to the javadoc do not
    automatically evict.  This commit replaces the cache with a concurrent
    map so no one has to do this analysis again.
---
 .../tserver/tablet/BulkImportCacheCleaner.java     |  2 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java | 34 ++++++++++++----------
 2 files changed, 19 insertions(+), 17 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
index c908840..03f33e2 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
@@ -40,7 +40,7 @@ public class BulkImportCacheCleaner implements Runnable {
     // gather the list of transactions the tablets have cached
     final Set<Long> tids = new HashSet<>();
     for (Tablet tablet : server.getOnlineTablets().values()) {
-      tids.addAll(tablet.getBulkIngestedFiles().keySet());
+      tids.addAll(tablet.getBulkIngestedTxIds());
     }
     try {
       // get the current transactions from ZooKeeper
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index bbc70a4..1e89624 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -38,7 +38,7 @@ import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -152,7 +152,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableSet;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -254,8 +253,13 @@ public class Tablet {
   // tablet lock.
   private final Set<FileRef> bulkImporting = new HashSet<>();
 
-  // Files that were successfully bulk imported.
-  private final Cache<Long,List<FileRef>> bulkImported = 
CacheBuilder.newBuilder().build();
+  // Files that were successfully bulk imported. Using a concurrent map 
supports non-locking
+  // operations on the key set which is useful for the periodic task that 
cleans up completed bulk
+  // imports for all tablets. However the values of this map are ArrayList 
which do not support
+  // concurrency. This is ok because all operations on the values are done 
while the tablet lock is
+  // held.
+  private final ConcurrentHashMap<Long,List<FileRef>> bulkImported =
+      new ConcurrentHashMap<Long,List<FileRef>>();
 
   private final int logId;
 
@@ -344,7 +348,7 @@ public class Tablet {
     this.location = locationPath;
     this.tabletDirectory = tabletPaths.dir;
     for (Entry<Long,List<FileRef>> entry : data.getBulkImported().entrySet()) {
-      this.bulkImported.put(entry.getKey(), new 
CopyOnWriteArrayList<>(entry.getValue()));
+      this.bulkImported.put(entry.getKey(), new ArrayList<>(entry.getValue()));
     }
 
     final List<LogEntry> logEntries = tabletPaths.logEntries;
@@ -2246,17 +2250,17 @@ public class Tablet {
       MetadataTableUtil.splitTablet(high, extent.getPrevEndRow(), splitRatio,
           getTabletServer().getContext(), getTabletServer().getLock());
       MasterMetadataUtil.addNewTablet(getTabletServer().getContext(), low, 
lowDirectory,
-          getTabletServer().getTabletSession(), lowDatafileSizes, 
getBulkIngestedFiles(), time,
-          lastFlushID, lastCompactID, getTabletServer().getLock());
+          getTabletServer().getTabletSession(), lowDatafileSizes, 
bulkImported, time, lastFlushID,
+          lastCompactID, getTabletServer().getLock());
       MetadataTableUtil.finishSplit(high, highDatafileSizes, 
highDatafilesToRemove,
           getTabletServer().getContext(), getTabletServer().getLock());
 
       log.debug("TABLET_HIST {} split {} {}", extent, low, high);
 
       newTablets.put(high, new TabletData(tabletDirectory, highDatafileSizes, 
time, lastFlushID,
-          lastCompactID, lastLocation, getBulkIngestedFiles()));
+          lastCompactID, lastLocation, bulkImported));
       newTablets.put(low, new TabletData(lowDirectory, lowDatafileSizes, time, 
lastFlushID,
-          lastCompactID, lastLocation, getBulkIngestedFiles()));
+          lastCompactID, lastLocation, bulkImported));
 
       long t2 = System.currentTimeMillis();
 
@@ -2338,7 +2342,7 @@ public class Tablet {
             "Timeout waiting " + (lockWait / 1000.) + " seconds to get tablet 
lock for " + extent);
       }
 
-      List<FileRef> alreadyImported = bulkImported.getIfPresent(tid);
+      List<FileRef> alreadyImported = bulkImported.get(tid);
       if (alreadyImported != null) {
         for (FileRef entry : alreadyImported) {
           if (fileMap.remove(entry) != null) {
@@ -2385,7 +2389,7 @@ public class Tablet {
         }
 
         try {
-          bulkImported.get(tid, ArrayList::new).addAll(fileMap.keySet());
+          bulkImported.computeIfAbsent(tid, k -> new 
ArrayList<>()).addAll(fileMap.keySet());
         } catch (Exception ex) {
           log.info(ex.toString(), ex);
         }
@@ -2848,14 +2852,12 @@ public class Tablet {
     }
   }
 
-  public Map<Long,List<FileRef>> getBulkIngestedFiles() {
-    return new HashMap<>(bulkImported.asMap());
+  public Set<Long> getBulkIngestedTxIds() {
+    return bulkImported.keySet();
   }
 
   public void cleanupBulkLoadedFiles(Set<Long> tids) {
-    for (Long tid : tids) {
-      bulkImported.invalidate(tid);
-    }
+    bulkImported.keySet().removeAll(tids);
   }
 
 }

Reply via email to