Repository: hbase Updated Branches: refs/heads/master acc606571 -> 67f1ac1f8
HBASE-16578 Mob data loss after mob compaction and normal compaction Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/67f1ac1f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/67f1ac1f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/67f1ac1f Branch: refs/heads/master Commit: 67f1ac1f8e6c33e7917231ef2d541fe7461b4be4 Parents: acc6065 Author: Jingcheng Du <jingcheng...@intel.com> Authored: Wed Oct 19 13:58:54 2016 +0800 Committer: Jingcheng Du <jingcheng...@intel.com> Committed: Wed Oct 19 13:58:54 2016 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/StoreFile.java | 7 +- .../regionserver/compactions/Compactor.java | 6 +- .../hbase/mob/compactions/TestMobCompactor.java | 127 +++++++++++++++---- 3 files changed, 113 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/67f1ac1f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 589d844..7aef05e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -453,7 +453,12 @@ public class StoreFile { // loaded to hbase, these cells have the same seqIds with the old ones. We do not want // to reset new seqIds for them since this might make a mess of the visibility of cells that // have the same row key but different seqIds. - this.reader.setSkipResetSeqId(isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID))); + boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID)); + if (skipResetSeqId) { + // increase the seqId when it is a bulk loaded file from mob compaction. + this.sequenceid += 1; + } + this.reader.setSkipResetSeqId(skipResetSeqId); this.reader.setBulkLoaded(true); } this.reader.setSequenceID(this.sequenceid); http://git-wip-us.apache.org/repos/asf/hbase/blob/67f1ac1f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index f4bd9a8..4e362c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -276,7 +276,11 @@ public abstract class Compactor<T extends CellSink> { // HFiles, and their readers readersToClose = new ArrayList<StoreFile>(request.getFiles().size()); for (StoreFile f : request.getFiles()) { - readersToClose.add(f.cloneForReader()); + StoreFile clonedStoreFile = f.cloneForReader(); + // create the reader after the store file is cloned in case + // the sequence id is used for sorting in scanners + clonedStoreFile.createReader(); + readersToClose.add(clonedStoreFile); } scanners = createFileScanners(readersToClose, smallestReadPoint, store.throttleCompaction(request.getSize())); http://git-wip-us.apache.org/repos/asf/hbase/blob/67f1ac1f/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java index 863a855..1630e2d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mob.compactions; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.security.Key; @@ -63,17 +64,23 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; import org.apache.hadoop.hbase.io.crypto.aes.AES; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -118,6 +125,9 @@ public class TestMobCompactor { TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName()); TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); + TEST_UTIL.getConfiguration().setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); + TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 1); + TEST_UTIL.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 100); TEST_UTIL.startMiniCluster(1); pool = createThreadPool(TEST_UTIL.getConfiguration()); conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool); @@ -428,33 +438,100 @@ public class TestMobCompactor { .getName()); } + /** + * This case tests the following mob compaction and normal compaction scenario, + * after mob compaction, the mob reference in new bulkloaded hfile will win even after it + * is compacted with some other normal hfiles. This is to make sure the mvcc is included + * after compaction for mob enabled store files. + */ @Test(timeout = 300000) - public void testScannerAfterCompactions() throws Exception { + public void testGetAfterCompaction() throws Exception { resetConf(); - setUp("testScannerAfterCompactions"); - long ts = EnvironmentEdgeManager.currentTime(); - byte[] key0 = Bytes.toBytes("k0"); - byte[] key1 = Bytes.toBytes("k1"); - String value = "mobValue"; // larger than threshold - String newValue = "new"; - Put put0 = new Put(key0); - put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value)); - loadData(admin, bufMut, tableName, new Put[] { put0 }); - Put put1 = new Put(key1); - put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value)); - loadData(admin, bufMut, tableName, new Put[] { put1 }); - put1 = new Put(key1); - put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(newValue)); - loadData(admin, bufMut, tableName, new Put[] { put1 }); // now two mob files - admin.majorCompact(tableName); - waitUntilCompactionFinished(tableName); - admin.majorCompact(tableName, hcd1.getName(), CompactType.MOB); - waitUntilMobCompactionFinished(tableName); - // read the latest cell of key1. - Get get = new Get(key1); - Result result = table.get(get); - Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1)); - assertEquals("After compaction: mob value", "new", Bytes.toString(CellUtil.cloneValue(cell))); + conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); + String famStr = "f1"; + byte[] fam = Bytes.toBytes(famStr); + byte[] qualifier = Bytes.toBytes("q1"); + byte[] mobVal = Bytes.toBytes("01234567890"); + HTableDescriptor hdt = new HTableDescriptor(TableName.valueOf("testGetAfterCompaction")); + hdt.addCoprocessor(CompactTwoLatestHfilesCopro.class.getName()); + HColumnDescriptor hcd = new HColumnDescriptor(fam); + hcd.setMobEnabled(true); + hcd.setMobThreshold(10); + hcd.setMaxVersions(1); + hdt.addFamily(hcd); + try { + Table table = TEST_UTIL.createTable(hdt, null); + HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(hdt.getTableName()).get(0); + Put p = new Put(Bytes.toBytes("r1")); + p.addColumn(fam, qualifier, mobVal); + table.put(p); + // Create mob file mob1 and reference file ref1 + TEST_UTIL.flush(table.getName()); + // Make sure that it is flushed. + FileSystem fs = r.getRegionFileSystem().getFileSystem(); + Path path = r.getRegionFileSystem().getStoreDir(famStr); + waitUntilFilesShowup(fs, path, 1); + + p = new Put(Bytes.toBytes("r2")); + p.addColumn(fam, qualifier, mobVal); + table.put(p); + // Create mob file mob2 and reference file ref2 + TEST_UTIL.flush(table.getName()); + waitUntilFilesShowup(fs, path, 2); + // Do mob compaction to create mob3 and ref3 + TEST_UTIL.getAdmin().compact(hdt.getTableName(), fam, CompactType.MOB); + waitUntilFilesShowup(fs, path, 3); + + // Compact ref3 and ref2 into ref4 + TEST_UTIL.getAdmin().compact(hdt.getTableName(), fam); + waitUntilFilesShowup(fs, path, 2); + + // Sleep for some time, since TimeToLiveHFileCleaner is 0, the next run of + // clean chore is guaranteed to clean up files in archive + Thread.sleep(100); + // Run cleaner to make sure that files in archive directory are cleaned up + TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting(); + + // Get "r2" + Get get = new Get(Bytes.toBytes("r2")); + try { + Result result = table.get(get); + assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal)); + } catch (IOException e) { + assertTrue("The MOB file doesn't exist", false); + } + } finally { + TEST_UTIL.deleteTable(hdt.getTableName()); + } + } + + private void waitUntilFilesShowup(final FileSystem fs, final Path path, final int num) + throws InterruptedException, IOException { + FileStatus[] fileList = fs.listStatus(path); + while (fileList.length != num) { + Thread.sleep(50); + fileList = fs.listStatus(path); + } + } + + /** + * This copro overwrites the default compaction policy. It always chooses two latest + * hfiles and compacts them into a new one. + */ + public static class CompactTwoLatestHfilesCopro extends BaseRegionObserver { + @Override + public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final List<StoreFile> candidates, final CompactionRequest request) + throws IOException { + + int count = candidates.size(); + if (count >= 2) { + for (int i = 0; i < count - 2; i++) { + candidates.remove(0); + } + c.bypass(); + } + } } private void waitUntilCompactionFinished(TableName tableName) throws IOException,