http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 37b23e0..da01fb9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.EOFException; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -50,7 +52,8 @@ import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -82,9 +85,37 @@ public class TestLogRolling extends AbstractTestLogRolling { TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30); TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2); TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3); + TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "filesystem"); AbstractTestLogRolling.setUpBeforeClass(); } + void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout) + throws IOException { + for (int i = 0; i < 10; i++) { + Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i)))); + put.addColumn(HConstants.CATALOG_FAMILY, null, value); + table.put(put); + } + Put tmpPut = new Put(Bytes.toBytes("tmprow")); + tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value); + long startTime = System.currentTimeMillis(); + long remaining = timeout; + while (remaining > 0) { + if (log.isLowReplicationRollEnabled() == expect) { + break; + } else { + // Trigger calling FSHlog#checkLowReplication() + table.put(tmpPut); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + // continue + } + remaining = timeout - (System.currentTimeMillis() - startTime); + } + } + } + /** * Tests that logs are rolled upon detecting datanode death Requires an HDFS jar with HDFS-826 & * syncFs() support (HDFS-200) @@ -148,12 +179,12 @@ public class TestLogRolling extends AbstractTestLogRolling { long curTime = System.currentTimeMillis(); LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName()); - long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log); + long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); assertTrue("Log should have a timestamp older than now", curTime > oldFilenum && oldFilenum != -1); assertTrue("The log shouldn't have rolled yet", - oldFilenum == DefaultWALProvider.extractFileNumFromWAL(log)); + oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log)); final DatanodeInfo[] pipeline = log.getPipeline(); assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); @@ -163,7 +194,7 @@ public class TestLogRolling extends AbstractTestLogRolling { // this write should succeed, but trigger a log roll writeData(table, 2); - long newFilenum = DefaultWALProvider.extractFileNumFromWAL(log); + long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); assertTrue("Missing datanode should've triggered a log roll", newFilenum > oldFilenum && newFilenum > curTime); @@ -174,7 +205,7 @@ public class TestLogRolling extends AbstractTestLogRolling { // write some more log data (this should use a new hdfs_out) writeData(table, 3); assertTrue("The log should not roll again.", - DefaultWALProvider.extractFileNumFromWAL(log) == newFilenum); + AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum); // kill another datanode in the pipeline, so the replicas will be lower than // the configured value 2. assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null); @@ -224,7 +255,7 @@ public class TestLogRolling extends AbstractTestLogRolling { final List<Path> paths = new ArrayList<Path>(); final List<Integer> preLogRolledCalled = new ArrayList<Integer>(); - paths.add(DefaultWALProvider.getCurrentFileName(log)); + paths.add(AbstractFSWALProvider.getCurrentFileName(log)); log.registerWALActionsListener(new WALActionsListener.Base() { @Override @@ -246,13 +277,13 @@ public class TestLogRolling extends AbstractTestLogRolling { writeData(table, 1002); long curTime = System.currentTimeMillis(); - LOG.info("log.getCurrentFileName()): " + DefaultWALProvider.getCurrentFileName(log)); - long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log); + LOG.info("log.getCurrentFileName()): " + AbstractFSWALProvider.getCurrentFileName(log)); + long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); assertTrue("Log should have a timestamp older than now", curTime > oldFilenum && oldFilenum != -1); assertTrue("The log shouldn't have rolled yet", - oldFilenum == DefaultWALProvider.extractFileNumFromWAL(log)); + oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log)); // roll all datanodes in the pipeline dfsCluster.restartDataNodes(); @@ -263,7 +294,7 @@ public class TestLogRolling extends AbstractTestLogRolling { // this write should succeed, but trigger a log roll writeData(table, 1003); - long newFilenum = DefaultWALProvider.extractFileNumFromWAL(log); + long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); assertTrue("Missing datanode should've triggered a log roll", newFilenum > oldFilenum && newFilenum > curTime);
http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index 9ab7b7d..eda7df7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -70,6 +70,7 @@ public class TestLogRollingNoCluster { // The implementation needs to know the 'handler' count. TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT); final Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.set(WALFactory.WAL_PROVIDER, "filesystem"); FSUtils.setRootDir(conf, dir); final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName()); final WAL wal = wals.getWAL(new byte[]{}, null); http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java index 61ee589..0add852 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.junit.experimental.categories.Category; @@ -33,7 +33,7 @@ public class TestProtobufLog extends AbstractTestProtobufLog<WALProvider.Writer> @Override protected Writer createWriter(Path path) throws IOException { - return DefaultWALProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false); + return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java index 73c0216..5b8b404 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java @@ -32,7 +32,7 @@ public class TestSecureAsyncWALReplay extends TestAsyncWALReplay { @BeforeClass public static void setUpBeforeClass() throws Exception { - Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration(); + Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName()); conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class, http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java index be5d951..91172ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java @@ -19,21 +19,20 @@ package org.apache.hadoop.hbase.regionserver.wal; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALProvider.Writer; - import org.junit.BeforeClass; import org.junit.experimental.categories.Category; -@Category({RegionServerTests.class, MediumTests.class}) +@Category({ RegionServerTests.class, MediumTests.class }) public class TestSecureWALReplay extends TestWALReplay { @BeforeClass public static void setUpBeforeClass() throws Exception { - Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration(); + Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName()); conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class, @@ -41,7 +40,6 @@ public class TestSecureWALReplay extends TestWALReplay { conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class, Writer.class); conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true); - TestWALReplay.setUpBeforeClass(); + AbstractTestWALReplay.setUpBeforeClass(); } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 3e894d7..b63aa17 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,1265 +17,35 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import java.io.FilterInputStream; import java.io.IOException; -import java.lang.reflect.Field; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.NavigableMap; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; -import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; -import org.apache.hadoop.hbase.regionserver.FlushRequestListener; -import org.apache.hadoop.hbase.regionserver.FlushRequester; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; -import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdge; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.HFileTestUtil; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALSplitter; -import org.apache.hadoop.hdfs.DFSInputStream; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * Test replay of edits out of a WAL split. - */ -@Category({RegionServerTests.class, MediumTests.class}) -public class TestWALReplay { - private static final Log LOG = LogFactory.getLog(TestWALReplay.class); - static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); - private Path hbaseRootDir = null; - private String logName; - private Path oldLogDir; - private Path logDir; - private FileSystem fs; - private Configuration conf; - private RecoveryMode mode; - private WALFactory wals; - - @Rule - public final TestName currentTest = new TestName(); +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestWALReplay extends AbstractTestWALReplay { @BeforeClass public static void setUpBeforeClass() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - conf.setBoolean("dfs.support.append", true); - // The below config supported by 0.20-append and CDH3b2 - conf.setInt("dfs.client.block.recovery.retries", 2); - TEST_UTIL.startMiniCluster(3); - Path hbaseRootDir = - TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); - LOG.info("hbase.rootdir=" + hbaseRootDir); - FSUtils.setRootDir(conf, hbaseRootDir); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - @Before - public void setUp() throws Exception { - this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); - this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); - this.hbaseRootDir = FSUtils.getRootDir(this.conf); - this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); - this.logName = DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName() + "-manual"); - this.logDir = new Path(this.hbaseRootDir, logName); - if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { - TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); - } - this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? - RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); - this.wals = new WALFactory(conf, null, currentTest.getMethodName()); - } - - @After - public void tearDown() throws Exception { - this.wals.close(); - TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); - } - - /* - * @param p Directory to cleanup - */ - private void deleteDir(final Path p) throws IOException { - if (this.fs.exists(p)) { - if (!this.fs.delete(p, true)) { - throw new IOException("Failed remove of " + p); - } - } - } - - /** - * - * @throws Exception - */ - @Test - public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception { - final TableName tableName = - TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF"); - byte[] family1 = Bytes.toBytes("cf1"); - byte[] family2 = Bytes.toBytes("cf2"); - byte[] qualifier = Bytes.toBytes("q"); - byte[] value = Bytes.toBytes("testV"); - byte[][] familys = { family1, family2 }; - TEST_UTIL.createTable(tableName, familys); - Table htable = TEST_UTIL.getConnection().getTable(tableName); - Put put = new Put(Bytes.toBytes("r1")); - put.addColumn(family1, qualifier, value); - htable.put(put); - ResultScanner resultScanner = htable.getScanner(new Scan()); - int count = 0; - while (resultScanner.next() != null) { - count++; - } - resultScanner.close(); - assertEquals(1, count); - - MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster(); - List<HRegion> regions = hbaseCluster.getRegions(tableName); - assertEquals(1, regions.size()); - - // move region to another regionserver - Region destRegion = regions.get(0); - int originServerNum = hbaseCluster - .getServerWith(destRegion.getRegionInfo().getRegionName()); - assertTrue("Please start more than 1 regionserver", hbaseCluster - .getRegionServerThreads().size() > 1); - int destServerNum = 0; - while (destServerNum == originServerNum) { - destServerNum++; - } - HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum); - HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum); - // move region to destination regionserver - moveRegionAndWait(destRegion, destServer); - - // delete the row - Delete del = new Delete(Bytes.toBytes("r1")); - htable.delete(del); - resultScanner = htable.getScanner(new Scan()); - count = 0; - while (resultScanner.next() != null) { - count++; - } - resultScanner.close(); - assertEquals(0, count); - - // flush region and make major compaction - Region region = destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName()); - region.flush(true); - // wait to complete major compaction - for (Store store : region.getStores()) { - store.triggerMajorCompaction(); - } - region.compact(true); - - // move region to origin regionserver - moveRegionAndWait(destRegion, originServer); - // abort the origin regionserver - originServer.abort("testing"); - - // see what we get - Result result = htable.get(new Get(Bytes.toBytes("r1"))); - if (result != null) { - assertTrue("Row is deleted, but we get" + result.toString(), - (result == null) || result.isEmpty()); - } - resultScanner.close(); - } - - private void moveRegionAndWait(Region destRegion, HRegionServer destServer) - throws InterruptedException, MasterNotRunningException, - ZooKeeperConnectionException, IOException { - HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); - TEST_UTIL.getHBaseAdmin().move( - destRegion.getRegionInfo().getEncodedNameAsBytes(), - Bytes.toBytes(destServer.getServerName().getServerName())); - while (true) { - ServerName serverName = master.getAssignmentManager() - .getRegionStates().getRegionServerOfRegion(destRegion.getRegionInfo()); - if (serverName != null && serverName.equals(destServer.getServerName())) { - TEST_UTIL.assertRegionOnServer( - destRegion.getRegionInfo(), serverName, 200); - break; - } - Thread.sleep(10); - } - } - - /** - * Tests for hbase-2727. - * @throws Exception - * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a> - */ - @Test - public void test2727() throws Exception { - // Test being able to have > 1 set of edits in the recovered.edits directory. - // Ensure edits are replayed properly. - final TableName tableName = - TableName.valueOf("test2727"); - - MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); - HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName); - deleteDir(basedir); - - HTableDescriptor htd = createBasic3FamilyHTD(tableName); - Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - HBaseTestingUtility.closeRegionAndWAL(region2); - final byte [] rowName = tableName.getName(); - - WAL wal1 = createWAL(this.conf); - // Add 1k to each family. - final int countPerFamily = 1000; - - NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>( - Bytes.BYTES_COMPARATOR); - for(byte[] fam : htd.getFamiliesKeys()) { - scopes.put(fam, 0); - } - for (HColumnDescriptor hcd: htd.getFamilies()) { - addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, - wal1, htd, mvcc, scopes); - } - wal1.shutdown(); - runWALSplit(this.conf); - - WAL wal2 = createWAL(this.conf); - // Add 1k to each family. - for (HColumnDescriptor hcd: htd.getFamilies()) { - addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, - ee, wal2, htd, mvcc, scopes); - } - wal2.shutdown(); - runWALSplit(this.conf); - - WAL wal3 = createWAL(this.conf); - try { - HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3); - long seqid = region.getOpenSeqNum(); - // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1. - // When opened, this region would apply 6k edits, and increment the sequenceId by 1 - assertTrue(seqid > mvcc.getWritePoint()); - assertEquals(seqid - 1, mvcc.getWritePoint()); - LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " - + mvcc.getReadPoint()); - - // TODO: Scan all. - region.close(); - } finally { - wal3.close(); - } - } - - /** - * Test case of HRegion that is only made out of bulk loaded files. Assert - * that we don't 'crash'. - * @throws IOException - * @throws IllegalAccessException - * @throws NoSuchFieldException - * @throws IllegalArgumentException - * @throws SecurityException - */ - @Test - public void testRegionMadeOfBulkLoadedFilesOnly() - throws IOException, SecurityException, IllegalArgumentException, - NoSuchFieldException, IllegalAccessException, InterruptedException { - final TableName tableName = - TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly"); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); - deleteDir(basedir); - final HTableDescriptor htd = createBasic3FamilyHTD(tableName); - Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - HBaseTestingUtility.closeRegionAndWAL(region2); - WAL wal = createWAL(this.conf); - Region region = HRegion.openHRegion(hri, htd, wal, this.conf); - - byte [] family = htd.getFamilies().iterator().next().getName(); - Path f = new Path(basedir, "hfile"); - HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""), - Bytes.toBytes("z"), 10); - List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1); - hfs.add(Pair.newPair(family, f.toString())); - region.bulkLoadHFiles(hfs, true, null); - - // Add an edit so something in the WAL - byte [] row = tableName.getName(); - region.put((new Put(row)).addColumn(family, family, family)); - wal.sync(); - final int rowsInsertedCount = 11; - - assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); - - // Now 'crash' the region by stealing its wal - final Configuration newConf = HBaseConfiguration.create(this.conf); - User user = HBaseTestingUtility.getDifferentUser(newConf, - tableName.getNameAsString()); - user.runAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - runWALSplit(newConf); - WAL wal2 = createWAL(newConf); - - HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf), - hbaseRootDir, hri, htd, wal2); - long seqid2 = region2.getOpenSeqNum(); - assertTrue(seqid2 > -1); - assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); - - // I can't close wal1. Its been appropriated when we split. - region2.close(); - wal2.close(); - return null; - } - }); - } - - /** - * HRegion test case that is made of a major compacted HFile (created with three bulk loaded - * files) and an edit in the memstore. - * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries - * from being replayed" - * @throws IOException - * @throws IllegalAccessException - * @throws NoSuchFieldException - * @throws IllegalArgumentException - * @throws SecurityException - */ - @Test - public void testCompactedBulkLoadedFiles() - throws IOException, SecurityException, IllegalArgumentException, - NoSuchFieldException, IllegalAccessException, InterruptedException { - final TableName tableName = - TableName.valueOf("testCompactedBulkLoadedFiles"); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); - deleteDir(basedir); - final HTableDescriptor htd = createBasic3FamilyHTD(tableName); - HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - HBaseTestingUtility.closeRegionAndWAL(region2); - WAL wal = createWAL(this.conf); - HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); - - // Add an edit so something in the WAL - byte [] row = tableName.getName(); - byte [] family = htd.getFamilies().iterator().next().getName(); - region.put((new Put(row)).addColumn(family, family, family)); - wal.sync(); - - List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1); - for (int i = 0; i < 3; i++) { - Path f = new Path(basedir, "hfile"+i); - HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"), - Bytes.toBytes(i + "50"), 10); - hfs.add(Pair.newPair(family, f.toString())); - } - region.bulkLoadHFiles(hfs, true, null); - final int rowsInsertedCount = 31; - assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); - - // major compact to turn all the bulk loaded files into one normal file - region.compact(true); - assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); - - // Now 'crash' the region by stealing its wal - final Configuration newConf = HBaseConfiguration.create(this.conf); - User user = HBaseTestingUtility.getDifferentUser(newConf, - tableName.getNameAsString()); - user.runAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - runWALSplit(newConf); - WAL wal2 = createWAL(newConf); - - HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf), - hbaseRootDir, hri, htd, wal2); - long seqid2 = region2.getOpenSeqNum(); - assertTrue(seqid2 > -1); - assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); - - // I can't close wal1. Its been appropriated when we split. - region2.close(); - wal2.close(); - return null; - } - }); - } - - - /** - * Test writing edits into an HRegion, closing it, splitting logs, opening - * Region again. Verify seqids. - * @throws IOException - * @throws IllegalAccessException - * @throws NoSuchFieldException - * @throws IllegalArgumentException - * @throws SecurityException - */ - @Test - public void testReplayEditsWrittenViaHRegion() - throws IOException, SecurityException, IllegalArgumentException, - NoSuchFieldException, IllegalAccessException, InterruptedException { - final TableName tableName = - TableName.valueOf("testReplayEditsWrittenViaHRegion"); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); - deleteDir(basedir); - final byte[] rowName = tableName.getName(); - final int countPerFamily = 10; - final HTableDescriptor htd = createBasic3FamilyHTD(tableName); - HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - HBaseTestingUtility.closeRegionAndWAL(region3); - // Write countPerFamily edits into the three families. Do a flush on one - // of the families during the load of edits so its seqid is not same as - // others to test we do right thing when different seqids. - WAL wal = createWAL(this.conf); - HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); - long seqid = region.getOpenSeqNum(); - boolean first = true; - for (HColumnDescriptor hcd: htd.getFamilies()) { - addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); - if (first) { - // If first, so we have at least one family w/ different seqid to rest. - region.flush(true); - first = false; - } - } - // Now assert edits made it in. - final Get g = new Get(rowName); - Result result = region.get(g); - assertEquals(countPerFamily * htd.getFamilies().size(), - result.size()); - // Now close the region (without flush), split the log, reopen the region and assert that - // replay of log has the correct effect, that our seqids are calculated correctly so - // all edits in logs are seen as 'stale'/old. - region.close(true); - wal.shutdown(); - runWALSplit(this.conf); - WAL wal2 = createWAL(this.conf); - HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2); - long seqid2 = region2.getOpenSeqNum(); - assertTrue(seqid + result.size() < seqid2); - final Result result1b = region2.get(g); - assertEquals(result.size(), result1b.size()); - - // Next test. Add more edits, then 'crash' this region by stealing its wal - // out from under it and assert that replay of the log adds the edits back - // correctly when region is opened again. - for (HColumnDescriptor hcd: htd.getFamilies()) { - addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y"); - } - // Get count of edits. - final Result result2 = region2.get(g); - assertEquals(2 * result.size(), result2.size()); - wal2.sync(); - final Configuration newConf = HBaseConfiguration.create(this.conf); - User user = HBaseTestingUtility.getDifferentUser(newConf, - tableName.getNameAsString()); - user.runAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - runWALSplit(newConf); - FileSystem newFS = FileSystem.get(newConf); - // Make a new wal for new region open. - WAL wal3 = createWAL(newConf); - final AtomicInteger countOfRestoredEdits = new AtomicInteger(0); - HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) { - @Override - protected boolean restoreEdit(Store s, Cell cell) { - boolean b = super.restoreEdit(s, cell); - countOfRestoredEdits.incrementAndGet(); - return b; - } - }; - long seqid3 = region3.initialize(); - Result result3 = region3.get(g); - // Assert that count of cells is same as before crash. - assertEquals(result2.size(), result3.size()); - assertEquals(htd.getFamilies().size() * countPerFamily, - countOfRestoredEdits.get()); - - // I can't close wal1. Its been appropriated when we split. - region3.close(); - wal3.close(); - return null; - } - }); - } - - /** - * Test that we recover correctly when there is a failure in between the - * flushes. i.e. Some stores got flushed but others did not. - * - * Unfortunately, there is no easy hook to flush at a store level. The way - * we get around this is by flushing at the region level, and then deleting - * the recently flushed store file for one of the Stores. This would put us - * back in the situation where all but that store got flushed and the region - * died. - * - * We restart Region again, and verify that the edits were replayed. - * - * @throws IOException - * @throws IllegalAccessException - * @throws NoSuchFieldException - * @throws IllegalArgumentException - * @throws SecurityException - */ - @Test - public void testReplayEditsAfterPartialFlush() - throws IOException, SecurityException, IllegalArgumentException, - NoSuchFieldException, IllegalAccessException, InterruptedException { - final TableName tableName = - TableName.valueOf("testReplayEditsWrittenViaHRegion"); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); - deleteDir(basedir); - final byte[] rowName = tableName.getName(); - final int countPerFamily = 10; - final HTableDescriptor htd = createBasic3FamilyHTD(tableName); - HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - HBaseTestingUtility.closeRegionAndWAL(region3); - // Write countPerFamily edits into the three families. Do a flush on one - // of the families during the load of edits so its seqid is not same as - // others to test we do right thing when different seqids. - WAL wal = createWAL(this.conf); - HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); - long seqid = region.getOpenSeqNum(); - for (HColumnDescriptor hcd: htd.getFamilies()) { - addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); - } - - // Now assert edits made it in. - final Get g = new Get(rowName); - Result result = region.get(g); - assertEquals(countPerFamily * htd.getFamilies().size(), - result.size()); - - // Let us flush the region - region.flush(true); - region.close(true); - wal.shutdown(); - - // delete the store files in the second column family to simulate a failure - // in between the flushcache(); - // we have 3 families. killing the middle one ensures that taking the maximum - // will make us fail. - int cf_count = 0; - for (HColumnDescriptor hcd: htd.getFamilies()) { - cf_count++; - if (cf_count == 2) { - region.getRegionFileSystem().deleteFamily(hcd.getNameAsString()); - } - } - - - // Let us try to split and recover - runWALSplit(this.conf); - WAL wal2 = createWAL(this.conf); - HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2); - long seqid2 = region2.getOpenSeqNum(); - assertTrue(seqid + result.size() < seqid2); - - final Result result1b = region2.get(g); - assertEquals(result.size(), result1b.size()); - } - - - // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush. - // Only throws exception if throwExceptionWhenFlushing is set true. - public static class CustomStoreFlusher extends DefaultStoreFlusher { - // Switch between throw and not throw exception in flush - static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false); - - public CustomStoreFlusher(Configuration conf, Store store) { - super(conf, store); - } - @Override - public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, - MonitoredTask status, ThroughputController throughputController) throws IOException { - if (throwExceptionWhenFlushing.get()) { - throw new IOException("Simulated exception by tests"); - } - return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController); - } - - }; - - /** - * Test that we could recover the data correctly after aborting flush. In the - * test, first we abort flush after writing some data, then writing more data - * and flush again, at last verify the data. - * @throws IOException - */ - @Test - public void testReplayEditsAfterAbortingFlush() throws IOException { - final TableName tableName = - TableName.valueOf("testReplayEditsAfterAbortingFlush"); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); - deleteDir(basedir); - final HTableDescriptor htd = createBasic3FamilyHTD(tableName); - HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - HBaseTestingUtility.closeRegionAndWAL(region3); - // Write countPerFamily edits into the three families. Do a flush on one - // of the families during the load of edits so its seqid is not same as - // others to test we do right thing when different seqids. - WAL wal = createWAL(this.conf); - RegionServerServices rsServices = Mockito.mock(RegionServerServices.class); - Mockito.doReturn(false).when(rsServices).isAborted(); - when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10)); - Configuration customConf = new Configuration(this.conf); - customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, - CustomStoreFlusher.class.getName()); - HRegion region = - HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null); - int writtenRowCount = 10; - List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>( - htd.getFamilies()); - for (int i = 0; i < writtenRowCount; i++) { - Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); - put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), - Bytes.toBytes("val")); - region.put(put); - } - - // Now assert edits made it in. - RegionScanner scanner = region.getScanner(new Scan()); - assertEquals(writtenRowCount, getScannedCount(scanner)); - - // Let us flush the region - CustomStoreFlusher.throwExceptionWhenFlushing.set(true); - try { - region.flush(true); - fail("Injected exception hasn't been thrown"); - } catch (Throwable t) { - LOG.info("Expected simulated exception when flushing region," - + t.getMessage()); - // simulated to abort server - Mockito.doReturn(true).when(rsServices).isAborted(); - region.setClosing(false); // region normally does not accept writes after - // DroppedSnapshotException. We mock around it for this test. - } - // writing more data - int moreRow = 10; - for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) { - Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); - put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), - Bytes.toBytes("val")); - region.put(put); - } - writtenRowCount += moreRow; - // call flush again - CustomStoreFlusher.throwExceptionWhenFlushing.set(false); - try { - region.flush(true); - } catch (IOException t) { - LOG.info("Expected exception when flushing region because server is stopped," - + t.getMessage()); - } - - region.close(true); - wal.shutdown(); - - // Let us try to split and recover - runWALSplit(this.conf); - WAL wal2 = createWAL(this.conf); - Mockito.doReturn(false).when(rsServices).isAborted(); - HRegion region2 = - HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null); - scanner = region2.getScanner(new Scan()); - assertEquals(writtenRowCount, getScannedCount(scanner)); - } - - private int getScannedCount(RegionScanner scanner) throws IOException { - int scannedCount = 0; - List<Cell> results = new ArrayList<Cell>(); - while (true) { - boolean existMore = scanner.next(results); - if (!results.isEmpty()) - scannedCount++; - if (!existMore) - break; - results.clear(); - } - return scannedCount; + Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); + conf.set(WALFactory.WAL_PROVIDER, "filesystem"); + AbstractTestWALReplay.setUpBeforeClass(); } - /** - * Create an HRegion with the result of a WAL split and test we only see the - * good edits - * @throws Exception - */ - @Test - public void testReplayEditsWrittenIntoWAL() throws Exception { - final TableName tableName = - TableName.valueOf("testReplayEditsWrittenIntoWAL"); - final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName); - deleteDir(basedir); - - final HTableDescriptor htd = createBasic3FamilyHTD(tableName); - HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - HBaseTestingUtility.closeRegionAndWAL(region2); - final WAL wal = createWAL(this.conf); - final byte[] rowName = tableName.getName(); - final byte[] regionName = hri.getEncodedNameAsBytes(); - - // Add 1k to each family. - final int countPerFamily = 1000; - Set<byte[]> familyNames = new HashSet<byte[]>(); - NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>( - Bytes.BYTES_COMPARATOR); - for(byte[] fam : htd.getFamiliesKeys()) { - scopes.put(fam, 0); - } - for (HColumnDescriptor hcd: htd.getFamilies()) { - addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, - ee, wal, htd, mvcc, scopes); - familyNames.add(hcd.getName()); - } - - // Add a cache flush, shouldn't have any effect - wal.startCacheFlush(regionName, familyNames); - wal.completeCacheFlush(regionName); - - // Add an edit to another family, should be skipped. - WALEdit edit = new WALEdit(); - long now = ee.currentTime(); - edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, - now, rowName)); - wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, - true); - - // Delete the c family to verify deletes make it over. - edit = new WALEdit(); - now = ee.currentTime(); - edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); - wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, - true); - - // Sync. - wal.sync(); - // Make a new conf and a new fs for the splitter to run on so we can take - // over old wal. - final Configuration newConf = HBaseConfiguration.create(this.conf); - User user = HBaseTestingUtility.getDifferentUser(newConf, - ".replay.wal.secondtime"); - user.runAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - runWALSplit(newConf); - FileSystem newFS = FileSystem.get(newConf); - // 100k seems to make for about 4 flushes during HRegion#initialize. - newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100); - // Make a new wal for new region. - WAL newWal = createWAL(newConf); - final AtomicInteger flushcount = new AtomicInteger(0); - try { - final HRegion region = - new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { - @Override - protected FlushResult internalFlushcache(final WAL wal, final long myseqid, - final Collection<Store> storesToFlush, MonitoredTask status, - boolean writeFlushWalMarker) - throws IOException { - LOG.info("InternalFlushCache Invoked"); - FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush, - Mockito.mock(MonitoredTask.class), writeFlushWalMarker); - flushcount.incrementAndGet(); - return fs; - } - }; - // The seq id this region has opened up with - long seqid = region.initialize(); - - // The mvcc readpoint of from inserting data. - long writePoint = mvcc.getWritePoint(); - - // We flushed during init. - assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0); - assertTrue((seqid - 1) == writePoint); - - Get get = new Get(rowName); - Result result = region.get(get); - // Make sure we only see the good edits - assertEquals(countPerFamily * (htd.getFamilies().size() - 1), - result.size()); - region.close(); - } finally { - newWal.close(); - } - return null; - } - }); - } - - @Test - // the following test is for HBASE-6065 - public void testSequentialEditLogSeqNum() throws IOException { - final TableName tableName = TableName.valueOf(currentTest.getMethodName()); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = - FSUtils.getTableDir(this.hbaseRootDir, tableName); - deleteDir(basedir); - final byte[] rowName = tableName.getName(); - final int countPerFamily = 10; - final HTableDescriptor htd = createBasic1FamilyHTD(tableName); - - // Mock the WAL - MockWAL wal = createMockWAL(); - - HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); - for (HColumnDescriptor hcd : htd.getFamilies()) { - addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); - } - - // Let us flush the region - // But this time completeflushcache is not yet done - region.flush(true); - for (HColumnDescriptor hcd : htd.getFamilies()) { - addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x"); - } - long lastestSeqNumber = region.getReadPoint(null); - // get the current seq no - wal.doCompleteCacheFlush = true; - // allow complete cache flush with the previous seq number got after first - // set of edits. - wal.completeCacheFlush(hri.getEncodedNameAsBytes()); - wal.shutdown(); - FileStatus[] listStatus = wal.getFiles(); - assertNotNull(listStatus); - assertTrue(listStatus.length > 0); - WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], - this.fs, this.conf, null, null, null, mode, wals); - FileStatus[] listStatus1 = this.fs.listStatus( - new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(), - "recovered.edits")), new PathFilter() { - @Override - public boolean accept(Path p) { - if (WALSplitter.isSequenceIdFile(p)) { - return false; - } - return true; - } - }); - int editCount = 0; - for (FileStatus fileStatus : listStatus1) { - editCount = Integer.parseInt(fileStatus.getPath().getName()); - } - // The sequence number should be same - assertEquals( - "The sequence number of the recoverd.edits and the current edit seq should be same", - lastestSeqNumber, editCount); - } - - /** - * testcase for https://issues.apache.org/jira/browse/HBASE-15252 - */ - @Test - public void testDatalossWhenInputError() throws IOException, InstantiationException, - IllegalAccessException { - final TableName tableName = TableName.valueOf("testDatalossWhenInputError"); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); - deleteDir(basedir); - final byte[] rowName = tableName.getName(); - final int countPerFamily = 10; - final HTableDescriptor htd = createBasic1FamilyHTD(tableName); - HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - Path regionDir = region1.getRegionFileSystem().getRegionDir(); - HBaseTestingUtility.closeRegionAndWAL(region1); - - WAL wal = createWAL(this.conf); - HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); - for (HColumnDescriptor hcd : htd.getFamilies()) { - addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); - } - // Now assert edits made it in. - final Get g = new Get(rowName); - Result result = region.get(g); - assertEquals(countPerFamily * htd.getFamilies().size(), result.size()); - // Now close the region (without flush), split the log, reopen the region and assert that - // replay of log has the correct effect. - region.close(true); - wal.shutdown(); - - runWALSplit(this.conf); - - // here we let the DFSInputStream throw an IOException just after the WALHeader. - Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first(); - FSDataInputStream stream = fs.open(editFile); - stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length); - Class<? extends DefaultWALProvider.Reader> logReaderClass = - conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, - DefaultWALProvider.Reader.class); - DefaultWALProvider.Reader reader = logReaderClass.newInstance(); - reader.init(this.fs, editFile, conf, stream); - final long headerLength = stream.getPos(); - reader.close(); - FileSystem spyFs = spy(this.fs); - doAnswer(new Answer<FSDataInputStream>() { - - @Override - public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { - FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod(); - Field field = FilterInputStream.class.getDeclaredField("in"); - field.setAccessible(true); - final DFSInputStream in = (DFSInputStream) field.get(stream); - DFSInputStream spyIn = spy(in); - doAnswer(new Answer<Integer>() { - - private long pos; - - @Override - public Integer answer(InvocationOnMock invocation) throws Throwable { - if (pos >= headerLength) { - throw new IOException("read over limit"); - } - int b = (Integer) invocation.callRealMethod(); - if (b > 0) { - pos += b; - } - return b; - } - }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class)); - doAnswer(new Answer<Void>() { - - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - invocation.callRealMethod(); - in.close(); - return null; - } - }).when(spyIn).close(); - field.set(stream, spyIn); - return stream; - } - }).when(spyFs).open(eq(editFile)); - - WAL wal2 = createWAL(this.conf); - HRegion region2; - try { - // log replay should fail due to the IOException, otherwise we may lose data. - region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2); - assertEquals(result.size(), region2.get(g).size()); - } catch (IOException e) { - assertEquals("read over limit", e.getMessage()); - } - region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2); - assertEquals(result.size(), region2.get(g).size()); - } - - /** - * testcase for https://issues.apache.org/jira/browse/HBASE-14949. - */ - private void testNameConflictWhenSplit(boolean largeFirst) throws IOException { - final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); - final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName); - deleteDir(basedir); - - final HTableDescriptor htd = createBasic1FamilyHTD(tableName); - NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); - for (byte[] fam : htd.getFamiliesKeys()) { - scopes.put(fam, 0); - } - HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - HBaseTestingUtility.closeRegionAndWAL(region); - final byte[] family = htd.getColumnFamilies()[0].getName(); - final byte[] rowName = tableName.getName(); - FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes); - FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes); - - Path largeFile = new Path(logDir, "wal-1"); - Path smallFile = new Path(logDir, "wal-2"); - writerWALFile(largeFile, Arrays.asList(entry1, entry2)); - writerWALFile(smallFile, Arrays.asList(entry2)); - FileStatus first, second; - if (largeFirst) { - first = fs.getFileStatus(largeFile); - second = fs.getFileStatus(smallFile); - } else { - first = fs.getFileStatus(smallFile); - second = fs.getFileStatus(largeFile); - } - WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, - RecoveryMode.LOG_SPLITTING, wals); - WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, - RecoveryMode.LOG_SPLITTING, wals); - WAL wal = createWAL(this.conf); - region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal); - assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint()); - assertEquals(2, region.get(new Get(rowName)).size()); - } - - @Test - public void testNameConflictWhenSplit0() throws IOException { - testNameConflictWhenSplit(true); - } - - @Test - public void testNameConflictWhenSplit1() throws IOException { - testNameConflictWhenSplit(false); - } - - static class MockWAL extends FSHLog { - boolean doCompleteCacheFlush = false; - - public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf) - throws IOException { - super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); - } - - @Override - public void completeCacheFlush(byte[] encodedRegionName) { - if (!doCompleteCacheFlush) { - return; - } - super.completeCacheFlush(encodedRegionName); - } - } - - private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) { - HTableDescriptor htd = new HTableDescriptor(tableName); - HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a")); - htd.addFamily(a); - return htd; - } - - private MockWAL createMockWAL() throws IOException { - MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf); - // Set down maximum recovery so we dfsclient doesn't linger retrying something - // long gone. - HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); - return wal; - } - - // Flusher used in this test. Keep count of how often we are called and - // actually run the flush inside here. - class TestFlusher implements FlushRequester { - private HRegion r; - - @Override - public void requestFlush(Region region, boolean force) { - try { - r.flush(force); - } catch (IOException e) { - throw new RuntimeException("Exception flushing", e); - } - } - - @Override - public void requestDelayedFlush(Region region, long when, boolean forceFlushAllStores) { - // TODO Auto-generated method stub - - } - - @Override - public void registerFlushRequestListener(FlushRequestListener listener) { - - } - - @Override - public boolean unregisterFlushRequestListener(FlushRequestListener listener) { - return false; - } - - @Override - public void setGlobalMemstoreLimit(long globalMemStoreSize) { - - } - } - - private WALKey createWALKey(final TableName tableName, final HRegionInfo hri, - final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) { - return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes); - } - - private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee, - int index) { - byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index)); - byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index)); - WALEdit edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); - return edit; - } - - private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence, - byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, - int index, NavigableMap<byte[], Integer> scopes) throws IOException { - FSWALEntry entry = - new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit( - rowName, family, ee, index), hri, true); - entry.stampRegionSequenceId(); - return entry; - } - - private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, - final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, - final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc, - NavigableMap<byte[], Integer> scopes) throws IOException { - for (int j = 0; j < count; j++) { - wal.append(hri, createWALKey(tableName, hri, mvcc, scopes), - createWALEdit(rowName, family, ee, j), true); - } - wal.sync(); - } - - static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count, - EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException { - List<Put> puts = new ArrayList<Put>(); - for (int j = 0; j < count; j++) { - byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j)); - Put p = new Put(rowName); - p.addColumn(family, qualifier, ee.currentTime(), rowName); - r.put(p); - puts.add(p); - } - return puts; - } - - /* - * Creates an HRI around an HTD that has <code>tableName</code> and three - * column families named 'a','b', and 'c'. - * @param tableName Name of table to use when we create HTableDescriptor. - */ - private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) { - return new HRegionInfo(tableName, null, null, false); - } - - /* - * Run the split. Verify only single split file made. - * @param c - * @return The single split file made - * @throws IOException - */ - private Path runWALSplit(final Configuration c) throws IOException { - List<Path> splits = WALSplitter.split( - hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); - // Split should generate only 1 file since there's only 1 region - assertEquals("splits=" + splits, 1, splits.size()); - // Make sure the file exists - assertTrue(fs.exists(splits.get(0))); - LOG.info("Split file=" + splits.get(0)); - return splits.get(0); - } - - /* - * @param c - * @return WAL with retries set down from 5 to 1 only. - * @throws IOException - */ - private WAL createWAL(final Configuration c) throws IOException { + @Override + protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException { FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c); // Set down maximum recovery so we dfsclient doesn't linger retrying something // long gone. HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); return wal; } - - private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) { - HTableDescriptor htd = new HTableDescriptor(tableName); - HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a")); - htd.addFamily(a); - HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b")); - htd.addFamily(b); - HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c")); - htd.addFamily(c); - return htd; - } - - private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException { - fs.mkdirs(file.getParent()); - ProtobufLogWriter writer = new ProtobufLogWriter(); - writer.init(fs, file, conf, true); - for (FSWALEntry entry : entries) { - writer.append(entry); - } - writer.sync(); - writer.close(); - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java index b225554..6237c8d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java @@ -27,13 +27,13 @@ import org.junit.experimental.categories.Category; /** * Enables compression and runs the TestWALReplay tests. */ -@Category({RegionServerTests.class, MediumTests.class}) +@Category({ RegionServerTests.class, MediumTests.class }) public class TestWALReplayCompressed extends TestWALReplay { @BeforeClass public static void setUpBeforeClass() throws Exception { - TestWALReplay.setUpBeforeClass(); - Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration(); + Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + TestWALReplay.setUpBeforeClass(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index 7664bfa..0a732a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -18,36 +18,34 @@ */ package org.apache.hadoop.hbase.wal; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.DEFAULT_PROVIDER_ID; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; + import java.io.IOException; import java.util.Collection; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.WAL.Entry; - -import static org.apache.hadoop.hbase.wal.DefaultWALProvider.DEFAULT_PROVIDER_ID; -import static org.apache.hadoop.hbase.wal.DefaultWALProvider.META_WAL_PROVIDER_ID; -import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER; - - +import org.apache.hadoop.hbase.classification.InterfaceAudience; // imports for things that haven't moved from regionserver.wal yet. import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WAL.Entry; /** * A WAL Provider that returns a single thread safe WAL that optionally can skip parts of our * normal interactions with HDFS. * * This implementation picks a directory in HDFS based on the same mechanisms as the - * {@link DefaultWALProvider}. Users can configure how much interaction + * {@link FSHLogProvider}. Users can configure how much interaction * we have with HDFS with the configuration property "hbase.wal.iotestprovider.operations". * The value should be a comma separated list of allowed operations: * <ul> @@ -102,9 +100,9 @@ public class IOTestProvider implements WALProvider { } final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId; log = new IOTestWAL(FileSystem.get(conf), FSUtils.getRootDir(conf), - DefaultWALProvider.getWALDirectoryName(factory.factoryId), - HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, - true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); + AbstractFSWALProvider.getWALDirectoryName(factory.factoryId), + HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix, + META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); } @Override @@ -150,7 +148,7 @@ public class IOTestProvider implements WALProvider { * it will be URL encoded before being used. * If prefix is null, "wal" will be used * @param suffix will be url encoded. null is treated as empty. non-empty must start with - * {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER} + * {@link AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER} * @throws IOException */ public IOTestWAL(final FileSystem fs, final Path rootDir, final String logDir, http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java deleted file mode 100644 index 9b6ac54..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java +++ /dev/null @@ -1,389 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.wal; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.HashSet; -import java.util.NavigableMap; -import java.util.Random; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; - -// imports for things that haven't moved from regionserver.wal yet. -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; - -@Category({RegionServerTests.class, MediumTests.class}) -public class TestDefaultWALProvider { - private static final Log LOG = LogFactory.getLog(TestDefaultWALProvider.class); - - protected static Configuration conf; - protected static FileSystem fs; - protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - protected MultiVersionConcurrencyControl mvcc; - - @Rule - public final TestName currentTest = new TestName(); - - @Before - public void setUp() throws Exception { - mvcc = new MultiVersionConcurrencyControl(); - FileStatus[] entries = fs.listStatus(new Path("/")); - for (FileStatus dir : entries) { - fs.delete(dir.getPath(), true); - } - } - - @After - public void tearDown() throws Exception { - } - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - // Make block sizes small. - TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); - // quicker heartbeat interval for faster DN death notification - TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); - TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); - TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); - - // faster failover with cluster.shutdown();fs.close() idiom - TEST_UTIL.getConfiguration() - .setInt("hbase.ipc.client.connect.max.retries", 1); - TEST_UTIL.getConfiguration().setInt( - "dfs.client.block.recovery.retries", 1); - TEST_UTIL.getConfiguration().setInt( - "hbase.ipc.client.connection.maxidletime", 500); - TEST_UTIL.startMiniDFSCluster(3); - - // Set up a working space for our tests. - TEST_UTIL.createRootDir(); - conf = TEST_UTIL.getConfiguration(); - fs = TEST_UTIL.getDFSCluster().getFileSystem(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - static String getName() { - return "TestDefaultWALProvider"; - } - - @Test - public void testGetServerNameFromWALDirectoryName() throws IOException { - ServerName sn = ServerName.valueOf("hn", 450, 1398); - String hl = FSUtils.getRootDir(conf) + "/" + - DefaultWALProvider.getWALDirectoryName(sn.toString()); - - // Must not throw exception - assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, null)); - assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, - FSUtils.getRootDir(conf).toUri().toString())); - assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, "")); - assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, " ")); - assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl)); - assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl + "qdf")); - assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, "sfqf" + hl + "qdf")); - - final String wals = "/WALs/"; - ServerName parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, - FSUtils.getRootDir(conf).toUri().toString() + wals + sn + - "/localhost%2C32984%2C1343316388997.1343316390417"); - assertEquals("standard", sn, parsed); - - parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl + "/qdf"); - assertEquals("subdir", sn, parsed); - - parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, - FSUtils.getRootDir(conf).toUri().toString() + wals + sn + - "-splitting/localhost%3A57020.1340474893931"); - assertEquals("split", sn, parsed); - } - - - protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, - int times, NavigableMap<byte[], Integer> scopes) throws IOException { - final byte[] row = Bytes.toBytes("row"); - for (int i = 0; i < times; i++) { - long timestamp = System.currentTimeMillis(); - WALEdit cols = new WALEdit(); - cols.add(new KeyValue(row, row, row, timestamp, row)); - log.append(hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes), - cols, true); - } - log.sync(); - } - - /** - * used by TestDefaultWALProviderWithHLogKey - * @param scopes - */ - WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp, - NavigableMap<byte[], Integer> scopes) { - return new WALKey(info, tableName, timestamp, mvcc, scopes); - } - - /** - * helper method to simulate region flush for a WAL. - * @param wal - * @param regionEncodedName - */ - protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) { - wal.startCacheFlush(regionEncodedName, flushedFamilyNames); - wal.completeCacheFlush(regionEncodedName); - } - - private static final byte[] UNSPECIFIED_REGION = new byte[]{}; - - @Test - public void testLogCleaning() throws Exception { - LOG.info("testLogCleaning"); - final HTableDescriptor htd = - new HTableDescriptor(TableName.valueOf("testLogCleaning")).addFamily(new HColumnDescriptor( - "row")); - final HTableDescriptor htd2 = - new HTableDescriptor(TableName.valueOf("testLogCleaning2")) - .addFamily(new HColumnDescriptor("row")); - NavigableMap<byte[], Integer> scopes1 = new TreeMap<byte[], Integer>( - Bytes.BYTES_COMPARATOR); - for(byte[] fam : htd.getFamiliesKeys()) { - scopes1.put(fam, 0); - } - NavigableMap<byte[], Integer> scopes2 = new TreeMap<byte[], Integer>( - Bytes.BYTES_COMPARATOR); - for(byte[] fam : htd2.getFamiliesKeys()) { - scopes2.put(fam, 0); - } - final Configuration localConf = new Configuration(conf); - localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName()); - final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName()); - final AtomicLong sequenceId = new AtomicLong(1); - try { - HRegionInfo hri = new HRegionInfo(htd.getTableName(), - HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); - HRegionInfo hri2 = new HRegionInfo(htd2.getTableName(), - HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); - // we want to mix edits from regions, so pick our own identifier. - final WAL log = wals.getWAL(UNSPECIFIED_REGION, null); - - // Add a single edit and make sure that rolling won't remove the file - // Before HBASE-3198 it used to delete it - addEdits(log, hri, htd, 1, scopes1); - log.rollWriter(); - assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(log)); - - // See if there's anything wrong with more than 1 edit - addEdits(log, hri, htd, 2, scopes1); - log.rollWriter(); - assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log)); - - // Now mix edits from 2 regions, still no flushing - addEdits(log, hri, htd, 1, scopes1); - addEdits(log, hri2, htd2, 1, scopes2); - addEdits(log, hri, htd, 1, scopes1); - addEdits(log, hri2, htd2, 1, scopes2); - log.rollWriter(); - assertEquals(3, DefaultWALProvider.getNumRolledLogFiles(log)); - - // Flush the first region, we expect to see the first two files getting - // archived. We need to append something or writer won't be rolled. - addEdits(log, hri2, htd2, 1, scopes2); - log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys()); - log.completeCacheFlush(hri.getEncodedNameAsBytes()); - log.rollWriter(); - assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log)); - - // Flush the second region, which removes all the remaining output files - // since the oldest was completely flushed and the two others only contain - // flush information - addEdits(log, hri2, htd2, 1, scopes2); - log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getFamiliesKeys()); - log.completeCacheFlush(hri2.getEncodedNameAsBytes()); - log.rollWriter(); - assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(log)); - } finally { - if (wals != null) { - wals.close(); - } - } - } - - /** - * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs - * and also don't archive "live logs" (that is, a log with un-flushed entries). - * <p> - * This is what it does: - * It creates two regions, and does a series of inserts along with log rolling. - * Whenever a WAL is rolled, HLogBase checks previous wals for archiving. A wal is eligible for - * archiving if for all the regions which have entries in that wal file, have flushed - past - * their maximum sequence id in that wal file. - * <p> - * @throws IOException - */ - @Test - public void testWALArchiving() throws IOException { - LOG.debug("testWALArchiving"); - HTableDescriptor table1 = - new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row")); - HTableDescriptor table2 = - new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row")); - NavigableMap<byte[], Integer> scopes1 = new TreeMap<byte[], Integer>( - Bytes.BYTES_COMPARATOR); - for(byte[] fam : table1.getFamiliesKeys()) { - scopes1.put(fam, 0); - } - NavigableMap<byte[], Integer> scopes2 = new TreeMap<byte[], Integer>( - Bytes.BYTES_COMPARATOR); - for(byte[] fam : table2.getFamiliesKeys()) { - scopes2.put(fam, 0); - } - final Configuration localConf = new Configuration(conf); - localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName()); - final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName()); - try { - final WAL wal = wals.getWAL(UNSPECIFIED_REGION, null); - assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal)); - HRegionInfo hri1 = - new HRegionInfo(table1.getTableName(), HConstants.EMPTY_START_ROW, - HConstants.EMPTY_END_ROW); - HRegionInfo hri2 = - new HRegionInfo(table2.getTableName(), HConstants.EMPTY_START_ROW, - HConstants.EMPTY_END_ROW); - // ensure that we don't split the regions. - hri1.setSplit(false); - hri2.setSplit(false); - // variables to mock region sequenceIds. - // start with the testing logic: insert a waledit, and roll writer - addEdits(wal, hri1, table1, 1, scopes1); - wal.rollWriter(); - // assert that the wal is rolled - assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal)); - // add edits in the second wal file, and roll writer. - addEdits(wal, hri1, table1, 1, scopes1); - wal.rollWriter(); - // assert that the wal is rolled - assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal)); - // add a waledit to table1, and flush the region. - addEdits(wal, hri1, table1, 3, scopes1); - flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys()); - // roll log; all old logs should be archived. - wal.rollWriter(); - assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal)); - // add an edit to table2, and roll writer - addEdits(wal, hri2, table2, 1, scopes2); - wal.rollWriter(); - assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal)); - // add edits for table1, and roll writer - addEdits(wal, hri1, table1, 2, scopes1); - wal.rollWriter(); - assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal)); - // add edits for table2, and flush hri1. - addEdits(wal, hri2, table2, 2, scopes2); - flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getFamiliesKeys()); - // the log : region-sequenceId map is - // log1: region2 (unflushed) - // log2: region1 (flushed) - // log3: region2 (unflushed) - // roll the writer; log2 should be archived. - wal.rollWriter(); - assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal)); - // flush region2, and all logs should be archived. - addEdits(wal, hri2, table2, 2, scopes2); - flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getFamiliesKeys()); - wal.rollWriter(); - assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal)); - } finally { - if (wals != null) { - wals.close(); - } - } - } - - /** - * Write to a log file with three concurrent threads and verifying all data is written. - * @throws Exception - */ - @Test - public void testConcurrentWrites() throws Exception { - // Run the WPE tool with three threads writing 3000 edits each concurrently. - // When done, verify that all edits were written. - int errCode = WALPerformanceEvaluation. - innerMain(new Configuration(TEST_UTIL.getConfiguration()), - new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"}); - assertEquals(0, errCode); - } - - /** - * Ensure that we can use Set.add to deduplicate WALs - */ - @Test - public void setMembershipDedups() throws IOException { - final Configuration localConf = new Configuration(conf); - localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName()); - final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName()); - try { - final Set<WAL> seen = new HashSet<WAL>(1); - final Random random = new Random(); - assertTrue("first attempt to add WAL from default provider should work.", - seen.add(wals.getWAL(Bytes.toBytes(random.nextInt()), null))); - for (int i = 0; i < 1000; i++) { - assertFalse("default wal provider is only supposed to return a single wal, which should " - + "compare as .equals itself.", - seen.add(wals.getWAL(Bytes.toBytes(random.nextInt()), null))); - } - } finally { - wals.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java index ef92768..15b419c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java @@ -28,7 +28,7 @@ import org.junit.experimental.categories.Category; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; @Category({RegionServerTests.class, LargeTests.class}) -public class TestDefaultWALProviderWithHLogKey extends TestDefaultWALProvider { +public class TestDefaultWALProviderWithHLogKey extends TestFSHLogProvider { @Override WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp, final NavigableMap<byte[], Integer> scopes) {