Repository: hbase
Updated Branches:
  refs/heads/master 938aef772 -> d7325185a


HBASE-17172 Optimize major mob compaction with _del files

Signed-off-by: tedyu <yuzhih...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d7325185
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d7325185
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d7325185

Branch: refs/heads/master
Commit: d7325185ad2864ed8fc78bb362776648a75c951b
Parents: 938aef7
Author: Huaxiang Sun <h...@cloudera.com>
Authored: Fri Jan 6 09:25:49 2017 -0800
Committer: tedyu <yuzhih...@gmail.com>
Committed: Fri Feb 17 14:22:31 2017 -0800

----------------------------------------------------------------------
 .../PartitionedMobCompactionRequest.java        | 158 +++++++++-
 .../compactions/PartitionedMobCompactor.java    | 258 ++++++++++++---
 .../TestPartitionedMobCompactor.java            | 315 +++++++++++++++++--
 3 files changed, 661 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d7325185/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
index 3335149..3292d99 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
@@ -24,8 +24,11 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
@@ -37,14 +40,14 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 @InterfaceAudience.Private
 public class PartitionedMobCompactionRequest extends MobCompactionRequest {
 
-  protected Collection<FileStatus> delFiles;
+  protected List<CompactionDelPartition> delPartitions;
   protected Collection<CompactionPartition> compactionPartitions;
 
   public PartitionedMobCompactionRequest(Collection<CompactionPartition> 
compactionPartitions,
-    Collection<FileStatus> delFiles) {
+    List<CompactionDelPartition> delPartitions) {
     this.selectionTime = EnvironmentEdgeManager.currentTime();
     this.compactionPartitions = compactionPartitions;
-    this.delFiles = delFiles;
+    this.delPartitions = delPartitions;
   }
 
   /**
@@ -59,8 +62,8 @@ public class PartitionedMobCompactionRequest extends 
MobCompactionRequest {
    * Gets the del files.
    * @return The del files.
    */
-  public Collection<FileStatus> getDelFiles() {
-    return this.delFiles;
+  public List<CompactionDelPartition> getDelPartitions() {
+    return this.delPartitions;
   }
 
   /**
@@ -72,6 +75,10 @@ public class PartitionedMobCompactionRequest extends 
MobCompactionRequest {
     private List<FileStatus> files = new ArrayList<FileStatus>();
     private CompactionPartitionId partitionId;
 
+    // The startKey and endKey of this partition, both are inclusive.
+    private byte[] startKey;
+    private byte[] endKey;
+
     public CompactionPartition(CompactionPartitionId partitionId) {
       this.partitionId = partitionId;
     }
@@ -91,6 +98,35 @@ public class PartitionedMobCompactionRequest extends 
MobCompactionRequest {
     public int getFileCount () {
       return files.size();
     }
+
+    public byte[] getStartKey() {
+      return startKey;
+    }
+
+    /**
+     * Set start key of this partition, only if the input startKey is less than
+     * the current start key.
+     */
+    public void setStartKey(final byte[] startKey)
+    {
+      if ((this.startKey == null) || (Bytes.compareTo(startKey, this.startKey) 
< 0)) {
+        this.startKey = startKey;
+      }
+    }
+
+    public byte[] getEndKey() {
+      return endKey;
+    }
+
+    /**
+     * Set end key of this partition, only if the input endKey is greater than
+     * the current end key.
+     */
+    public void setEndKey(final byte[] endKey) {
+      if ((this.endKey == null) || (Bytes.compareTo(endKey, this.endKey) > 0)) 
{
+        this.endKey = endKey;
+      }
+    }
   }
 
   /**
@@ -183,4 +219,116 @@ public class PartitionedMobCompactionRequest extends 
MobCompactionRequest {
       return new StringBuilder(startKey).append(date).toString();
     }
   }
+
+  /**
+   * The delete file partition in the mob compaction.
+   * The delete partition is defined as [startKey, endKey] pair.
+   * The mob delete files that have the same start key and end key belong to
+   * the same partition.
+   */
+  protected static class CompactionDelPartition {
+    private List<Path> delFiles = new ArrayList<Path>();
+    private List<StoreFile> storeFiles = new ArrayList<>();
+    private CompactionDelPartitionId id;
+
+    public CompactionDelPartition(CompactionDelPartitionId id) {
+      this.id = id;
+    }
+
+    public CompactionDelPartitionId getId() {
+      return this.id;
+    }
+
+    void addDelFile(FileStatus file) {
+      delFiles.add(file.getPath());
+    }
+    public void addStoreFile(final StoreFile file) {
+      storeFiles.add(file);
+    }
+
+    public List<StoreFile> getStoreFiles() {
+      return storeFiles;
+    }
+
+    List<Path> listDelFiles() {
+      return Collections.unmodifiableList(delFiles);
+    }
+
+    void addDelFileList(final Collection<Path> list) {
+      delFiles.addAll(list);
+    }
+
+    int getDelFileCount () {
+      return delFiles.size();
+    }
+
+    void cleanDelFiles() {
+      delFiles.clear();
+    }
+  }
+
+  /**
+   * The delete partition id that consists of start key and end key
+   */
+  public static class CompactionDelPartitionId implements 
Comparable<CompactionDelPartitionId> {
+    private byte[] startKey;
+    private byte[] endKey;
+
+    public CompactionDelPartitionId() {
+    }
+
+    public CompactionDelPartitionId(final byte[] startKey, final byte[] 
endKey) {
+      this.startKey = startKey;
+      this.endKey = endKey;
+    }
+
+    public byte[] getStartKey() {
+      return this.startKey;
+    }
+    public void setStartKey(final byte[] startKey) {
+      this.startKey = startKey;
+    }
+
+    public byte[] getEndKey() {
+      return this.endKey;
+    }
+    public void setEndKey(final byte[] endKey) {
+      this.endKey = endKey;
+    }
+
+    public int compareTo(CompactionDelPartitionId o) {
+      /*
+       * 1). Compare the start key, if the k1 < k2, then k1 is less
+       * 2). If start Key is same, check endKey, k1 < k2, k1 is less
+       *     If both are same, then they are equal.
+       */
+      int result = Bytes.compareTo(this.startKey, o.getStartKey());
+      if (result != 0) {
+        return result;
+      }
+
+      return Bytes.compareTo(this.endKey, o.getEndKey());
+    }
+
+    @Override
+    public int hashCode() {
+      int result = 17;
+      result = 31 * result + java.util.Arrays.hashCode(startKey);
+      result = 31 * result + java.util.Arrays.hashCode(endKey);
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof CompactionDelPartitionId)) {
+        return false;
+      }
+      CompactionDelPartitionId another = (CompactionDelPartitionId) obj;
+
+      return (this.compareTo(another) == 0);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d7325185/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index a0823d7..b6eb640 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -24,16 +24,19 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -58,11 +61,15 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import 
org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
+import 
org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition;
+import 
org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartitionId;
 import 
org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
 import 
org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
 import org.apache.hadoop.hbase.regionserver.BloomType;
@@ -132,6 +139,7 @@ public class PartitionedMobCompactor extends MobCompactor {
       return null;
     }
     LOG.info("is allFiles: " + allFiles);
+
     // find the files to compact.
     PartitionedMobCompactionRequest request = select(files, allFiles);
     // compact the files.
@@ -148,11 +156,14 @@ public class PartitionedMobCompactor extends MobCompactor 
{
    */
   protected PartitionedMobCompactionRequest select(List<FileStatus> candidates,
     boolean allFiles) throws IOException {
-    final Collection<FileStatus> allDelFiles = new ArrayList<>();
     final Map<CompactionPartitionId, CompactionPartition> filesToCompact = new 
HashMap<>();
     final CompactionPartitionId id = new CompactionPartitionId();
+    final NavigableMap<CompactionDelPartitionId, CompactionDelPartition> 
delFilesToCompact = new TreeMap<>();
+    final CompactionDelPartitionId delId = new CompactionDelPartitionId();
+    final ArrayList<CompactionDelPartition> allDelPartitions = new 
ArrayList<>();
     int selectedFileCount = 0;
     int irrelevantFileCount = 0;
+    int totalDelFiles = 0;
     MobCompactPartitionPolicy policy = column.getMobCompactPartitionPolicy();
 
     Calendar calendar =  Calendar.getInstance();
@@ -167,6 +178,31 @@ public class PartitionedMobCompactor extends MobCompactor {
       firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, 
currentDate);
     }
 
+    // First check whether there are any del files, so that the processing below can be optimized.
+    // If there are del files, each partition needs its startKey and endKey, which have to be
+    // read from the partition's files.
+    // If there are no del files, there is no need to read startKey and endKey from the files;
+    // skipping those reads is the optimization.
+    boolean withDelFiles = false;
+    for (FileStatus file : candidates) {
+      if (!file.isFile()) {
+        continue;
+      }
+      // group the del files and small files.
+      FileStatus linkedFile = file;
+      if (HFileLink.isHFileLink(file.getPath())) {
+        HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, 
file.getPath());
+        linkedFile = getLinkedFileStatus(link);
+        if (linkedFile == null) {
+          continue;
+        }
+      }
+      if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
+        withDelFiles = true;
+        break;
+      }
+    }
+
     for (FileStatus file : candidates) {
       if (!file.isFile()) {
         irrelevantFileCount++;
@@ -183,13 +219,32 @@ public class PartitionedMobCompactor extends MobCompactor 
{
           continue;
         }
       }
-      if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
-        allDelFiles.add(file);
+      if (withDelFiles && StoreFileInfo.isDelFile(linkedFile.getPath())) {
+        // File in the Del Partition List
+
+        // Get delId from the file
+        Reader reader = HFile.createReader(fs, linkedFile.getPath(), 
CacheConfig.DISABLED, conf);
+        try {
+          delId.setStartKey(reader.getFirstRowKey());
+          delId.setEndKey(reader.getLastRowKey());
+        } finally {
+          reader.close();
+        }
+        CompactionDelPartition delPartition = delFilesToCompact.get(delId);
+        if (delPartition == null) {
+          CompactionDelPartitionId newDelId =
+              new CompactionDelPartitionId(delId.getStartKey(), 
delId.getEndKey());
+          delPartition = new CompactionDelPartition(newDelId);
+          delFilesToCompact.put(newDelId, delPartition);
+        }
+        delPartition.addDelFile(file);
+        totalDelFiles ++;
       } else {
         String fileName = linkedFile.getPath().getName();
         String date = MobFileName.getDateFromName(fileName);
-        boolean skipCompaction = MobUtils.fillPartitionId(id, 
firstDayOfCurrentMonth,
-            firstDayOfCurrentWeek, date, policy, calendar, mergeableSize);
+        boolean skipCompaction = MobUtils
+            .fillPartitionId(id, firstDayOfCurrentMonth, 
firstDayOfCurrentWeek, date, policy,
+                calendar, mergeableSize);
         if (allFiles || (!skipCompaction && (linkedFile.getLen() < 
id.getThreshold()))) {
           // add all files if allFiles is true,
           // otherwise add the small files to the merge pool
@@ -209,37 +264,51 @@ public class PartitionedMobCompactor extends MobCompactor 
{
             compactionPartition.getPartitionId().updateLatestDate(date);
           }
 
+          if (withDelFiles) {
+            // get startKey and endKey from the file and update partition
+            // TODO: is it possible to skip read of most hfiles?
+            Reader reader = HFile.createReader(fs, linkedFile.getPath(), 
CacheConfig.DISABLED, conf);
+            try {
+              compactionPartition.setStartKey(reader.getFirstRowKey());
+              compactionPartition.setEndKey(reader.getLastRowKey());
+            } finally {
+              reader.close();
+            }
+          }
+
           selectedFileCount++;
         }
       }
     }
 
     /*
-     * If it is not a major mob compaction with del files, and the file number 
in Partition is 1,
-     * remove the partition from filesToCompact list to avoid re-compacting 
files which has been
-     * compacted with del files.
+     * Merge del files so there are only non-overlapping del file lists
      */
-    if (!allFiles && (allDelFiles.size() > 0)) {
-      Iterator<Map.Entry<CompactionPartitionId, CompactionPartition>> it =
-          filesToCompact.entrySet().iterator();
-
-      while(it.hasNext()) {
-        Map.Entry<CompactionPartitionId, CompactionPartition> entry = 
it.next();
-        if (entry.getValue().getFileCount() == 1) {
-          it.remove();
-          --selectedFileCount;
+    for(Map.Entry<CompactionDelPartitionId, CompactionDelPartition> entry : 
delFilesToCompact.entrySet()) {
+      if (allDelPartitions.size() > 0) {
+        // check if the current key range overlaps the previous one
+        CompactionDelPartition prev = 
allDelPartitions.get(allDelPartitions.size() - 1);
+        if (Bytes.compareTo(prev.getId().getEndKey(), 
entry.getKey().getStartKey()) >= 0) {
+          // merge them together
+          prev.getId().setEndKey(entry.getValue().getId().getEndKey());
+          prev.addDelFileList(entry.getValue().listDelFiles());
+
+        } else {
+          allDelPartitions.add(entry.getValue());
         }
+      } else {
+        allDelPartitions.add(entry.getValue());
       }
     }
 
     PartitionedMobCompactionRequest request = new 
PartitionedMobCompactionRequest(
-      filesToCompact.values(), allDelFiles);
-    if (candidates.size() == (allDelFiles.size() + selectedFileCount + 
irrelevantFileCount)) {
+      filesToCompact.values(), allDelPartitions);
+    if (candidates.size() == (totalDelFiles + selectedFileCount + 
irrelevantFileCount)) {
       // all the files are selected
       request.setCompactionType(CompactionType.ALL_FILES);
     }
     LOG.info("The compaction type is " + request.getCompactionType() + ", the 
request has "
-      + allDelFiles.size() + " del files, " + selectedFileCount + " selected 
files, and "
+      + totalDelFiles + " del files, " + selectedFileCount + " selected files, 
and "
       + irrelevantFileCount + " irrelevant files");
     return request;
   }
@@ -257,51 +326,139 @@ public class PartitionedMobCompactor extends 
MobCompactor {
    */
   protected List<Path> performCompaction(PartitionedMobCompactionRequest 
request)
     throws IOException {
-    // merge the del files
-    List<Path> delFilePaths = new ArrayList<>();
-    for (FileStatus delFile : request.delFiles) {
-      delFilePaths.add(delFile.getPath());
+
+    // merge the del files, it is per del partition
+    for (CompactionDelPartition delPartition : request.getDelPartitions()) {
+      if (delPartition.getDelFileCount() <= 1) continue;
+      List<Path> newDelPaths = compactDelFiles(request, 
delPartition.listDelFiles());
+      delPartition.cleanDelFiles();
+      delPartition.addDelFileList(newDelPaths);
     }
-    List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
-    List<StoreFile> newDelFiles = new ArrayList<>();
+
     List<Path> paths = null;
+    int totalDelFileCount = 0;
     try {
-      for (Path newDelPath : newDelPaths) {
-        StoreFile sf = new StoreFile(fs, newDelPath, conf, 
compactionCacheConfig, BloomType.NONE);
-        // pre-create reader of a del file to avoid race condition when 
opening the reader in each
-        // partition.
-        sf.createReader();
-        newDelFiles.add(sf);
-      }
-      LOG.info("After merging, there are " + newDelFiles.size() + " del 
files");
+      for (CompactionDelPartition delPartition : request.getDelPartitions()) {
+        for (Path newDelPath : delPartition.listDelFiles()) {
+          StoreFile sf = new StoreFile(fs, newDelPath, conf, 
compactionCacheConfig, BloomType.NONE);
+          // pre-create reader of a del file to avoid race condition when 
opening the reader in each
+          // partition.
+          sf.createReader();
+          delPartition.addStoreFile(sf);
+          totalDelFileCount++;
+        }
+      }
+      LOG.info("After merging, there are " + totalDelFileCount + " del files");
       // compact the mob files by partitions.
-      paths = compactMobFiles(request, newDelFiles);
+      paths = compactMobFiles(request);
       LOG.info("After compaction, there are " + paths.size() + " mob files");
     } finally {
-      closeStoreFileReaders(newDelFiles);
+      for (CompactionDelPartition delPartition : request.getDelPartitions()) {
+        closeStoreFileReaders(delPartition.getStoreFiles());
+      }
     }
+
     // archive the del files if all the mob files are selected.
-    if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
+    if (request.type == CompactionType.ALL_FILES && 
!request.getDelPartitions().isEmpty()) {
       LOG.info(
-          "After a mob compaction with all files selected, archiving the del 
files " + newDelPaths);
-      try {
-        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, 
column.getName(), newDelFiles);
-      } catch (IOException e) {
-        LOG.error("Failed to archive the del files " + newDelPaths, e);
+          "After a mob compaction with all files selected, archiving the del 
files ");
+      for (CompactionDelPartition delPartition : request.getDelPartitions()) {
+        LOG.info(delPartition.listDelFiles());
+        try {
+          MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, 
column.getName(), delPartition.getStoreFiles());
+        } catch (IOException e) {
+          LOG.error("Failed to archive the del files " + 
delPartition.getStoreFiles(), e);
+        }
       }
     }
     return paths;
   }
 
+  static class DelPartitionComparator implements 
Comparator<CompactionDelPartition> {
+    private boolean compareStartKey;
+
+    DelPartitionComparator(boolean compareStartKey) {
+      this.compareStartKey = compareStartKey;
+    }
+
+    public boolean getCompareStartKey() {
+      return this.compareStartKey;
+    }
+
+    public void setCompareStartKey(final boolean compareStartKey) {
+      this.compareStartKey = compareStartKey;
+    }
+
+    @Override
+    public int compare(CompactionDelPartition o1, CompactionDelPartition o2) {
+
+      if (compareStartKey) {
+        return Bytes.compareTo(o1.getId().getStartKey(), 
o2.getId().getStartKey());
+      } else {
+        return Bytes.compareTo(o1.getId().getEndKey(), o2.getId().getEndKey());
+      }
+    }
+  }
+
+  @VisibleForTesting
+  List<StoreFile> getListOfDelFilesForPartition(final CompactionPartition 
partition,
+      final List<CompactionDelPartition> delPartitions) {
+    // Binary search for startKey and endKey
+
+    List<StoreFile> result = new ArrayList<>();
+
+    DelPartitionComparator comparator = new DelPartitionComparator(false);
+    CompactionDelPartitionId id = new CompactionDelPartitionId(null, 
partition.getStartKey());
+    CompactionDelPartition target = new CompactionDelPartition(id);
+    int start = Collections.binarySearch(delPartitions, target, comparator);
+
+    // Get the start index for partition
+    if (start < 0) {
+      // Calculate the insert point
+      start = (start + 1) * (-1);
+      if (start == delPartitions.size()) {
+        // no overlap
+        return result;
+      } else {
+        // Check another case which has no overlap
+        if (Bytes.compareTo(partition.getEndKey(), 
delPartitions.get(start).getId().getStartKey()) < 0) {
+          return result;
+        }
+      }
+    }
+
+    // Search for end index for the partition
+    comparator.setCompareStartKey(true);
+    id.setStartKey(partition.getEndKey());
+    int end = Collections.binarySearch(delPartitions, target, comparator);
+
+    if (end < 0) {
+      end = (end + 1) * (-1);
+      if (end == 0) {
+        return result;
+      } else {
+        --end;
+        if (Bytes.compareTo(partition.getStartKey(), 
delPartitions.get(end).getId().getEndKey()) > 0) {
+          return result;
+        }
+      }
+    }
+
+    for (int i = start; i <= end; ++i) {
+        result.addAll(delPartitions.get(i).getStoreFiles());
+    }
+
+    return result;
+  }
+
   /**
    * Compacts the selected small mob files and all the del files.
    * @param request The compaction request.
-   * @param delFiles The del files.
    * @return The paths of new mob files after compactions.
    * @throws IOException if IO failure is encountered
    */
-  protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest 
request,
-    final List<StoreFile> delFiles) throws IOException {
+  protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest 
request)
+      throws IOException {
     Collection<CompactionPartition> partitions = request.compactionPartitions;
     if (partitions == null || partitions.isEmpty()) {
       LOG.info("No partitions of mob files");
@@ -310,10 +467,19 @@ public class PartitionedMobCompactor extends MobCompactor 
{
     List<Path> paths = new ArrayList<>();
     final Connection c = ConnectionFactory.createConnection(conf);
     final Table table = c.getTable(tableName);
+
     try {
       Map<CompactionPartitionId, Future<List<Path>>> results = new HashMap<>();
       // compact the mob files by partitions in parallel.
       for (final CompactionPartition partition : partitions) {
+
+        // How to efficiently come up with a list of delFiles for one partition?
+        // Search the delPartitions and collect all the delFiles for the partition.
+        // One possible optimization: if there is no del file, we do not need to
+        // come up with startKey/endKey.
+        List<StoreFile> delFiles = getListOfDelFilesForPartition(partition,
+            request.getDelPartitions());
+
         results.put(partition.getPartitionId(), pool.submit(new 
Callable<List<Path>>() {
           @Override
           public List<Path> call() throws Exception {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d7325185/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
index c34f558..3aaf0e4 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import 
org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
+import 
org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition;
 import 
org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -68,12 +69,13 @@ public class TestPartitionedMobCompactor {
   private final static String family = "family";
   private final static String qf = "qf";
   private final long DAY_IN_MS = 1000 * 60 * 60 * 24;
+  private static byte[] KEYS = Bytes.toBytes("012");
   private HColumnDescriptor hcd = new HColumnDescriptor(family);
   private Configuration conf = TEST_UTIL.getConfiguration();
   private CacheConfig cacheConf = new CacheConfig(conf);
   private FileSystem fs;
   private List<FileStatus> mobFiles = new ArrayList<>();
-  private List<FileStatus> delFiles = new ArrayList<>();
+  private List<Path> delFiles = new ArrayList<>();
   private List<FileStatus> allFiles = new ArrayList<>();
   private Path basePath;
   private String mobSuffix;
@@ -106,6 +108,9 @@ public class TestPartitionedMobCompactor {
     basePath = new Path(new Path(mobTestDir, tableName), family);
     mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
     delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
+    allFiles.clear();
+    mobFiles.clear();
+    delFiles.clear();
   }
 
   @Test
@@ -221,15 +226,6 @@ public class TestPartitionedMobCompactor {
   }
 
   @Test
-  public void testCompactionSelectToAvoidCompactOneFileWithDelete() throws 
Exception {
-    String tableName = "testCompactionSelectToAvoidCompactOneFileWithDelete";
-    // If there is only 1 file, it will not be compacted with _del files, so
-    // It wont be CompactionType.ALL_FILES in this case, and expected compact 
file count will be 0.
-    testCompactionAtMergeSize(tableName, 
MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
-        CompactionType.PART_FILES, false);
-  }
-
-  @Test
   public void testCompactionSelectWithPartFiles() throws Exception {
     String tableName = "testCompactionSelectWithPartFiles";
     testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, 
false);
@@ -383,6 +379,239 @@ public class TestPartitionedMobCompactor {
     }
   }
 
+  /**
+   * Creates multiple mob files under the given directory, one per byte in
+   * {@code KEYS}.
+   * <p>
+   * Each file is named from a running int counter (used as the start row),
+   * today's date and {@code mobSuffix}, and holds 10 Put cells. The row of
+   * each cell is the key byte followed by the cell index, and the value is
+   * 5000 random bytes.
+   *
+   * @param basePath directory the mob files are written into
+   * @throws IOException if creating, writing or closing a file fails
+   */
+  private void createMobFile(Path basePath) throws IOException {
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 
1024).build();
+    MobFileName mobFileName = null;
+    int ii = 0;
+    Date today = new Date();
+    for (byte k0 : KEYS) {
+      // The file name's start row is the partition counter, not the key byte.
+      byte[] startRow = Bytes.toBytes(ii++);
+
+      mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), 
mobSuffix);
+
+      StoreFileWriter mobFileWriter =
+          new StoreFileWriter.Builder(conf, cacheConf, 
fs).withFileContext(meta)
+              .withFilePath(new Path(basePath, 
mobFileName.getFileName())).build();
+
+      // All cells of one file share the same timestamp.
+      long now = System.currentTimeMillis();
+      try {
+        for (int i = 0; i < 10; i++) {
+          byte[] key = Bytes.add(Bytes.toBytes(k0), Bytes.toBytes(i));
+          byte[] dummyData = new byte[5000];
+          new Random().nextBytes(dummyData);
+          mobFileWriter.append(
+              new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, 
Type.Put, dummyData));
+        }
+      } finally {
+        // Always close the writer, even if an append fails.
+        mobFileWriter.close();
+      }
+    }
+  }
+
+  /**
+   * Creates a single del file under the given directory.
+   * <p>
+   * The file is named from {@code startKey} (used as the start row), today's
+   * date and {@code delSuffix}. It holds three Delete cells whose rows are
+   * {@code KEYS[startKey]} followed by the indexes 0, 2 and 4.
+   *
+   * @param basePath directory the del file is written into
+   * @param startKey index into {@code KEYS}; it selects the key byte the
+   *          deletes are built from, so it must be a valid index
+   * @throws IOException if creating, writing or closing the file fails
+   */
+  private void createMobDelFile(Path basePath, int startKey) throws 
IOException {
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 
1024).build();
+    MobFileName mobFileName = null;
+    Date today = new Date();
+
+    byte[] startRow = Bytes.toBytes(startKey);
+
+    mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), 
delSuffix);
+
+    StoreFileWriter mobFileWriter =
+        new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta)
+            .withFilePath(new Path(basePath, 
mobFileName.getFileName())).build();
+
+    // All three delete markers share the same timestamp.
+    long now = System.currentTimeMillis();
+    try {
+      byte[] key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(0));
+      // The same random payload is reused for every delete marker.
+      byte[] dummyData = new byte[5000];
+      new Random().nextBytes(dummyData);
+      mobFileWriter.append(
+          new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, 
Type.Delete, dummyData));
+      key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(2));
+      mobFileWriter.append(
+          new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, 
Type.Delete, dummyData));
+      key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(4));
+      mobFileWriter.append(
+          new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, 
Type.Delete, dummyData));
+
+    } finally {
+      // Always close the writer, even if an append fails.
+      mobFileWriter.close();
+    }
+  }
+
+  /**
+   * Verifies that when only mob files exist (no del files), the selected
+   * compaction request has no del partitions and none of its compaction
+   * partitions carries a start or end key.
+   */
+  @Test
+  public void testCompactFilesWithoutDelFile() throws Exception {
+    String tableName = "testCompactFilesWithoutDelFile";
+    resetConf();
+    init(tableName);
+
+    createMobFile(basePath);
+
+    listFiles();
+
+    // Override compact() so that only the selection step runs; the request
+    // it produces is checked and no compaction is performed.
+    PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
+        TableName.valueOf(tableName), hcd, pool) {
+      @Override
+      public List<Path> compact(List<FileStatus> files, boolean 
isForceAllFiles)
+          throws IOException {
+        if (files == null || files.isEmpty()) {
+          return null;
+        }
+
+        PartitionedMobCompactionRequest request = select(files, 
isForceAllFiles);
+
+        // Make sure that there are no del partitions.
+        Assert.assertTrue(request.getDelPartitions().size() == 0);
+
+        // Make sure that no partition has a startKey/endKey.
+        for (CompactionPartition p : request.getCompactionPartitions()) {
+          Assert.assertTrue(p.getStartKey() == null);
+          Assert.assertTrue(p.getEndKey() == null);
+        }
+        return null;
+      }
+    };
+
+    compactor.compact(allFiles, true);
+  }
+
+  /**
+   * A {@link PartitionedMobCompactor} whose {@link #compact(List, boolean)}
+   * only runs file selection and asserts on the resulting request. No
+   * compaction is performed and {@code null} is always returned.
+   */
+  static class MyPartitionedMobCompactor extends PartitionedMobCompactor {
+    // Expected number of del partitions in the selected request.
+    int delPartitionSize = 0;
+    // Expected number of compaction partitions that are handed a del file.
+    int PartitionsIncludeDelFiles = 0;
+    // Cache config used when opening readers on the del files.
+    CacheConfig cacheConfig = null;
+
+    /**
+     * @param delPartitionSize expected number of del partitions
+     * @param cacheConf cache config used to open the del file readers
+     * @param PartitionsIncludeDelFiles expected number of compaction
+     *          partitions that receive a del file
+     */
+    MyPartitionedMobCompactor(Configuration conf, FileSystem fs,
+        TableName tableName, HColumnDescriptor column, ExecutorService pool,
+        final int delPartitionSize, final CacheConfig cacheConf,
+        final int PartitionsIncludeDelFiles)
+        throws IOException {
+      super(conf, fs, tableName, column, pool);
+      this.delPartitionSize = delPartitionSize;
+      this.cacheConfig = cacheConf;
+      this.PartitionsIncludeDelFiles = PartitionsIncludeDelFiles;
+    }
+
+    /**
+     * Selects the files to compact and checks the request against the
+     * expectations passed to the constructor.
+     *
+     * @param files candidate files; {@code null} or empty is a no-op
+     * @param isForceAllFiles passed through to {@code select}
+     * @return always {@code null}; nothing is compacted
+     * @throws IOException if selection or opening a del file reader fails
+     */
+    @Override
+    public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
+        throws IOException {
+      if (files == null || files.isEmpty()) {
+        return null;
+      }
+      PartitionedMobCompactionRequest request = select(files, isForceAllFiles);
+      List<CompactionDelPartition> delPartitions = request.getDelPartitions();
+
+      Assert.assertEquals(delPartitionSize, delPartitions.size());
+      if (delPartitions.size() > 0) {
+        // With del files present, every partition must carry a key range.
+        for (CompactionPartition p : request.getCompactionPartitions()) {
+          Assert.assertNotNull(p.getStartKey());
+          Assert.assertNotNull(p.getEndKey());
+        }
+      }
+
+      try {
+        for (CompactionDelPartition delPartition : delPartitions) {
+          for (Path newDelPath : delPartition.listDelFiles()) {
+            StoreFile sf = new StoreFile(
+                fs, newDelPath, conf, this.cacheConfig, BloomType.NONE);
+            // pre-create reader of a del file to avoid race condition when
+            // opening the reader in each partition.
+            sf.createReader();
+            delPartition.addStoreFile(sf);
+          }
+        }
+
+        // Make sure that CompactionDelPartitions do not overlap.
+        // NOTE(review): assumes getDelPartitions() is ordered by start key.
+        CompactionDelPartition prevDelP = null;
+        for (CompactionDelPartition delP : delPartitions) {
+          Assert.assertTrue(Bytes.compareTo(
+              delP.getId().getStartKey(), delP.getId().getEndKey()) <= 0);
+
+          if (prevDelP != null) {
+            Assert.assertTrue(Bytes.compareTo(
+                prevDelP.getId().getEndKey(), delP.getId().getStartKey()) < 0);
+          }
+          // Remember this partition for the next iteration; without this
+          // assignment the overlap assertion above would never execute.
+          prevDelP = delP;
+        }
+
+        int affectedPartitions = 0;
+
+        // Make sure that only del files within the key range of a partition
+        // are handed to that partition.
+        for (CompactionPartition partition
+            : request.getCompactionPartitions()) {
+          List<StoreFile> delFiles =
+              getListOfDelFilesForPartition(partition, delPartitions);
+          if (delPartitions.isEmpty()) {
+            continue;
+          }
+          byte[] firstDelStart = delPartitions.get(0).getId().getStartKey();
+          byte[] lastDelEnd =
+              delPartitions.get(delPartitions.size() - 1).getId().getEndKey();
+          // Partitions entirely before or after all del partitions are not
+          // checked any further.
+          boolean outsideDelRange =
+              Bytes.compareTo(firstDelStart, partition.getEndKey()) > 0
+                  || Bytes.compareTo(lastDelEnd, partition.getStartKey()) < 0;
+          if (outsideDelRange || delFiles.isEmpty()) {
+            continue;
+          }
+
+          Assert.assertEquals(1, delFiles.size());
+          affectedPartitions += delFiles.size();
+          // The del file's row range must intersect the partition's range.
+          Assert.assertTrue(Bytes.compareTo(partition.getStartKey(),
+              CellUtil.cloneRow(delFiles.get(0).getLastKey())) <= 0);
+          Assert.assertTrue(Bytes.compareTo(partition.getEndKey(),
+              CellUtil.cloneRow(
+                  delFiles.get(delFiles.size() - 1).getFirstKey())) >= 0);
+        }
+        // Exactly the expected number of partitions received a del file.
+        Assert.assertEquals(PartitionsIncludeDelFiles, affectedPartitions);
+      } finally {
+        // Close every reader opened above, logging rather than failing.
+        for (CompactionDelPartition delPartition : delPartitions) {
+          for (StoreFile storeFile : delPartition.getStoreFiles()) {
+            try {
+              storeFile.closeReader(true);
+            } catch (IOException e) {
+              LOG.warn("Failed to close the reader on store file "
+                  + storeFile.getPath(), e);
+            }
+          }
+        }
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Verifies selection with the mob files plus a single del file: one del
+   * partition is expected, and exactly one compaction partition is expected
+   * to include a del file.
+   */
+  @Test
+  public void testCompactFilesWithOneDelFile() throws Exception {
+    String tableName = "testCompactFilesWithOneDelFile";
+    resetConf();
+    init(tableName);
+
+    // Create the mob files and one del file (for the last key in KEYS).
+    createMobFile(basePath);
+    createMobDelFile(basePath, 2);
+
+    listFiles();
+
+    // Expect 1 del partition and 1 partition that includes a del file.
+    MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, 
fs,
+        TableName.valueOf(tableName), hcd, pool, 1, cacheConf, 1);
+
+    compactor.compact(allFiles, true);
+  }
+
+  /**
+   * Verifies selection with the mob files plus one del file per key in
+   * {@code KEYS}: three del partitions are expected, and three compaction
+   * partitions are expected to include a del file.
+   */
+  @Test
+  public void testCompactFilesWithMultiDelFiles() throws Exception {
+    String tableName = "testCompactFilesWithMultiDelFiles";
+    resetConf();
+    init(tableName);
+
+    // Create the mob files and one del file for each key in KEYS.
+    createMobFile(basePath);
+    createMobDelFile(basePath, 0);
+    createMobDelFile(basePath, 1);
+    createMobDelFile(basePath, 2);
+
+    listFiles();
+
+    // Expect 3 del partitions and 3 partitions that include a del file.
+    MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, 
fs,
+        TableName.valueOf(tableName), hcd, pool, 3, cacheConf, 3);
+    compactor.compact(allFiles, true);
+  }
 
   private void testCompactDelFilesAtBatchSize(String tableName, int batchSize,
       int delfileMaxCount)  throws Exception {
@@ -419,12 +648,53 @@ public class TestPartitionedMobCompactor {
           return null;
         }
         PartitionedMobCompactionRequest request = select(files, 
isForceAllFiles);
+
+        // Make sure that when there is no del files, there will be no 
startKey/endKey for partition.
+        if (request.getDelPartitions().size() == 0) {
+          for (CompactionPartition p : request.getCompactionPartitions()) {
+            Assert.assertTrue(p.getStartKey() == null);
+            Assert.assertTrue(p.getEndKey() == null);
+          }
+        }
+
+        // Make sure that CompactionDelPartitions does not overlap
+        CompactionDelPartition prevDelP = null;
+        for (CompactionDelPartition delP : request.getDelPartitions()) {
+          Assert.assertTrue(Bytes.compareTo(delP.getId().getStartKey(),
+              delP.getId().getEndKey()) <= 0);
+
+          if (prevDelP != null) {
+            Assert.assertTrue(Bytes.compareTo(prevDelP.getId().getEndKey(),
+                delP.getId().getStartKey()) < 0);
+          }
+        }
+
+        // Make sure that only del files within key range for a partition is 
included in compaction.
+        // compact the mob files by partitions in parallel.
+        for (CompactionPartition partition : 
request.getCompactionPartitions()) {
+          List<StoreFile> delFiles = getListOfDelFilesForPartition(partition, 
request.getDelPartitions());
+          if (!request.getDelPartitions().isEmpty()) {
+            if 
(!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(),
+                partition.getEndKey()) > 0) || (Bytes.compareTo(
+                
request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId()
+                    .getEndKey(), partition.getStartKey()) < 0))) {
+              if (delFiles.size() > 0) {
+                Assert.assertTrue(Bytes
+                    .compareTo(partition.getStartKey(), 
delFiles.get(0).getFirstKey().getRowArray())
+                    >= 0);
+                Assert.assertTrue(Bytes.compareTo(partition.getEndKey(),
+                    delFiles.get(delFiles.size() - 
1).getLastKey().getRowArray()) <= 0);
+              }
+            }
+          }
+        }
+
         // assert the compaction type
         Assert.assertEquals(type, request.type);
         // assert get the right partitions
         compareCompactedPartitions(expected, request.compactionPartitions);
         // assert get the right del files
-        compareDelFiles(request.delFiles);
+        compareDelFiles(request.getDelPartitions());
         return null;
       }
     };
@@ -446,8 +716,10 @@ public class TestPartitionedMobCompactor {
       protected List<Path> performCompaction(PartitionedMobCompactionRequest 
request)
           throws IOException {
         List<Path> delFilePaths = new ArrayList<Path>();
-        for (FileStatus delFile : request.delFiles) {
-          delFilePaths.add(delFile.getPath());
+        for (CompactionDelPartition delPartition: request.getDelPartitions()) {
+          for (Path p : delPartition.listDelFiles()) {
+            delFilePaths.add(p);
+          }
         }
         List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
         // assert the del files are merged.
@@ -466,7 +738,7 @@ public class TestPartitionedMobCompactor {
     for (FileStatus file : fs.listStatus(basePath)) {
       allFiles.add(file);
       if (file.getPath().getName().endsWith("_del")) {
-        delFiles.add(file);
+        delFiles.add(file.getPath());
       } else {
         mobFiles.add(file);
       }
@@ -493,13 +765,18 @@ public class TestPartitionedMobCompactor {
 
   /**
    * Compares the del files.
-   * @param allDelFiles all the del files
+   * @param delPartitions all del partitions
    */
-  private void compareDelFiles(Collection<FileStatus> allDelFiles) {
+  private void compareDelFiles(List<CompactionDelPartition> delPartitions) {
     int i = 0;
-    for (FileStatus file : allDelFiles) {
-      Assert.assertEquals(delFiles.get(i), file);
-      i++;
+    Map<Path, Path> delMap = new HashMap<>();
+    for (CompactionDelPartition delPartition : delPartitions) {
+      for (Path f : delPartition.listDelFiles()) {
+        delMap.put(f, f);
+      }
+    }
+    for (Path f : delFiles) {
+      Assert.assertTrue(delMap.containsKey(f));
     }
   }
 

Reply via email to