Repository: hbase
Updated Branches:
  refs/heads/master 3c35a722d -> c930bc92f


Revert "1). Fix resource leak issue upon exception during mob compaction. 2). 
Reorg the code in compactMobFilesInBatch() to make it more readable."

This reverts commit c7cae6be3dccfaa63033b705ea9845f3f088aab6.

Reason for revert: the reverted commit's message was missing a JIRA ID.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/932a1964
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/932a1964
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/932a1964

Branch: refs/heads/master
Commit: 932a1964bf578f953a4fd85a3b58d74185680785
Parents: 3c35a72
Author: Sean Busbey <bus...@apache.org>
Authored: Mon Oct 10 14:47:46 2016 -0500
Committer: Sean Busbey <bus...@apache.org>
Committed: Mon Oct 10 14:47:46 2016 -0500

----------------------------------------------------------------------
 .../compactions/PartitionedMobCompactor.java    | 157 ++++++++-----------
 .../TestPartitionedMobCompactor.java            |  90 +----------
 2 files changed, 69 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/932a1964/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 33aecc0..29b7e8a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -229,8 +229,8 @@ public class PartitionedMobCompactor extends MobCompactor {
     }
     // archive the del files if all the mob files are selected.
     if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
-      LOG.info(
-          "After a mob compaction with all files selected, archiving the del files " + newDelPaths);
+      LOG.info("After a mob compaction with all files selected, archiving the del files "
+        + newDelPaths);
       try {
         MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, 
column.getName(), newDelFiles);
       } catch (IOException e) {
@@ -381,7 +381,7 @@ public class PartitionedMobCompactor extends MobCompactor {
                                       List<StoreFile> filesToCompact, int 
batch,
                                       Path bulkloadPathOfPartition, Path 
bulkloadColumnPath,
                                       List<Path> newFiles)
-      throws IOException {
+    throws IOException {
     // open scanner to the selected mob files and del files.
     StoreScanner scanner = createScanner(filesToCompact, 
ScanType.COMPACT_DROP_DELETES);
     // the mob files to be compacted, not include the del files.
@@ -392,92 +392,62 @@ public class PartitionedMobCompactor extends MobCompactor {
     StoreFileWriter writer = null;
     StoreFileWriter refFileWriter = null;
     Path filePath = null;
+    Path refFilePath = null;
     long mobCells = 0;
-    boolean cleanupTmpMobFile = false;
-    boolean cleanupBulkloadDirOfPartition = false;
-    boolean cleanupCommittedMobFile = false;
-    boolean closeReaders= true;
-
     try {
-      try {
-        writer = MobUtils
-            .createWriter(conf, fs, column, 
partition.getPartitionId().getDate(), tempPath,
-                Long.MAX_VALUE, column.getCompactionCompressionType(),
-                partition.getPartitionId().getStartKey(), 
compactionCacheConfig, cryptoContext);
-        cleanupTmpMobFile = true;
-        filePath = writer.getPath();
-        byte[] fileName = Bytes.toBytes(filePath.getName());
-        // create a temp file and open a writer for it in the bulkloadPath
-        refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, 
bulkloadColumnPath,
-            fileInfo.getSecond().longValue(), compactionCacheConfig, 
cryptoContext);
-        cleanupBulkloadDirOfPartition = true;
-        List<Cell> cells = new ArrayList<>();
-        boolean hasMore;
-        ScannerContext scannerContext =
-            ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
-        do {
-          hasMore = scanner.next(cells, scannerContext);
-          for (Cell cell : cells) {
-            // write the mob cell to the mob file.
-            writer.append(cell);
-            // write the new reference cell to the store file.
-            KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, 
tableNameTag);
-            refFileWriter.append(reference);
-            mobCells++;
-          }
-          cells.clear();
-        } while (hasMore);
-      } finally {
-        // close the scanner.
-        scanner.close();
-
-        if (cleanupTmpMobFile) {
-          // append metadata to the mob file, and close the mob file writer.
-          closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
-        }
-
-        if (cleanupBulkloadDirOfPartition) {
-          // append metadata and bulkload info to the ref mob file, and close 
the writer.
-          closeRefFileWriter(refFileWriter, fileInfo.getFirst(), 
request.selectionTime);
+      writer = MobUtils.createWriter(conf, fs, column, 
partition.getPartitionId().getDate(),
+        tempPath, Long.MAX_VALUE, column.getCompactionCompressionType(), 
partition.getPartitionId()
+          .getStartKey(), compactionCacheConfig, cryptoContext);
+      filePath = writer.getPath();
+      byte[] fileName = Bytes.toBytes(filePath.getName());
+      // create a temp file and open a writer for it in the bulkloadPath
+      refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, 
bulkloadColumnPath, fileInfo
+        .getSecond().longValue(), compactionCacheConfig, cryptoContext);
+      refFilePath = refFileWriter.getPath();
+      List<Cell> cells = new ArrayList<>();
+      boolean hasMore;
+      ScannerContext scannerContext =
+              ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+      do {
+        hasMore = scanner.next(cells, scannerContext);
+        for (Cell cell : cells) {
+          // write the mob cell to the mob file.
+          writer.append(cell);
+          // write the new reference cell to the store file.
+          KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, 
tableNameTag);
+          refFileWriter.append(reference);
+          mobCells++;
         }
-      }
-
-      if (mobCells > 0) {
-        // commit mob file
-        MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, 
compactionCacheConfig);
-        cleanupTmpMobFile = false;
-        cleanupCommittedMobFile = true;
-        // bulkload the ref file
-        bulkloadRefFile(connection, table, bulkloadPathOfPartition, 
filePath.getName());
-        cleanupCommittedMobFile = false;
-        newFiles.add(new Path(mobFamilyDir, filePath.getName()));
-      }
-
-      // archive the old mob files, do not archive the del files.
-      try {
-        closeStoreFileReaders(mobFilesToCompact);
-        closeReaders = false;
-        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, 
column.getName(), mobFilesToCompact);
-      } catch (IOException e) {
-        LOG.error("Failed to archive the files " + mobFilesToCompact, e);
-      }
+        cells.clear();
+      } while (hasMore);
     } finally {
-      if (closeReaders) {
-        closeStoreFileReaders(mobFilesToCompact);
-      }
-
-      if (cleanupTmpMobFile) {
-        deletePath(filePath);
-      }
-
-      if (cleanupBulkloadDirOfPartition) {
-        // delete the bulkload files in bulkloadPath
-        deletePath(bulkloadPathOfPartition);
-      }
-
-      if (cleanupCommittedMobFile) {
-        deletePath(new Path(mobFamilyDir, filePath.getName()));
-      }
+      // close the scanner.
+      scanner.close();
+      // append metadata to the mob file, and close the mob file writer.
+      closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
+      // append metadata and bulkload info to the ref mob file, and close the 
writer.
+      closeRefFileWriter(refFileWriter, fileInfo.getFirst(), 
request.selectionTime);
+    }
+    if (mobCells > 0) {
+      // commit mob file
+      MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, 
compactionCacheConfig);
+      // bulkload the ref file
+      bulkloadRefFile(connection, table, bulkloadPathOfPartition, 
filePath.getName());
+      newFiles.add(new Path(mobFamilyDir, filePath.getName()));
+    } else {
+      // remove the new files
+      // the mob file is empty, delete it instead of committing.
+      deletePath(filePath);
+      // the ref file is empty, delete it instead of committing.
+      deletePath(refFilePath);
+    }
+    // archive the old mob files, do not archive the del files.
+    try {
+      closeStoreFileReaders(mobFilesToCompact);
+      MobUtils
+        .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), 
mobFilesToCompact);
+    } catch (IOException e) {
+      LOG.error("Failed to archive the files " + mobFilesToCompact, e);
     }
   }
 
@@ -539,7 +509,7 @@ public class PartitionedMobCompactor extends MobCompactor {
       writer = MobUtils.createDelFileWriter(conf, fs, column,
         MobUtils.formatDate(new Date(request.selectionTime)), tempPath, 
Long.MAX_VALUE,
         column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, 
compactionCacheConfig,
-          cryptoContext);
+        cryptoContext);
       filePath = writer.getPath();
       List<Cell> cells = new ArrayList<>();
       boolean hasMore;
@@ -602,15 +572,22 @@ public class PartitionedMobCompactor extends MobCompactor {
    * @throws IOException if IO failure is encountered
    */
   private void bulkloadRefFile(Connection connection, Table table, Path 
bulkloadDirectory,
-      String fileName)
-      throws IOException {
+                               String fileName)
+    throws IOException {
     // bulkload the ref file
     try {
       LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
-      bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
-          connection.getRegionLocator(table.getName()));
+      bulkload.doBulkLoad(bulkloadDirectory,
+        connection.getAdmin(),
+        table,
+        connection.getRegionLocator(table.getName()));
     } catch (Exception e) {
+      // delete the committed mob file
+      deletePath(new Path(mobFamilyDir, fileName));
       throw new IOException(e);
+    } finally {
+      // delete the bulkload files in bulkloadPath
+      deletePath(bulkloadDirectory);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/932a1964/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
index 7da8544..7970d62 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
@@ -53,10 +53,8 @@ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.C
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import static org.junit.Assert.assertTrue;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -81,9 +79,6 @@ public class TestPartitionedMobCompactor {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
-    // Inject our customized DistributedFileSystem
-    TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", 
FaultyDistributedFileSystem.class,
-        DistributedFileSystem.class);
     TEST_UTIL.startMiniCluster(1);
     pool = createThreadPool();
   }
@@ -165,51 +160,6 @@ public class TestPartitionedMobCompactor {
     testCompactDelFilesAtBatchSize(tableName, 4, 2);
   }
 
-  @Test
-  public void testCompactFilesWithDstDirFull() throws Exception {
-    String tableName = "testCompactFilesWithDstDirFull";
-    fs = FileSystem.get(conf);
-    FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs;
-    Path testDir = FSUtils.getRootDir(conf);
-    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
-    basePath = new Path(new Path(mobTestDir, tableName), family);
-
-    try {
-      int count = 2;
-      // create 2 mob files.
-      createStoreFiles(basePath, family, qf, count, Type.Put, true);
-      listFiles();
-
-      TableName tName = TableName.valueOf(tableName);
-      MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, 
tName, hcd, pool);
-      faultyFs.setThrowException(true);
-      try {
-        compactor.compact(allFiles, true);
-      } catch (IOException e) {
-        System.out.println("Expected exception, ignore");
-      }
-
-      // Verify that all the files in tmp directory are cleaned up
-      Path tempPath = new Path(MobUtils.getMobHome(conf), 
MobConstants.TEMP_DIR_NAME);
-      FileStatus[] ls = faultyFs.listStatus(tempPath);
-
-      // Only .bulkload under this directory
-      assertTrue(ls.length == 1);
-      assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName()));
-
-      Path bulkloadPath = new Path(tempPath, new 
Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
-          tName.getNamespaceAsString(), tName.getQualifierAsString())));
-
-      // Nothing in bulkLoad directory
-      FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath);
-      assertTrue(lsBulkload.length == 0);
-
-    } finally {
-      faultyFs.setThrowException(false);
-    }
-  }
-
-
   private void testCompactDelFilesAtBatchSize(String tableName, int batchSize,
       int delfileMaxCount)  throws Exception {
     resetConf();
@@ -339,30 +289,17 @@ public class TestPartitionedMobCompactor {
    */
   private void createStoreFiles(Path basePath, String family, String 
qualifier, int count,
       Type type) throws IOException {
-    createStoreFiles(basePath, family, qualifier, count, type, false);
-  }
-
-  private void createStoreFiles(Path basePath, String family, String 
qualifier, int count,
-      Type type, boolean sameStartKey) throws IOException {
     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 
1024).build();
     String startKey = "row_";
     MobFileName mobFileName = null;
     for (int i = 0; i < count; i++) {
-      byte[] startRow;
-      if (sameStartKey) {
-        // When creating multiple files under one partition, suffix needs to 
be different.
-        startRow = Bytes.toBytes(startKey);
-        mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
-        delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
-      } else {
-        startRow = Bytes.toBytes(startKey + i);
-      }
+      byte[] startRow = Bytes.toBytes(startKey + i) ;
       if(type.equals(Type.Delete)) {
         mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
             new Date()), delSuffix);
       }
       if(type.equals(Type.Put)){
-        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
+        mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), 
MobUtils.formatDate(
             new Date()), mobSuffix);
       }
       StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, 
cacheConf, fs)
@@ -457,27 +394,4 @@ public class TestPartitionedMobCompactor {
     conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
       MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
   }
-
-  /**
-   * The customized Distributed File System Implementation
-   */
-  static class FaultyDistributedFileSystem extends DistributedFileSystem {
-    private volatile boolean throwException = false;
-
-    public FaultyDistributedFileSystem() {
-      super();
-    }
-
-    public void setThrowException(boolean throwException) {
-      this.throwException = throwException;
-    }
-
-    @Override
-    public boolean rename(Path src, Path dst) throws IOException {
-      if (throwException) {
-        throw new IOException("No more files allowed");
-      }
-      return super.rename(src, dst);
-    }
-  }
 }

Reply via email to