Repository: hbase
Updated Branches:
  refs/heads/master acc606571 -> 67f1ac1f8


HBASE-16578 Mob data loss after mob compaction and normal compaction


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/67f1ac1f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/67f1ac1f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/67f1ac1f

Branch: refs/heads/master
Commit: 67f1ac1f8e6c33e7917231ef2d541fe7461b4be4
Parents: acc6065
Author: Jingcheng Du <jingcheng...@intel.com>
Authored: Wed Oct 19 13:58:54 2016 +0800
Committer: Jingcheng Du <jingcheng...@intel.com>
Committed: Wed Oct 19 13:58:54 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/StoreFile.java    |   7 +-
 .../regionserver/compactions/Compactor.java     |   6 +-
 .../hbase/mob/compactions/TestMobCompactor.java | 127 +++++++++++++++----
 3 files changed, 113 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/67f1ac1f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 589d844..7aef05e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -453,7 +453,12 @@ public class StoreFile {
       // loaded to hbase, these cells have the same seqIds with the old ones. 
We do not want
       // to reset new seqIds for them since this might make a mess of the 
visibility of cells that
       // have the same row key but different seqIds.
-      
this.reader.setSkipResetSeqId(isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID)));
+      boolean skipResetSeqId = 
isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
+      if (skipResetSeqId) {
+        // increase the seqId when it is a bulk loaded file from mob 
compaction.
+        this.sequenceid += 1;
+      }
+      this.reader.setSkipResetSeqId(skipResetSeqId);
       this.reader.setBulkLoaded(true);
     }
     this.reader.setSequenceID(this.sequenceid);

http://git-wip-us.apache.org/repos/asf/hbase/blob/67f1ac1f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index f4bd9a8..4e362c7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -276,7 +276,11 @@ public abstract class Compactor<T extends CellSink> {
       // HFiles, and their readers
       readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
       for (StoreFile f : request.getFiles()) {
-        readersToClose.add(f.cloneForReader());
+        StoreFile clonedStoreFile = f.cloneForReader();
+        // create the reader after the store file is cloned in case
+        // the sequence id is used for sorting in scanners
+        clonedStoreFile.createReader();
+        readersToClose.add(clonedStoreFile);
       }
       scanners = createFileScanners(readersToClose, smallestReadPoint,
         store.throttleCompaction(request.getSize()));

http://git-wip-us.apache.org/repos/asf/hbase/blob/67f1ac1f/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
index 863a855..1630e2d 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.mob.compactions;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.security.Key;
@@ -63,17 +64,23 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.io.crypto.aes.AES;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -118,6 +125,9 @@ public class TestMobCompactor {
     TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
       KeyProviderForTesting.class.getName());
     
TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, 
"hbase");
+    TEST_UTIL.getConfiguration().setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 
0);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 1);
+    
TEST_UTIL.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval",
 100);
     TEST_UTIL.startMiniCluster(1);
     pool = createThreadPool(TEST_UTIL.getConfiguration());
     conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), 
pool);
@@ -428,33 +438,100 @@ public class TestMobCompactor {
       .getName());
   }
 
+  /**
+   * This case tests the following mob compaction and normal compaction scenario:
+   * after mob compaction, the mob reference in the new bulk-loaded hfile will win
+   * even after it is compacted with some other normal hfiles. This is to make sure
+   * the mvcc is included after compaction for mob-enabled store files.
+   */
   @Test(timeout = 300000)
-  public void testScannerAfterCompactions() throws Exception {
+  public void testGetAfterCompaction() throws Exception {
     resetConf();
-    setUp("testScannerAfterCompactions");
-    long ts = EnvironmentEdgeManager.currentTime();
-    byte[] key0 = Bytes.toBytes("k0");
-    byte[] key1 = Bytes.toBytes("k1");
-    String value = "mobValue"; // larger than threshold
-    String newValue = "new";
-    Put put0 = new Put(key0);
-    put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, 
Bytes.toBytes(value));
-    loadData(admin, bufMut, tableName, new Put[] { put0 });
-    Put put1 = new Put(key1);
-    put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, 
Bytes.toBytes(value));
-    loadData(admin, bufMut, tableName, new Put[] { put1 });
-    put1 = new Put(key1);
-    put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, 
Bytes.toBytes(newValue));
-    loadData(admin, bufMut, tableName, new Put[] { put1 }); // now two mob 
files
-    admin.majorCompact(tableName);
-    waitUntilCompactionFinished(tableName);
-    admin.majorCompact(tableName, hcd1.getName(), CompactType.MOB);
-    waitUntilMobCompactionFinished(tableName);
-    // read the latest cell of key1.
-    Get get = new Get(key1);
-    Result result = table.get(get);
-    Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
-    assertEquals("After compaction: mob value", "new", 
Bytes.toString(CellUtil.cloneValue(cell)));
+    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
+    String famStr = "f1";
+    byte[] fam = Bytes.toBytes(famStr);
+    byte[] qualifier = Bytes.toBytes("q1");
+    byte[] mobVal = Bytes.toBytes("01234567890");
+    HTableDescriptor hdt = new 
HTableDescriptor(TableName.valueOf("testGetAfterCompaction"));
+    hdt.addCoprocessor(CompactTwoLatestHfilesCopro.class.getName());
+    HColumnDescriptor hcd = new HColumnDescriptor(fam);
+    hcd.setMobEnabled(true);
+    hcd.setMobThreshold(10);
+    hcd.setMaxVersions(1);
+    hdt.addFamily(hcd);
+    try {
+      Table table = TEST_UTIL.createTable(hdt, null);
+      HRegion r = 
TEST_UTIL.getMiniHBaseCluster().getRegions(hdt.getTableName()).get(0);
+      Put p = new Put(Bytes.toBytes("r1"));
+      p.addColumn(fam, qualifier, mobVal);
+      table.put(p);
+      // Create mob file mob1 and reference file ref1
+      TEST_UTIL.flush(table.getName());
+      // Make sure that it is flushed.
+      FileSystem fs = r.getRegionFileSystem().getFileSystem();
+      Path path = r.getRegionFileSystem().getStoreDir(famStr);
+      waitUntilFilesShowup(fs, path, 1);
+
+      p = new Put(Bytes.toBytes("r2"));
+      p.addColumn(fam, qualifier, mobVal);
+      table.put(p);
+      // Create mob file mob2 and reference file ref2
+      TEST_UTIL.flush(table.getName());
+      waitUntilFilesShowup(fs, path, 2);
+      // Do mob compaction to create mob3 and ref3
+      TEST_UTIL.getAdmin().compact(hdt.getTableName(), fam, CompactType.MOB);
+      waitUntilFilesShowup(fs, path, 3);
+
+      // Compact ref3 and ref2 into ref4
+      TEST_UTIL.getAdmin().compact(hdt.getTableName(), fam);
+      waitUntilFilesShowup(fs, path, 2);
+
+      // Sleep for some time; since the TimeToLiveHFileCleaner TTL is 0, the next
+      // run of the cleaner chore is guaranteed to clean up files in the archive
+      Thread.sleep(100);
+      // Run cleaner to make sure that files in archive directory are cleaned 
up
+      
TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
+
+      // Get "r2"
+      Get get = new Get(Bytes.toBytes("r2"));
+      try {
+        Result result = table.get(get);
+        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
+      } catch (IOException e) {
+        assertTrue("The MOB file doesn't exist", false);
+      }
+    } finally {
+      TEST_UTIL.deleteTable(hdt.getTableName());
+    }
+  }
+
+  private void waitUntilFilesShowup(final FileSystem fs, final Path path, 
final int num)
+    throws InterruptedException, IOException {
+    FileStatus[] fileList = fs.listStatus(path);
+    while (fileList.length != num) {
+      Thread.sleep(50);
+      fileList = fs.listStatus(path);
+    }
+  }
+
+  /**
+   * This coprocessor overrides the default compaction selection. It always chooses
+   * the two latest hfiles and compacts them into a new one.
+   */
+  public static class CompactTwoLatestHfilesCopro extends BaseRegionObserver {
+    @Override
+    public void preCompactSelection(final 
ObserverContext<RegionCoprocessorEnvironment> c,
+      final Store store, final List<StoreFile> candidates, final 
CompactionRequest request)
+      throws IOException {
+
+      int count = candidates.size();
+      if (count >= 2) {
+        for (int i = 0; i < count - 2; i++) {
+          candidates.remove(0);
+        }
+        c.bypass();
+      }
+    }
   }
 
   private void waitUntilCompactionFinished(TableName tableName) throws 
IOException,

Reply via email to