http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java deleted file mode 100644 index b1fc3d7..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.contrib.bkjournal; - -import static org.junit.Assert.*; - -import java.net.URI; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.KeeperException; - -import org.apache.bookkeeper.proto.BookieServer; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.util.LocalBookKeeper; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.List; - -import java.io.IOException; -import java.io.File; - -/** - * Utility class for setting up bookkeeper ensembles - * and bringing individual bookies up and down - */ -class BKJMUtil { - protected static final Log LOG = LogFactory.getLog(BKJMUtil.class); - - int nextPort = 6000; // next port for additionally created bookies - private Thread bkthread = null; - private final static String zkEnsemble = "127.0.0.1:2181"; - int numBookies; - - BKJMUtil(final int numBookies) throws Exception { - this.numBookies = numBookies; - - bkthread = new Thread() { - public void run() { - try { - String[] args = new String[1]; - args[0] = String.valueOf(numBookies); - LOG.info("Starting bk"); - LocalBookKeeper.main(args); - } catch (InterruptedException e) { - // go away quietly - } catch (Exception e) { - LOG.error("Error starting local bk", e); - } - } - }; - } - - void start() throws Exception { - bkthread.start(); - if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) { - throw new Exception("Error starting zookeeper/bookkeeper"); - } - assertEquals("Not all bookies started", - numBookies, checkBookiesUp(numBookies, 10)); - } - - void teardown() throws Exception { - if (bkthread != null) { - bkthread.interrupt(); - bkthread.join(); - } - } - - static 
ZooKeeper connectZooKeeper() - throws IOException, KeeperException, InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - - ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() { - public void process(WatchedEvent event) { - if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { - latch.countDown(); - } - } - }); - if (!latch.await(3, TimeUnit.SECONDS)) { - throw new IOException("Zookeeper took too long to connect"); - } - return zkc; - } - - static URI createJournalURI(String path) throws Exception { - return URI.create("bookkeeper://" + zkEnsemble + path); - } - - static void addJournalManagerDefinition(Configuration conf) { - conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".bookkeeper", - "org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager"); - } - - BookieServer newBookie() throws Exception { - int port = nextPort++; - ServerConfiguration bookieConf = new ServerConfiguration(); - bookieConf.setBookiePort(port); - File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_", - "test"); - tmpdir.delete(); - tmpdir.mkdir(); - - bookieConf.setZkServers(zkEnsemble); - bookieConf.setJournalDirName(tmpdir.getPath()); - bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() }); - - BookieServer b = new BookieServer(bookieConf); - b.start(); - for (int i = 0; i < 10 && !b.isRunning(); i++) { - Thread.sleep(10000); - } - if (!b.isRunning()) { - throw new IOException("Bookie would not start"); - } - return b; - } - - /** - * Check that a number of bookies are available - * @param count number of bookies required - * @param timeout number of seconds to wait for bookies to start - * @throws IOException if bookies are not started by the time the timeout hits - */ - int checkBookiesUp(int count, int timeout) throws Exception { - ZooKeeper zkc = connectZooKeeper(); - try { - int mostRecentSize = 0; - for (int i = 0; i < timeout; i++) { - try { - List<String> children = 
zkc.getChildren("/ledgers/available", - false); - mostRecentSize = children.size(); - // Skip 'readonly znode' which is used for keeping R-O bookie details - if (children.contains("readonly")) { - mostRecentSize = children.size() - 1; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Found " + mostRecentSize + " bookies up, " - + "waiting for " + count); - if (LOG.isTraceEnabled()) { - for (String child : children) { - LOG.trace(" server: " + child); - } - } - } - if (mostRecentSize == count) { - break; - } - } catch (KeeperException e) { - // ignore - } - Thread.sleep(1000); - } - return mostRecentSize; - } finally { - zkc.close(); - } - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java deleted file mode 100644 index ff8c00d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java +++ /dev/null @@ -1,414 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.contrib.bkjournal; - -import static org.junit.Assert.*; - -import org.junit.Test; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.AfterClass; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ha.ServiceFailedException; -import org.apache.hadoop.ha.HAServiceProtocol.RequestSource; -import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; -import org.apache.hadoop.hdfs.DFSConfigKeys; - -import org.apache.hadoop.hdfs.HAUtil; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.MiniDFSNNTopology; - -import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; - -import org.apache.hadoop.ipc.RemoteException; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; - -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.ExitUtil; -import org.apache.hadoop.util.ExitUtil.ExitException; - -import org.apache.bookkeeper.proto.BookieServer; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.io.File; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collection; - -/** - * Integration test to ensure that the BookKeeper JournalManager - * works for HDFS Namenode HA - */ -@RunWith(Parameterized.class) -public class TestBookKeeperAsHASharedDir { - static final Log LOG = LogFactory.getLog(TestBookKeeperAsHASharedDir.class); - - private static BKJMUtil bkutil; - static int numBookies = 3; - - private static final String TEST_FILE_DATA = "HA BookKeeperJournalManager"; - - @Parameters - public static Collection<Object[]> 
data() { - Collection<Object[]> params = new ArrayList<Object[]>(); - params.add(new Object[]{ Boolean.FALSE }); - params.add(new Object[]{ Boolean.TRUE }); - return params; - } - - private static boolean useAsyncEditLog; - public TestBookKeeperAsHASharedDir(Boolean async) { - useAsyncEditLog = async; - } - - private static Configuration getConf() { - Configuration conf = new Configuration(); - conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); - conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, - useAsyncEditLog); - return conf; - } - - @BeforeClass - public static void setupBookkeeper() throws Exception { - bkutil = new BKJMUtil(numBookies); - bkutil.start(); - } - - @Before - public void clearExitStatus() { - ExitUtil.resetFirstExitException(); - } - - @AfterClass - public static void teardownBookkeeper() throws Exception { - bkutil.teardown(); - } - - /** - * Test simple HA failover usecase with BK - */ - @Test - public void testFailoverWithBK() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = getConf(); - conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, - BKJMUtil.createJournalURI("/hotfailover").toString()); - BKJMUtil.addJournalManagerDefinition(conf); - - cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleHATopology()) - .numDataNodes(0) - .manageNameDfsSharedDirs(false) - .build(); - NameNode nn1 = cluster.getNameNode(0); - NameNode nn2 = cluster.getNameNode(1); - - cluster.waitActive(); - cluster.transitionToActive(0); - - Path p = new Path("/testBKJMfailover"); - - FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); - - fs.mkdirs(p); - cluster.shutdownNameNode(0); - - cluster.transitionToActive(1); - - assertTrue(fs.exists(p)); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - /** - * Test HA failover, where BK, as the shared storage, fails. - * Once it becomes available again, a standby can come up. 
- * Verify that any write happening after the BK fail is not - * available on the standby. - */ - @Test - public void testFailoverWithFailingBKCluster() throws Exception { - int ensembleSize = numBookies + 1; - BookieServer newBookie = bkutil.newBookie(); - assertEquals("New bookie didn't start", - ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); - - BookieServer replacementBookie = null; - - MiniDFSCluster cluster = null; - - try { - Configuration conf = getConf(); - conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, - BKJMUtil.createJournalURI("/hotfailoverWithFail").toString()); - conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, - ensembleSize); - conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, - ensembleSize); - BKJMUtil.addJournalManagerDefinition(conf); - - cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleHATopology()) - .numDataNodes(0) - .manageNameDfsSharedDirs(false) - .checkExitOnShutdown(false) - .build(); - NameNode nn1 = cluster.getNameNode(0); - NameNode nn2 = cluster.getNameNode(1); - - cluster.waitActive(); - cluster.transitionToActive(0); - - Path p1 = new Path("/testBKJMFailingBKCluster1"); - Path p2 = new Path("/testBKJMFailingBKCluster2"); - - FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); - - fs.mkdirs(p1); - newBookie.shutdown(); // will take down shared storage - assertEquals("New bookie didn't stop", - numBookies, bkutil.checkBookiesUp(numBookies, 10)); - - try { - fs.mkdirs(p2); - fail("mkdirs should result in the NN exiting"); - } catch (RemoteException re) { - assertTrue(re.getClassName().contains("ExitException")); - } - cluster.shutdownNameNode(0); - - try { - cluster.transitionToActive(1); - fail("Shouldn't have been able to transition with bookies down"); - } catch (ExitException ee) { - assertTrue("Should shutdown due to required journal failure", - ee.getMessage().contains( - "starting log segment 3 failed for required journal")); - } 
- - replacementBookie = bkutil.newBookie(); - assertEquals("Replacement bookie didn't start", - ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); - cluster.transitionToActive(1); // should work fine now - - assertTrue(fs.exists(p1)); - assertFalse(fs.exists(p2)); - } finally { - newBookie.shutdown(); - if (replacementBookie != null) { - replacementBookie.shutdown(); - } - - if (cluster != null) { - cluster.shutdown(); - } - } - } - - /** - * Test that two namenodes can't continue as primary - */ - @Test - public void testMultiplePrimariesStarted() throws Exception { - Path p1 = new Path("/testBKJMMultiplePrimary"); - - MiniDFSCluster cluster = null; - try { - Configuration conf = getConf(); - conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, - BKJMUtil.createJournalURI("/hotfailoverMultiple").toString()); - BKJMUtil.addJournalManagerDefinition(conf); - - cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleHATopology()) - .numDataNodes(0) - .manageNameDfsSharedDirs(false) - .checkExitOnShutdown(false) - .build(); - NameNode nn1 = cluster.getNameNode(0); - NameNode nn2 = cluster.getNameNode(1); - cluster.waitActive(); - cluster.transitionToActive(0); - - FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); - fs.mkdirs(p1); - nn1.getRpcServer().rollEditLog(); - cluster.transitionToActive(1); - fs = cluster.getFileSystem(0); // get the older active server. - - try { - System.out.println("DMS: > *************"); - boolean foo = fs.delete(p1, true); - System.out.println("DMS: < ************* "+foo); - fail("Log update on older active should cause it to exit"); - } catch (RemoteException re) { - assertTrue(re.getClassName().contains("ExitException")); - } - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - /** - * Use NameNode INTIALIZESHAREDEDITS to initialize the shared edits. i.e. copy - * the edits log segments to new bkjm shared edits. 
- * - * @throws Exception - */ - @Test - public void testInitializeBKSharedEdits() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = getConf(); - HAUtil.setAllowStandbyReads(conf, true); - - MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology(); - cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology) - .numDataNodes(0).build(); - cluster.waitActive(); - // Shutdown and clear the current filebased shared dir. - cluster.shutdownNameNodes(); - File shareddir = new File(cluster.getSharedEditsDir(0, 1)); - assertTrue("Initial Shared edits dir not fully deleted", - FileUtil.fullyDelete(shareddir)); - - // Check namenodes should not start without shared dir. - assertCanNotStartNamenode(cluster, 0); - assertCanNotStartNamenode(cluster, 1); - - // Configure bkjm as new shared edits dir in both namenodes - Configuration nn1Conf = cluster.getConfiguration(0); - Configuration nn2Conf = cluster.getConfiguration(1); - nn1Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil - .createJournalURI("/initializeSharedEdits").toString()); - nn2Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil - .createJournalURI("/initializeSharedEdits").toString()); - BKJMUtil.addJournalManagerDefinition(nn1Conf); - BKJMUtil.addJournalManagerDefinition(nn2Conf); - - // Initialize the BKJM shared edits. 
- assertFalse(NameNode.initializeSharedEdits(nn1Conf)); - - // NameNode should be able to start and should be in sync with BKJM as - // shared dir - assertCanStartHANameNodes(cluster, conf, "/testBKJMInitialize"); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - private void assertCanNotStartNamenode(MiniDFSCluster cluster, int nnIndex) { - try { - cluster.restartNameNode(nnIndex, false); - fail("Should not have been able to start NN" + (nnIndex) - + " without shared dir"); - } catch (IOException ioe) { - LOG.info("Got expected exception", ioe); - GenericTestUtils.assertExceptionContains( - "storage directory does not exist or is not accessible", ioe); - } - } - - private void assertCanStartHANameNodes(MiniDFSCluster cluster, - Configuration conf, String path) throws ServiceFailedException, - IOException, URISyntaxException, InterruptedException { - // Now should be able to start both NNs. Pass "false" here so that we don't - // try to waitActive on all NNs, since the second NN doesn't exist yet. - cluster.restartNameNode(0, false); - cluster.restartNameNode(1, true); - - // Make sure HA is working. - cluster - .getNameNode(0) - .getRpcServer() - .transitionToActive( - new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER)); - FileSystem fs = null; - try { - Path newPath = new Path(path); - fs = HATestUtil.configureFailoverFs(cluster, conf); - assertTrue(fs.mkdirs(newPath)); - HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0), - cluster.getNameNode(1)); - assertTrue(NameNodeAdapter.getFileInfo(cluster.getNameNode(1), - newPath.toString(), false).isDir()); - } finally { - if (fs != null) { - fs.close(); - } - } - } - - /** - * NameNode should load the edits correctly if the applicable edits are - * present in the BKJM. 
- */ - @Test - public void testNameNodeMultipleSwitchesUsingBKJM() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = getConf(); - conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil - .createJournalURI("/correctEditLogSelection").toString()); - BKJMUtil.addJournalManagerDefinition(conf); - - cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(0) - .manageNameDfsSharedDirs(false).build(); - NameNode nn1 = cluster.getNameNode(0); - NameNode nn2 = cluster.getNameNode(1); - cluster.waitActive(); - cluster.transitionToActive(0); - nn1.getRpcServer().rollEditLog(); // Roll Edits from current Active. - // Transition to standby current active gracefully. - cluster.transitionToStandby(0); - // Make the other Active and Roll edits multiple times - cluster.transitionToActive(1); - nn2.getRpcServer().rollEditLog(); - nn2.getRpcServer().rollEditLog(); - // Now One more failover. So NN1 should be able to failover successfully. 
- cluster.transitionToStandby(1); - cluster.transitionToActive(0); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java deleted file mode 100644 index f3f6ce5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.contrib.bkjournal; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.Random; - -import org.apache.bookkeeper.util.LocalBookKeeper; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZKUtil; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.server.NIOServerCnxnFactory; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; - -public class TestBookKeeperConfiguration { - private static final Log LOG = LogFactory - .getLog(TestBookKeeperConfiguration.class); - private static final int ZK_SESSION_TIMEOUT = 5000; - private static final String HOSTPORT = "127.0.0.1:2181"; - private static final int CONNECTION_TIMEOUT = 30000; - private static NIOServerCnxnFactory serverFactory; - private static ZooKeeperServer zks; - private static ZooKeeper zkc; - private static int ZooKeeperDefaultPort = 2181; - private static File ZkTmpDir; - private BookKeeperJournalManager bkjm; - private static final String BK_ROOT_PATH = "/ledgers"; - - private static ZooKeeper connectZooKeeper(String ensemble) - throws IOException, KeeperException, InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - - ZooKeeper zkc = new ZooKeeper(HOSTPORT, ZK_SESSION_TIMEOUT, new Watcher() { - public void process(WatchedEvent event) { - if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { - latch.countDown(); 
- } - } - }); - if (!latch.await(ZK_SESSION_TIMEOUT, TimeUnit.MILLISECONDS)) { - throw new IOException("Zookeeper took too long to connect"); - } - return zkc; - } - - private NamespaceInfo newNSInfo() { - Random r = new Random(); - return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1); - } - - @BeforeClass - public static void setupZooKeeper() throws Exception { - // create a ZooKeeper server(dataDir, dataLogDir, port) - LOG.info("Starting ZK server"); - ZkTmpDir = File.createTempFile("zookeeper", "test"); - ZkTmpDir.delete(); - ZkTmpDir.mkdir(); - - try { - zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort); - serverFactory = new NIOServerCnxnFactory(); - serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10); - serverFactory.startup(zks); - } catch (Exception e) { - LOG.error("Exception while instantiating ZooKeeper", e); - } - - boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT); - LOG.debug("ZooKeeper server up: " + b); - } - - @Before - public void setup() throws Exception { - zkc = connectZooKeeper(HOSTPORT); - try { - ZKUtil.deleteRecursive(zkc, BK_ROOT_PATH); - } catch (KeeperException.NoNodeException e) { - LOG.debug("Ignoring no node exception on cleanup", e); - } catch (Exception e) { - LOG.error("Exception when deleting bookie root path in zk", e); - } - } - - @After - public void teardown() throws Exception { - if (null != zkc) { - zkc.close(); - } - if (null != bkjm) { - bkjm.close(); - } - } - - @AfterClass - public static void teardownZooKeeper() throws Exception { - if (null != zkc) { - zkc.close(); - } - } - - /** - * Verify the BKJM is creating the bookie available path configured in - * 'dfs.namenode.bookkeeperjournal.zk.availablebookies' - */ - @Test - public void testWithConfiguringBKAvailablePath() throws Exception { - // set Bookie available path in the configuration - String bkAvailablePath - = BookKeeperJournalManager.BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT; - 
Configuration conf = new Configuration(); - conf.setStrings(BookKeeperJournalManager.BKJM_ZK_LEDGERS_AVAILABLE_PATH, - bkAvailablePath); - Assert.assertNull(bkAvailablePath + " already exists", zkc.exists( - bkAvailablePath, false)); - NamespaceInfo nsi = newNSInfo(); - bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-WithBKPath"), - nsi); - bkjm.format(nsi); - Assert.assertNotNull("Bookie available path : " + bkAvailablePath - + " doesn't exists", zkc.exists(bkAvailablePath, false)); - } - - /** - * Verify the BKJM is creating the bookie available default path, when there - * is no 'dfs.namenode.bookkeeperjournal.zk.availablebookies' configured - */ - @Test - public void testDefaultBKAvailablePath() throws Exception { - Configuration conf = new Configuration(); - Assert.assertNull(BK_ROOT_PATH + " already exists", zkc.exists( - BK_ROOT_PATH, false)); - NamespaceInfo nsi = newNSInfo(); - bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-DefaultBKPath"), - nsi); - bkjm.format(nsi); - Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH - + " doesn't exists", zkc.exists(BK_ROOT_PATH, false)); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java deleted file mode 100644 index 52e4568..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java +++ /dev/null @@ -1,92 
+0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.contrib.bkjournal; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; - -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.zookeeper.ZooKeeper; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * Unit test for the bkjm's streams - */ -public class TestBookKeeperEditLogStreams { - static final Log LOG = LogFactory.getLog(TestBookKeeperEditLogStreams.class); - - private static BKJMUtil bkutil; - private final static int numBookies = 3; - - @BeforeClass - public static void setupBookkeeper() throws Exception { - bkutil = new BKJMUtil(numBookies); - bkutil.start(); - } - - @AfterClass - public static void teardownBookkeeper() throws Exception { - bkutil.teardown(); - } - - /** - * Test that bkjm will refuse open a stream on an empty - * ledger. 
- */ - @Test - public void testEmptyInputStream() throws Exception { - ZooKeeper zk = BKJMUtil.connectZooKeeper(); - - BookKeeper bkc = new BookKeeper(new ClientConfiguration(), zk); - try { - LedgerHandle lh = bkc.createLedger(BookKeeper.DigestType.CRC32, "foobar" - .getBytes()); - lh.close(); - - EditLogLedgerMetadata metadata = new EditLogLedgerMetadata("/foobar", - HdfsServerConstants.NAMENODE_LAYOUT_VERSION, lh.getId(), 0x1234); - try { - new BookKeeperEditLogInputStream(lh, metadata, -1); - fail("Shouldn't get this far, should have thrown"); - } catch (IOException ioe) { - assertTrue(ioe.getMessage().contains("Invalid first bk entry to read")); - } - - metadata = new EditLogLedgerMetadata("/foobar", - HdfsServerConstants.NAMENODE_LAYOUT_VERSION, lh.getId(), 0x1234); - try { - new BookKeeperEditLogInputStream(lh, metadata, 0); - fail("Shouldn't get this far, should have thrown"); - } catch (IOException ioe) { - assertTrue(ioe.getMessage().contains("Invalid first bk entry to read")); - } - } finally { - bkc.close(); - zk.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java deleted file mode 100644 index b8fc30d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.contrib.bkjournal; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.MiniDFSNNTopology; -import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; -import org.apache.hadoop.hdfs.server.namenode.ha.TestStandbyCheckpoints; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; - -import java.net.BindException; -import java.util.Random; - -/** - * Runs the same tests as TestStandbyCheckpoints, but - * using a bookkeeper journal manager as the shared directory - */ -public class TestBookKeeperHACheckpoints extends TestStandbyCheckpoints { - //overwrite the nn count - static{ - TestStandbyCheckpoints.NUM_NNS = 2; - } - private static BKJMUtil bkutil = null; - static int numBookies = 3; - static int journalCount = 0; - private final Random random = new Random(); - - private static final Log LOG = LogFactory.getLog(TestStandbyCheckpoints.class); - - @SuppressWarnings("rawtypes") - @Override - @Before - public void setupCluster() throws Exception { - Configuration conf = setupCommonConfig(); - 
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, - BKJMUtil.createJournalURI("/checkpointing" + journalCount++) - .toString()); - BKJMUtil.addJournalManagerDefinition(conf); - - int retryCount = 0; - while (true) { - try { - int basePort = 10060 + random.nextInt(100) * 2; - MiniDFSNNTopology topology = new MiniDFSNNTopology() - .addNameservice(new MiniDFSNNTopology.NSConf("ns1") - .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(basePort)) - .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(basePort + 1))); - - cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(topology) - .numDataNodes(1) - .manageNameDfsSharedDirs(false) - .build(); - cluster.waitActive(); - - setNNs(); - fs = HATestUtil.configureFailoverFs(cluster, conf); - - cluster.transitionToActive(0); - ++retryCount; - break; - } catch (BindException e) { - LOG.info("Set up MiniDFSCluster failed due to port conflicts, retry " - + retryCount + " times"); - } - } - } - - @BeforeClass - public static void startBK() throws Exception { - journalCount = 0; - bkutil = new BKJMUtil(numBookies); - bkutil.start(); - } - - @AfterClass - public static void shutdownBK() throws Exception { - if (bkutil != null) { - bkutil.teardown(); - } - } - - @Override - public void testCheckpointCancellation() throws Exception { - // Overriden as the implementation in the superclass assumes that writes - // are to a file. 
This should be fixed at some point - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java deleted file mode 100644 index 07fcd72..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java +++ /dev/null @@ -1,984 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.contrib.bkjournal; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.spy; -import org.junit.Test; -import org.junit.Before; -import org.junit.After; -import org.junit.BeforeClass; -import org.junit.AfterClass; -import org.mockito.Mockito; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Callable; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; - -import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; -import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil; -import org.apache.hadoop.hdfs.server.namenode.JournalManager; -import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; - -import org.apache.bookkeeper.proto.BookieServer; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.ZooDefs.Ids; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -public class TestBookKeeperJournalManager { - static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class); - - private static final long DEFAULT_SEGMENT_SIZE = 1000; - - protected static Configuration conf = new Configuration(); - private ZooKeeper zkc; - private static BKJMUtil bkutil; - static int numBookies = 3; - private BookieServer newBookie; - - @BeforeClass - public static void setupBookkeeper() throws Exception { - bkutil = new 
BKJMUtil(numBookies); - bkutil.start(); - } - - @AfterClass - public static void teardownBookkeeper() throws Exception { - bkutil.teardown(); - } - - @Before - public void setup() throws Exception { - zkc = BKJMUtil.connectZooKeeper(); - } - - @After - public void teardown() throws Exception { - zkc.close(); - if (newBookie != null) { - newBookie.shutdown(); - } - } - - private NamespaceInfo newNSInfo() { - Random r = new Random(); - return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1); - } - - @Test - public void testSimpleWrite() throws Exception { - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi); - bkjm.format(nsi); - - EditLogOutputStream out = bkjm.startLogSegment(1, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - for (long i = 1 ; i <= 100; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(i); - out.write(op); - } - out.close(); - bkjm.finalizeLogSegment(1, 100); - - String zkpath = bkjm.finalizedLedgerZNode(1, 100); - - assertNotNull(zkc.exists(zkpath, false)); - assertNull(zkc.exists(bkjm.inprogressZNode(1), false)); - } - - @Test - public void testNumberOfTransactions() throws Exception { - NamespaceInfo nsi = newNSInfo(); - - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi); - bkjm.format(nsi); - - EditLogOutputStream out = bkjm.startLogSegment(1, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - for (long i = 1 ; i <= 100; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(i); - out.write(op); - } - out.close(); - bkjm.finalizeLogSegment(1, 100); - - long numTrans = bkjm.getNumberOfTransactions(1, true); - assertEquals(100, numTrans); - } - - @Test - public void testNumberOfTransactionsWithGaps() throws Exception { - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager 
bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi); - bkjm.format(nsi); - - long txid = 1; - for (long i = 0; i < 3; i++) { - long start = txid; - EditLogOutputStream out = bkjm.startLogSegment(start, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(txid++); - out.write(op); - } - out.close(); - bkjm.finalizeLogSegment(start, txid-1); - assertNotNull( - zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false)); - } - zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, - DEFAULT_SEGMENT_SIZE*2), -1); - - long numTrans = bkjm.getNumberOfTransactions(1, true); - assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); - - try { - numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1, true); - fail("Should have thrown corruption exception by this point"); - } catch (JournalManager.CorruptionException ce) { - // if we get here, everything is going good - } - - numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1, true); - assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); - } - - @Test - public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception { - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"), nsi); - bkjm.format(nsi); - - long txid = 1; - for (long i = 0; i < 3; i++) { - long start = txid; - EditLogOutputStream out = bkjm.startLogSegment(start, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(txid++); - out.write(op); - } - - out.close(); - bkjm.finalizeLogSegment(start, (txid-1)); - assertNotNull( - zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false)); - } - long start = txid; - EditLogOutputStream out = 
bkjm.startLogSegment(start, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE/2; j++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(txid++); - out.write(op); - } - out.setReadyToFlush(); - out.flush(); - out.abort(); - out.close(); - - long numTrans = bkjm.getNumberOfTransactions(1, true); - assertEquals((txid-1), numTrans); - } - - /** - * Create a bkjm namespace, write a journal from txid 1, close stream. - * Try to create a new journal from txid 1. Should throw an exception. - */ - @Test - public void testWriteRestartFrom1() throws Exception { - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"), nsi); - bkjm.format(nsi); - - long txid = 1; - long start = txid; - EditLogOutputStream out = bkjm.startLogSegment(txid, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(txid++); - out.write(op); - } - out.close(); - bkjm.finalizeLogSegment(start, (txid-1)); - - txid = 1; - try { - out = bkjm.startLogSegment(txid, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - fail("Shouldn't be able to start another journal from " + txid - + " when one already exists"); - } catch (Exception ioe) { - LOG.info("Caught exception as expected", ioe); - } - - // test border case - txid = DEFAULT_SEGMENT_SIZE; - try { - out = bkjm.startLogSegment(txid, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - fail("Shouldn't be able to start another journal from " + txid - + " when one already exists"); - } catch (IOException ioe) { - LOG.info("Caught exception as expected", ioe); - } - - // open journal continuing from before - txid = DEFAULT_SEGMENT_SIZE + 1; - start = txid; - out = bkjm.startLogSegment(start, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - assertNotNull(out); 
- - for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(txid++); - out.write(op); - } - out.close(); - bkjm.finalizeLogSegment(start, (txid-1)); - - // open journal arbitarily far in the future - txid = DEFAULT_SEGMENT_SIZE * 4; - out = bkjm.startLogSegment(txid, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - assertNotNull(out); - } - - @Test - public void testTwoWriters() throws Exception { - long start = 1; - NamespaceInfo nsi = newNSInfo(); - - BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi); - bkjm1.format(nsi); - - BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi); - - - EditLogOutputStream out1 = bkjm1.startLogSegment(start, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - try { - bkjm2.startLogSegment(start, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - fail("Shouldn't have been able to open the second writer"); - } catch (IOException ioe) { - LOG.info("Caught exception as expected", ioe); - }finally{ - out1.close(); - } - } - - @Test - public void testSimpleRead() throws Exception { - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-simpleread"), - nsi); - bkjm.format(nsi); - - final long numTransactions = 10000; - EditLogOutputStream out = bkjm.startLogSegment(1, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);; - for (long i = 1 ; i <= numTransactions; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(i); - out.write(op); - } - out.close(); - bkjm.finalizeLogSegment(1, numTransactions); - - List<EditLogInputStream> in = new ArrayList<EditLogInputStream>(); - bkjm.selectInputStreams(in, 1, true); - try { - assertEquals(numTransactions, - 
FSEditLogTestUtil.countTransactionsInStream(in.get(0))); - } finally { - in.get(0).close(); - } - } - - @Test - public void testSimpleRecovery() throws Exception { - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"), - nsi); - bkjm.format(nsi); - - EditLogOutputStream out = bkjm.startLogSegment(1, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);; - for (long i = 1 ; i <= 100; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(i); - out.write(op); - } - out.setReadyToFlush(); - out.flush(); - - out.abort(); - out.close(); - - - assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false)); - assertNotNull(zkc.exists(bkjm.inprogressZNode(1), false)); - - bkjm.recoverUnfinalizedSegments(); - - assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false)); - assertNull(zkc.exists(bkjm.inprogressZNode(1), false)); - } - - /** - * Test that if enough bookies fail to prevent an ensemble, - * writes the bookkeeper will fail. Test that when once again - * an ensemble is available, it can continue to write. 
- */ - @Test - public void testAllBookieFailure() throws Exception { - // bookie to fail - newBookie = bkutil.newBookie(); - BookieServer replacementBookie = null; - - try { - int ensembleSize = numBookies + 1; - assertEquals("New bookie didn't start", - ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); - - // ensure that the journal manager has to use all bookies, - // so that a failure will fail the journal manager - Configuration conf = new Configuration(); - conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, - ensembleSize); - conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, - ensembleSize); - long txid = 1; - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"), - nsi); - bkjm.format(nsi); - EditLogOutputStream out = bkjm.startLogSegment(txid, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - - for (long i = 1 ; i <= 3; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(txid++); - out.write(op); - } - out.setReadyToFlush(); - out.flush(); - newBookie.shutdown(); - assertEquals("New bookie didn't die", - numBookies, bkutil.checkBookiesUp(numBookies, 10)); - - try { - for (long i = 1 ; i <= 3; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(txid++); - out.write(op); - } - out.setReadyToFlush(); - out.flush(); - fail("should not get to this stage"); - } catch (IOException ioe) { - LOG.debug("Error writing to bookkeeper", ioe); - assertTrue("Invalid exception message", - ioe.getMessage().contains("Failed to write to bookkeeper")); - } - replacementBookie = bkutil.newBookie(); - - assertEquals("New bookie didn't start", - numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10)); - bkjm.recoverUnfinalizedSegments(); - out = bkjm.startLogSegment(txid, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - for (long i = 1 ; i <= 3; i++) { - FSEditLogOp op 
= FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(txid++); - out.write(op); - } - - out.setReadyToFlush(); - out.flush(); - - } catch (Exception e) { - LOG.error("Exception in test", e); - throw e; - } finally { - if (replacementBookie != null) { - replacementBookie.shutdown(); - } - newBookie.shutdown(); - - if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) { - LOG.warn("Not all bookies from this test shut down, expect errors"); - } - } - } - - /** - * Test that a BookKeeper JM can continue to work across the - * failure of a bookie. This should be handled transparently - * by bookkeeper. - */ - @Test - public void testOneBookieFailure() throws Exception { - newBookie = bkutil.newBookie(); - BookieServer replacementBookie = null; - - try { - int ensembleSize = numBookies + 1; - assertEquals("New bookie didn't start", - ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); - - // ensure that the journal manager has to use all bookies, - // so that a failure will fail the journal manager - Configuration conf = new Configuration(); - conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, - ensembleSize); - conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, - ensembleSize); - long txid = 1; - - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), - nsi); - bkjm.format(nsi); - - EditLogOutputStream out = bkjm.startLogSegment(txid, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - for (long i = 1 ; i <= 3; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(txid++); - out.write(op); - } - out.setReadyToFlush(); - out.flush(); - - replacementBookie = bkutil.newBookie(); - assertEquals("replacement bookie didn't start", - ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10)); - newBookie.shutdown(); - assertEquals("New bookie didn't die", - ensembleSize, 
bkutil.checkBookiesUp(ensembleSize, 10)); - - for (long i = 1 ; i <= 3; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(txid++); - out.write(op); - } - out.setReadyToFlush(); - out.flush(); - } catch (Exception e) { - LOG.error("Exception in test", e); - throw e; - } finally { - if (replacementBookie != null) { - replacementBookie.shutdown(); - } - newBookie.shutdown(); - - if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) { - LOG.warn("Not all bookies from this test shut down, expect errors"); - } - } - } - - /** - * If a journal manager has an empty inprogress node, ensure that we throw an - * error, as this should not be possible, and some third party has corrupted - * the zookeeper state - */ - @Test - public void testEmptyInprogressNode() throws Exception { - URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress"); - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, - nsi); - bkjm.format(nsi); - - EditLogOutputStream out = bkjm.startLogSegment(1, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);; - for (long i = 1; i <= 100; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(i); - out.write(op); - } - out.close(); - bkjm.finalizeLogSegment(1, 100); - - out = bkjm.startLogSegment(101, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - out.close(); - bkjm.close(); - String inprogressZNode = bkjm.inprogressZNode(101); - zkc.setData(inprogressZNode, new byte[0], -1); - - bkjm = new BookKeeperJournalManager(conf, uri, nsi); - try { - bkjm.recoverUnfinalizedSegments(); - fail("Should have failed. 
There should be no way of creating" - + " an empty inprogess znode"); - } catch (IOException e) { - // correct behaviour - assertTrue("Exception different than expected", e.getMessage().contains( - "Invalid/Incomplete data in znode")); - } finally { - bkjm.close(); - } - } - - /** - * If a journal manager has an corrupt inprogress node, ensure that we throw - * an error, as this should not be possible, and some third party has - * corrupted the zookeeper state - */ - @Test - public void testCorruptInprogressNode() throws Exception { - URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress"); - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, - nsi); - bkjm.format(nsi); - - EditLogOutputStream out = bkjm.startLogSegment(1, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);; - for (long i = 1; i <= 100; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(i); - out.write(op); - } - out.close(); - bkjm.finalizeLogSegment(1, 100); - - out = bkjm.startLogSegment(101, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - out.close(); - bkjm.close(); - - String inprogressZNode = bkjm.inprogressZNode(101); - zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1); - - bkjm = new BookKeeperJournalManager(conf, uri, nsi); - try { - bkjm.recoverUnfinalizedSegments(); - fail("Should have failed. There should be no way of creating" - + " an empty inprogess znode"); - } catch (IOException e) { - // correct behaviour - assertTrue("Exception different than expected", e.getMessage().contains( - "has no field named")); - } finally { - bkjm.close(); - } - } - - /** - * Cases can occur where we create a segment but crash before we even have the - * chance to write the START_SEGMENT op. 
If this occurs we should warn, but - * load as normal - */ - @Test - public void testEmptyInprogressLedger() throws Exception { - URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger"); - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, - nsi); - bkjm.format(nsi); - - EditLogOutputStream out = bkjm.startLogSegment(1, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);; - for (long i = 1; i <= 100; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(i); - out.write(op); - } - out.close(); - bkjm.finalizeLogSegment(1, 100); - - out = bkjm.startLogSegment(101, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - out.close(); - bkjm.close(); - - bkjm = new BookKeeperJournalManager(conf, uri, nsi); - bkjm.recoverUnfinalizedSegments(); - out = bkjm.startLogSegment(101, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - for (long i = 1; i <= 100; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(i); - out.write(op); - } - out.close(); - bkjm.finalizeLogSegment(101, 200); - - bkjm.close(); - } - - /** - * Test that if we fail between finalizing an inprogress and deleting the - * corresponding inprogress znode. 
- */ - @Test - public void testRefinalizeAlreadyFinalizedInprogress() throws Exception { - URI uri = BKJMUtil - .createJournalURI("/hdfsjournal-refinalizeInprogressLedger"); - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, - nsi); - bkjm.format(nsi); - - EditLogOutputStream out = bkjm.startLogSegment(1, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);; - for (long i = 1; i <= 100; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(i); - out.write(op); - } - out.close(); - bkjm.close(); - - String inprogressZNode = bkjm.inprogressZNode(1); - String finalizedZNode = bkjm.finalizedLedgerZNode(1, 100); - assertNotNull("inprogress znode doesn't exist", zkc.exists(inprogressZNode, - null)); - assertNull("finalized znode exists", zkc.exists(finalizedZNode, null)); - - byte[] inprogressData = zkc.getData(inprogressZNode, false, null); - - // finalize - bkjm = new BookKeeperJournalManager(conf, uri, nsi); - bkjm.recoverUnfinalizedSegments(); - bkjm.close(); - - assertNull("inprogress znode exists", zkc.exists(inprogressZNode, null)); - assertNotNull("finalized znode doesn't exist", zkc.exists(finalizedZNode, - null)); - - zkc.create(inprogressZNode, inprogressData, Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - - // should work fine - bkjm = new BookKeeperJournalManager(conf, uri, nsi); - bkjm.recoverUnfinalizedSegments(); - bkjm.close(); - } - - /** - * Tests that the edit log file meta data reading from ZooKeeper should be - * able to handle the NoNodeException. bkjm.getInputStream(fromTxId, - * inProgressOk) should suppress the NoNodeException and continue. HDFS-3441. 
- */ - @Test - public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception { - URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile"); - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, - nsi); - bkjm.format(nsi); - - try { - // start new inprogress log segment with txid=1 - // and write transactions till txid=50 - String zkpath1 = startAndFinalizeLogSegment(bkjm, 1, 50); - - // start new inprogress log segment with txid=51 - // and write transactions till txid=100 - String zkpath2 = startAndFinalizeLogSegment(bkjm, 51, 100); - - // read the metadata from ZK. Here simulating the situation - // when reading,the edit log metadata can be removed by purger thread. - ZooKeeper zkspy = spy(BKJMUtil.connectZooKeeper()); - bkjm.setZooKeeper(zkspy); - Mockito.doThrow( - new KeeperException.NoNodeException(zkpath2 + " doesn't exists")) - .when(zkspy).getData(zkpath2, false, null); - - List<EditLogLedgerMetadata> ledgerList = bkjm.getLedgerList(false); - assertEquals("List contains the metadata of non exists path.", 1, - ledgerList.size()); - assertEquals("LogLedgerMetadata contains wrong zk paths.", zkpath1, - ledgerList.get(0).getZkPath()); - } finally { - bkjm.close(); - } - } - - private enum ThreadStatus { - COMPLETED, GOODEXCEPTION, BADEXCEPTION; - }; - - /** - * Tests that concurrent calls to format will still allow one to succeed. 
- */ - @Test - public void testConcurrentFormat() throws Exception { - final URI uri = BKJMUtil.createJournalURI("/hdfsjournal-concurrentformat"); - final NamespaceInfo nsi = newNSInfo(); - - // populate with data first - BookKeeperJournalManager bkjm - = new BookKeeperJournalManager(conf, uri, nsi); - bkjm.format(nsi); - for (int i = 1; i < 100*2; i += 2) { - bkjm.startLogSegment(i, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - bkjm.finalizeLogSegment(i, i+1); - } - bkjm.close(); - - final int numThreads = 40; - List<Callable<ThreadStatus>> threads - = new ArrayList<Callable<ThreadStatus>>(); - final CyclicBarrier barrier = new CyclicBarrier(numThreads); - - for (int i = 0; i < numThreads; i++) { - threads.add(new Callable<ThreadStatus>() { - public ThreadStatus call() { - BookKeeperJournalManager bkjm = null; - try { - bkjm = new BookKeeperJournalManager(conf, uri, nsi); - barrier.await(); - bkjm.format(nsi); - return ThreadStatus.COMPLETED; - } catch (IOException ioe) { - LOG.info("Exception formatting ", ioe); - return ThreadStatus.GOODEXCEPTION; - } catch (InterruptedException ie) { - LOG.error("Interrupted. 
Something is broken", ie); - Thread.currentThread().interrupt(); - return ThreadStatus.BADEXCEPTION; - } catch (Exception e) { - LOG.error("Some other bad exception", e); - return ThreadStatus.BADEXCEPTION; - } finally { - if (bkjm != null) { - try { - bkjm.close(); - } catch (IOException ioe) { - LOG.error("Error closing journal manager", ioe); - } - } - } - } - }); - } - ExecutorService service = Executors.newFixedThreadPool(numThreads); - List<Future<ThreadStatus>> statuses = service.invokeAll(threads, 60, - TimeUnit.SECONDS); - int numCompleted = 0; - for (Future<ThreadStatus> s : statuses) { - assertTrue(s.isDone()); - assertTrue("Thread threw invalid exception", - s.get() == ThreadStatus.COMPLETED - || s.get() == ThreadStatus.GOODEXCEPTION); - if (s.get() == ThreadStatus.COMPLETED) { - numCompleted++; - } - } - LOG.info("Completed " + numCompleted + " formats"); - assertTrue("No thread managed to complete formatting", numCompleted > 0); - } - - @Test(timeout = 120000) - public void testDefaultAckQuorum() throws Exception { - newBookie = bkutil.newBookie(); - int ensembleSize = numBookies + 1; - int quorumSize = numBookies + 1; - // ensure that the journal manager has to use all bookies, - // so that a failure will fail the journal manager - Configuration conf = new Configuration(); - conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, - ensembleSize); - conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, - quorumSize); - // sets 2 secs - conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC, - 2); - NamespaceInfo nsi = newNSInfo(); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi); - bkjm.format(nsi); - CountDownLatch sleepLatch = new CountDownLatch(1); - sleepBookie(sleepLatch, newBookie); - - EditLogOutputStream out = bkjm.startLogSegment(1, - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); - int numTransactions = 100; - 
for (long i = 1; i <= numTransactions; i++) { - FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); - op.setTransactionId(i); - out.write(op); - } - try { - out.close(); - bkjm.finalizeLogSegment(1, numTransactions); - - List<EditLogInputStream> in = new ArrayList<EditLogInputStream>(); - bkjm.selectInputStreams(in, 1, true); - try { - assertEquals(numTransactions, - FSEditLogTestUtil.countTransactionsInStream(in.get(0))); - } finally { - in.get(0).close(); - } - fail("Should throw exception as not enough non-faulty bookies available!"); - } catch (IOException ioe) { - // expected - } - }

  /**
   * Test ack quorum feature supported by bookkeeper. Keep ack quorum bookie
   * alive and sleep all the other bookies. Now the client would wait for the
   * acknowledgement from the ack size bookies and after receiving the success
   * response will continue writing. Non ack client will hang long time to add
   * entries.
   */
  @Test(timeout = 120000)
  public void testAckQuorum() throws Exception {
    // slow bookie
    newBookie = bkutil.newBookie();
    // make quorum size and ensemble size same to avoid the interleave writing
    // of the ledger entries
    int ensembleSize = numBookies + 1;
    int quorumSize = numBookies + 1;
    // one ack fewer than the write quorum, so the single suspended bookie
    // must not be able to block an add
    int ackSize = numBookies;
    Configuration conf = new Configuration();
    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
        ensembleSize);
    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
        quorumSize);
    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ACK_QUORUM_SIZE,
        ackSize);
    // sets 60 minutes
    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
        3600);

    NamespaceInfo nsi = newNSInfo();
    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
        BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi);
    CountDownLatch sleepLatch = new CountDownLatch(1);
    List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
    try {
      bkjm.format(nsi);
      sleepBookie(sleepLatch, newBookie);

      EditLogOutputStream out = bkjm.startLogSegment(1,
          NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
      int numTransactions = 100;
      for (long i = 1; i <= numTransactions; i++) {
        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
        op.setTransactionId(i);
        out.write(op);
      }
      out.close();
      bkjm.finalizeLogSegment(1, numTransactions);

      bkjm.selectInputStreams(in, 1, true);
      assertEquals(numTransactions,
          FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
    } finally {
      // Release the sleeper first so the bookie is resumed even when the
      // test body fails; then close the streams and the journal manager.
      sleepLatch.countDown();
      try {
        for (EditLogInputStream stream : in) {
          stream.close();
        }
      } finally {
        bkjm.close();
      }
    }
  }

  /**
   * Suspend a bookie on a background thread, wait until the latch is counted
   * down (or 60 seconds elapse), then resume the bookie.
   *
   * @param l
   *          Latch the sleeper thread waits on
   * @param bookie
   *          bookie server to suspend
   * @throws Exception
   */
  private void sleepBookie(final CountDownLatch l, final BookieServer bookie)
      throws Exception {

    Thread sleeper = new Thread() {
      @Override
      public void run() {
        try {
          bookie.suspendProcessing();
          try {
            l.await(60, TimeUnit.SECONDS);
          } finally {
            // resume even if the wait was interrupted
            bookie.resumeProcessing();
          }
        } catch (InterruptedException ie) {
          // keep the interrupt visible to whoever owns this thread
          Thread.currentThread().interrupt();
          LOG.error("Error suspending bookie", ie);
        } catch (Exception e) {
          LOG.error("Error suspending bookie", e);
        }
      }
    };
    sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
    sleeper.start();
  }

  /**
   * Write no-op transactions startTxid..endTxid into a new log segment,
   * finalize it, and assert that the finalized znode exists and the
   * in-progress znode is gone.
   *
   * @param bkjm journal manager to write through
   * @param startTxid first transaction id of the segment
   * @param endTxid last transaction id of the segment (inclusive)
   * @return ZooKeeper path of the finalized ledger znode
   */
  private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
      int startTxid, int endTxid) throws IOException, KeeperException,
      InterruptedException {
    EditLogOutputStream out = bkjm.startLogSegment(startTxid,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    for (long i = startTxid; i <= endTxid; i++) {
      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
      op.setTransactionId(i);
      out.write(op);
    }
    out.close();
    // finalize the in-progress segment that starts at startTxid
    bkjm.finalizeLogSegment(startTxid, endTxid);
    String zkpath1 = bkjm.finalizedLedgerZNode(startTxid, endTxid);
    assertNotNull(zkc.exists(zkpath1, false));
    assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false));
    return zkpath1;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java deleted file mode 100644 index f5b86bc..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */
package org.apache.hadoop.contrib.bkjournal;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.proto.BookieServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Checks that edits written through {@link BookKeeperJournalManager} can be
 * read back while all but one bookie of the ensemble are suspended, with the
 * speculative read timeout set far below the read entry timeout.
 */
public class TestBookKeeperSpeculativeRead {
  private static final Log LOG = LogFactory
      .getLog(TestBookKeeperSpeculativeRead.class);

  /** Bookies started on top of the local ones; these get suspended. */
  private static final int NUM_EXTRA_BOOKIES = 9;

  private ZooKeeper zkc;
  private static BKJMUtil bkutil;
  private static int numLocalBookies = 1;
  private static List<BookieServer> bks = new ArrayList<BookieServer>();

  @BeforeClass
  public static void setupBookkeeper() throws Exception {
    bkutil = new BKJMUtil(numLocalBookies);
    bkutil.start();
  }

  @AfterClass
  public static void teardownBookkeeper() throws Exception {
    try {
      // bkutil stays null when setupBookkeeper() failed early
      if (bkutil != null) {
        bkutil.teardown();
      }
    } finally {
      // shut the extra bookies down even if the local ensemble teardown threw
      for (BookieServer bk : bks) {
        bk.shutdown();
      }
    }
  }

  @Before
  public void setup() throws Exception {
    zkc = BKJMUtil.connectZooKeeper();
  }

  @After
  public void teardown() throws Exception {
    // zkc is null when setup() itself failed; avoid masking that failure
    if (zkc != null) {
      zkc.close();
    }
  }

  private NamespaceInfo newNSInfo() {
    Random r = new Random();
    return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
  }

  /**
   * Test speculative read feature supported by bookkeeper. Keep one bookie
   * alive and sleep all the other bookies. Non spec client will hang for long
   * time to read the entries from the bookkeeper.
   */
  @Test(timeout = 120000)
  public void testSpeculativeRead() throws Exception {
    // starting 9 more servers
    for (int i = 0; i < NUM_EXTRA_BOOKIES; i++) {
      bks.add(bkutil.newBookie());
    }
    NamespaceInfo nsi = newNSInfo();
    Configuration conf = new Configuration();
    int ensembleSize = numLocalBookies + NUM_EXTRA_BOOKIES;
    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
        ensembleSize);
    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
        ensembleSize);
    conf.setInt(
        BookKeeperJournalManager.BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
        100);
    // sets 60 minutes
    conf.setInt(
        BookKeeperJournalManager.BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC, 3600);
    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
        BKJMUtil.createJournalURI("/hdfsjournal-specread"), nsi);
    CountDownLatch sleepLatch = new CountDownLatch(1);
    List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
    try {
      bkjm.format(nsi);

      final long numTransactions = 1000;
      EditLogOutputStream out = bkjm.startLogSegment(1,
          NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
      for (long i = 1; i <= numTransactions; i++) {
        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
        op.setTransactionId(i);
        out.write(op);
      }
      out.close();
      bkjm.finalizeLogSegment(1, numTransactions);

      bkjm.selectInputStreams(in, 1, true);

      // sleep 9 bk servers. Now only one server is running and responding to
      // the clients
      for (final BookieServer bookie : bks) {
        sleepBookie(sleepLatch, bookie);
      }
      assertEquals(numTransactions,
          FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
    } finally {
      // Wake the suspended bookies first so a failing close() cannot leave
      // them asleep; then close the streams and the journal manager.
      sleepLatch.countDown();
      try {
        for (EditLogInputStream stream : in) {
          stream.close();
        }
      } finally {
        bkjm.close();
      }
    }
  }

  /**
   * Suspend a bookie on a background thread, wait until the latch is counted
   * down (or two minutes elapse), then resume the bookie.
   *
   * @param latch
   *          latch to wait on
   * @param bookie
   *          bookie server
   * @throws Exception
   */
  private void sleepBookie(final CountDownLatch latch, final BookieServer bookie)
      throws Exception {

    Thread sleeper = new Thread() {
      @Override
      public void run() {
        try {
          bookie.suspendProcessing();
          try {
            latch.await(2, TimeUnit.MINUTES);
          } finally {
            // resume even if the wait was interrupted
            bookie.resumeProcessing();
          }
        } catch (InterruptedException ie) {
          // keep the interrupt visible to whoever owns this thread
          Thread.currentThread().interrupt();
          LOG.error("Error suspending bookie", ie);
        } catch (Exception e) {
          LOG.error("Error suspending bookie", e);
        }
      }
    };
    sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
    sleeper.start();
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBootstrapStandbyWithBKJM.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBootstrapStandbyWithBKJM.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBootstrapStandbyWithBKJM.java deleted file mode 100644 index ef7f708..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBootstrapStandbyWithBKJM.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.contrib.bkjournal; - -import java.io.File; -import java.io.FileFilter; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.MiniDFSNNTopology; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; -import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; -import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby; -import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; -import org.apache.hadoop.hdfs.server.namenode.ha.TestStandbyCheckpoints.SlowCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import com.google.common.collect.ImmutableList; - -public class TestBootstrapStandbyWithBKJM { - private static BKJMUtil bkutil; - protected MiniDFSCluster cluster; - - @BeforeClass - public static void setupBookkeeper() throws Exception { - bkutil = new BKJMUtil(3); - bkutil.start(); - 
} - - @AfterClass - public static void teardownBookkeeper() throws Exception { - bkutil.teardown(); - } - - @After - public void teardown() { - if (cluster != null) { - cluster.shutdown(); - cluster = null; - } - } - - @Before - public void setUp() throws Exception { - Configuration conf = new Configuration(); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5); - conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); - conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil - .createJournalURI("/bootstrapStandby").toString()); - BKJMUtil.addJournalManagerDefinition(conf); - conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true); - conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY, - SlowCodec.class.getCanonicalName()); - CompressionCodecFactory.setCodecClasses(conf, - ImmutableList.<Class> of(SlowCodec.class)); - MiniDFSNNTopology topology = new MiniDFSNNTopology() - .addNameservice(new MiniDFSNNTopology.NSConf("ns1").addNN( - new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001)).addNN( - new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002))); - cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology) - .numDataNodes(1).manageNameDfsSharedDirs(false).build(); - cluster.waitActive(); - } - - /** - * While boostrapping, in_progress transaction entries should be skipped. 
- * Bootstrap usage for BKJM : "-force", "-nonInteractive", "-skipSharedEditsCheck" - */ - @Test - public void testBootstrapStandbyWithActiveNN() throws Exception { - // make nn0 active - cluster.transitionToActive(0); - - // do ops and generate in-progress edit log data - Configuration confNN1 = cluster.getConfiguration(1); - DistributedFileSystem dfs = (DistributedFileSystem) HATestUtil - .configureFailoverFs(cluster, confNN1); - for (int i = 1; i <= 10; i++) { - dfs.mkdirs(new Path("/test" + i)); - } - dfs.close(); - - // shutdown nn1 and delete its edit log files - cluster.shutdownNameNode(1); - deleteEditLogIfExists(confNN1); - cluster.getNameNodeRpc(0).setSafeMode(SafeModeAction.SAFEMODE_ENTER, true); - cluster.getNameNodeRpc(0).saveNamespace(0, 0); - cluster.getNameNodeRpc(0).setSafeMode(SafeModeAction.SAFEMODE_LEAVE, true); - - // check without -skipSharedEditsCheck, Bootstrap should fail for BKJM - // immediately after saveNamespace - int rc = BootstrapStandby.run(new String[] { "-force", "-nonInteractive" }, - confNN1); - Assert.assertEquals("Mismatches return code", 6, rc); - - // check with -skipSharedEditsCheck - rc = BootstrapStandby.run(new String[] { "-force", "-nonInteractive", - "-skipSharedEditsCheck" }, confNN1); - Assert.assertEquals("Mismatches return code", 0, rc); - - // Checkpoint as fast as we can, in a tight loop. 
- confNN1.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1); - cluster.restartNameNode(1); - cluster.transitionToStandby(1); - - NameNode nn0 = cluster.getNameNode(0); - HATestUtil.waitForStandbyToCatchUp(nn0, cluster.getNameNode(1)); - long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0) - .getFSImage().getMostRecentCheckpointTxId(); - HATestUtil.waitForCheckpoint(cluster, 1, - ImmutableList.of((int) expectedCheckpointTxId)); - - // Should have copied over the namespace - FSImageTestUtil.assertNNHasCheckpoints(cluster, 1, - ImmutableList.of((int) expectedCheckpointTxId)); - FSImageTestUtil.assertNNFilesMatch(cluster); - } - - private void deleteEditLogIfExists(Configuration confNN1) { - String editDirs = confNN1.get(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY); - String[] listEditDirs = StringUtils.split(editDirs, ','); - Assert.assertTrue("Wrong edit directory path!", listEditDirs.length > 0); - - for (String dir : listEditDirs) { - File curDir = new File(dir, "current"); - File[] listFiles = curDir.listFiles(new FileFilter() { - @Override - public boolean accept(File f) { - if (!f.getName().startsWith("edits")) { - return true; - } - return false; - } - }); - if (listFiles != null && listFiles.length > 0) { - for (File file : listFiles) { - Assert.assertTrue("Failed to delete edit files!", file.delete()); - } - } - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org