http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index b5f5e32..4f36902 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -18,62 +18,220 @@
package org.apache.bookkeeper.client;
import static
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS;
+import static
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import junit.framework.TestCase;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.util.StaticDNSResolver;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.junit.After;
-import org.junit.Before;
+import org.jboss.netty.util.HashedWheelTimer;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
-
-public class TestRackawareEnsemblePlacementPolicy {
+/**
+ * Tests for {@link RackawareEnsemblePlacementPolicy}: read-sequence
+ * reordering, bookie replacement and new-ensemble selection against rack
+ * mappings registered in {@link StaticDNSResolver}.
+ *
+ * NOTE(review): this class extends junit.framework.TestCase, so JUnit 4
+ * runners will presumably execute it JUnit 3 style, in which case the
+ * {@code @Test(timeout = ...)} annotations below are not honoured - confirm.
+ */
+public class TestRackawareEnsemblePlacementPolicy extends TestCase {
static final Logger LOG =
LoggerFactory.getLogger(TestRackawareEnsemblePlacementPolicy.class);
RackawareEnsemblePlacementPolicy repp;
- Configuration conf = new CompositeConfiguration();
+ // Shared fixture: setUp() fills ensemble with addr1..addr4 (in that order)
+ // and writeSet with the indices 0..3.
+ final ArrayList<BookieSocketAddress> ensemble = new
ArrayList<BookieSocketAddress>();
+ final List<Integer> writeSet = new ArrayList<Integer>();
+ ClientConfiguration conf = new ClientConfiguration();
+ BookieSocketAddress addr1, addr2, addr3, addr4;
+ // Timer handed to repp.initialize(); created in setUp().
+ HashedWheelTimer timer;
- @Before
- public void setUp() throws Exception {
+ /**
+ * Builds the fixture shared by the tests: resets the static DNS resolver,
+ * registers rack mappings for the local host and for addr1..addr4, fills
+ * {@code ensemble} and {@code writeSet}, creates the timer and initializes
+ * the placement policy under test.
+ *
+ * @throws Exception if the local host cannot be resolved or the parent
+ * set-up fails
+ */
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
StaticDNSResolver.reset();
StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostAddress(),
NetworkTopology.DEFAULT_RACK);
StaticDNSResolver.addNodeToRack("127.0.0.1",
NetworkTopology.DEFAULT_RACK);
StaticDNSResolver.addNodeToRack("localhost",
NetworkTopology.DEFAULT_RACK);
LOG.info("Set up static DNS Resolver.");
conf.setProperty(REPP_DNS_RESOLVER_CLASS,
StaticDNSResolver.class.getName());
+ // Fixture bookies live on 127.0.0.2 - 127.0.0.5, i.e. none of them uses
+ // 127.0.0.1, which is mapped above as a local address.
+ addr1 = new BookieSocketAddress("127.0.0.2", 3181);
+ addr2 = new BookieSocketAddress("127.0.0.3", 3181);
+ addr3 = new BookieSocketAddress("127.0.0.4", 3181);
+ addr4 = new BookieSocketAddress("127.0.0.5", 3181);
+ // update dns mapping
+ // addr1 -> <default region>/rack1, addr2 and addr3 -> default rack,
+ // addr4 -> <default region>/rack2
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
NetworkTopology.DEFAULT_REGION + "/rack1");
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
NetworkTopology.DEFAULT_RACK);
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
NetworkTopology.DEFAULT_RACK);
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
NetworkTopology.DEFAULT_REGION + "/rack2");
+ // Ensemble index i corresponds to addr(i+1); the write set covers all
+ // four indices.
+ ensemble.add(addr1);
+ ensemble.add(addr2);
+ ensemble.add(addr3);
+ ensemble.add(addr4);
+ for (int i = 0; i < 4; i++) {
+ writeSet.add(i);
+ }
+
+ // Timer uses the tick duration and tick count from the client
+ // configuration.
+ timer = new HashedWheelTimer(
+ new
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
+ conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
+ conf.getTimeoutTimerNumTicks());
+
repp = new RackawareEnsemblePlacementPolicy();
- repp.initialize(conf);
+ // No explicit DNS resolver is passed (Optional.absent()); the resolver
+ // class was set on conf via REPP_DNS_RESOLVER_CLASS above.
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer,
DISABLE_ALL, null);
}
- @After
- public void tearDown() throws Exception {
+ /**
+ * Uninitializes the placement policy and then runs the parent tear-down.
+ *
+ * NOTE(review): the HashedWheelTimer created in setUp() is not stopped
+ * here; confirm whether uninitalize() releases it, otherwise it may need
+ * an explicit stop to avoid leaking a timer per test.
+ */
+ @Override
+ protected void tearDown() throws Exception {
repp.uninitalize();
+ super.tearDown();
+ }
+
+ /**
+ * Maps every name this test uses for the local machine (local host
+ * address, local host name, "127.0.0.1" and "localhost") to the given rack
+ * in the static DNS resolver.
+ *
+ * @param rack rack path to register for the local machine
+ * @throws Exception if the local host cannot be resolved
+ */
+ static void updateMyRack(String rack) throws Exception {
+
StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostAddress(),
rack);
+
StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostName(), rack);
+ StaticDNSResolver.addNodeToRack("127.0.0.1", rack);
+ StaticDNSResolver.addNodeToRack("localhost", rack);
+ }
+
+ /**
+ * Read reordering when one bookie has left the cluster: after addr1
+ * (ensemble index 0) is dropped from the writable set, the reordered read
+ * sequence is expected to be [1, 2, 3, 0], i.e. the missing bookie last.
+ */
+ @Test(timeout = 60000)
+ public void testNodeDown() throws Exception {
+ // Re-create the policy after mapping the local machine to the default
+ // rack (presumably so the new mapping is seen at initialization).
+ repp.uninitalize();
+ updateMyRack(NetworkTopology.DEFAULT_RACK);
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer,
DISABLE_ALL, null);
+
+ // First report all four bookies as writable, then report the cluster
+ // again without addr1 and with no read-only bookies.
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ addrs.remove(addr1);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+ // Reorder with an empty bookie-failure map.
+ List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
new HashMap<BookieSocketAddress, Long>());
+ List<Integer> expectedSet = new ArrayList<Integer>();
+ expectedSet.add(1);
+ expectedSet.add(2);
+ expectedSet.add(3);
+ expectedSet.add(0);
+ LOG.info("reorder set : {}", reoderSet);
+ // The result must be a different list instance than the input write set.
+ assertFalse(reoderSet == writeSet);
+ assertEquals(expectedSet, reoderSet);
+ }
+
+ /**
+ * Read reordering when one bookie is read-only: addr1 (ensemble index 0)
+ * is moved from the writable set to the read-only set, and the reordered
+ * read sequence is expected to be [1, 2, 3, 0].
+ */
+ @Test(timeout = 60000)
+ public void testNodeReadOnly() throws Exception {
+ // Re-create the policy with the local machine mapped to "/r1/rack1".
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer,
DISABLE_ALL, null);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ // Second update: addr1 is no longer writable but is reported read-only.
+ addrs.remove(addr1);
+ Set<BookieSocketAddress> ro = new HashSet<BookieSocketAddress>();
+ ro.add(addr1);
+ repp.onClusterChanged(addrs, ro);
+
+ // Reorder with an empty bookie-failure map.
+ List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
new HashMap<BookieSocketAddress, Long>());
+ List<Integer> expectedSet = new ArrayList<Integer>();
+ expectedSet.add(1);
+ expectedSet.add(2);
+ expectedSet.add(3);
+ expectedSet.add(0);
+ LOG.info("reorder set : {}", reoderSet);
+ // The result must be a different list instance than the input write set.
+ assertFalse(reoderSet == writeSet);
+ assertEquals(expectedSet, reoderSet);
+ }
+
+ /**
+ * Read reordering when two bookies have left the cluster: addr1 and addr2
+ * (ensemble indices 0 and 1) are dropped from the writable set, and the
+ * reordered read sequence is expected to be [2, 3, 0, 1].
+ */
+ @Test(timeout = 60000)
+ public void testTwoNodesDown() throws Exception {
+ // Re-create the policy with the local machine mapped to "/r1/rack1".
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer,
DISABLE_ALL, null);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ // Second update: addr1 and addr2 are gone, no read-only bookies.
+ addrs.remove(addr1);
+ addrs.remove(addr2);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+ // Reorder with an empty bookie-failure map.
+ List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
new HashMap<BookieSocketAddress, Long>());
+ List<Integer> expectedSet = new ArrayList<Integer>();
+ expectedSet.add(2);
+ expectedSet.add(3);
+ expectedSet.add(0);
+ expectedSet.add(1);
+ LOG.info("reorder set : {}", reoderSet);
+ // The result must be a different list instance than the input write set.
+ assertFalse(reoderSet == writeSet);
+ assertEquals(expectedSet, reoderSet);
+ }
+
+ /**
+ * Read reordering with one bookie gone and one read-only: addr1 (index 0)
+ * is absent from both sets and addr2 (index 1) is read-only. The expected
+ * sequence is [2, 3, 1, 0], i.e. the read-only bookie is placed before the
+ * missing one.
+ */
+ @Test(timeout = 60000)
+ public void testNodeDownAndReadOnly() throws Exception {
+ // Re-create the policy with the local machine mapped to "/r1/rack1".
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer,
DISABLE_ALL, null);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ // Second update: addr1 is reported nowhere, addr2 only as read-only.
+ addrs.remove(addr1);
+ addrs.remove(addr2);
+ Set<BookieSocketAddress> roAddrs = new HashSet<BookieSocketAddress>();
+ roAddrs.add(addr2);
+ repp.onClusterChanged(addrs, roAddrs);
+ // Reorder with an empty bookie-failure map.
+ List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
new HashMap<BookieSocketAddress, Long>());
+ List<Integer> expectedSet = new ArrayList<Integer>();
+ expectedSet.add(2);
+ expectedSet.add(3);
+ expectedSet.add(1);
+ expectedSet.add(0);
+ // The result must be a different list instance than the input write set.
+ assertFalse(reoderSet == writeSet);
+ assertEquals(expectedSet, reoderSet);
}
@Test(timeout = 60000)
public void testReplaceBookieWithEnoughBookiesInSameRack() throws
Exception {
- BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
- BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
- BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
- BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
// update dns mapping
-
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
- NetworkTopology.DEFAULT_RACK);
-
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
"/r2");
-
StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(),
"/r2");
-
StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(),
"/r3");
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
NetworkTopology.DEFAULT_RACK);
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/default-region/r3");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
@@ -82,22 +240,21 @@ public class TestRackawareEnsemblePlacementPolicy {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- BookieSocketAddress replacedBookie = repp.replaceBookie(addr2, new
HashSet<BookieSocketAddress>());
+ BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new
HashSet<BookieSocketAddress>(), addr2, new HashSet<BookieSocketAddress>());
assertEquals(addr3, replacedBookie);
}
@Test(timeout = 60000)
public void testReplaceBookieWithEnoughBookiesInDifferentRack() throws
Exception {
- BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
- BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
- BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
- BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
// update dns mapping
-
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
- NetworkTopology.DEFAULT_RACK);
-
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
"/r2");
-
StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(),
"/r3");
-
StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(),
"/r4");
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
NetworkTopology.DEFAULT_RACK);
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/default-region/r3");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/default-region/r4");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
@@ -108,7 +265,7 @@ public class TestRackawareEnsemblePlacementPolicy {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- BookieSocketAddress replacedBookie = repp.replaceBookie(addr2,
excludedAddrs);
+ BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new
HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
assertFalse(addr1.equals(replacedBookie));
assertTrue(addr3.equals(replacedBookie) ||
addr4.equals(replacedBookie));
@@ -116,16 +273,15 @@ public class TestRackawareEnsemblePlacementPolicy {
@Test(timeout = 60000)
public void testReplaceBookieWithNotEnoughBookies() throws Exception {
- BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
- BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
- BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
- BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
// update dns mapping
-
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
- NetworkTopology.DEFAULT_RACK);
-
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
"/r2");
-
StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(),
"/r3");
-
StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(),
"/r4");
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
NetworkTopology.DEFAULT_RACK);
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/default-region/r3");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/default-region/r4");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
@@ -139,7 +295,7 @@ public class TestRackawareEnsemblePlacementPolicy {
excludedAddrs.add(addr3);
excludedAddrs.add(addr4);
try {
- repp.replaceBookie(addr2, excludedAddrs);
+ repp.replaceBookie(1, 1, 1, new HashSet<BookieSocketAddress>(),
addr2, excludedAddrs);
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
// should throw not enou
@@ -148,10 +304,10 @@ public class TestRackawareEnsemblePlacementPolicy {
@Test(timeout = 60000)
public void testNewEnsembleWithSingleRack() throws Exception {
- BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
- BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
- BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
- BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.6", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.7", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.8", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.9", 3181);
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
@@ -160,9 +316,9 @@ public class TestRackawareEnsemblePlacementPolicy {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2,
new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2,
2, new HashSet<BookieSocketAddress>());
assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2));
- ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2,
new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2,
2, new HashSet<BookieSocketAddress>());
assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
@@ -176,11 +332,10 @@ public class TestRackawareEnsemblePlacementPolicy {
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
// update dns mapping
-
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
- NetworkTopology.DEFAULT_RACK);
-
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
"/r2");
-
StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(),
"/r2");
-
StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(),
"/r2");
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
NetworkTopology.DEFAULT_RACK);
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/default-region/r2");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
@@ -189,10 +344,10 @@ public class TestRackawareEnsemblePlacementPolicy {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2,
new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2,
2, new HashSet<BookieSocketAddress>());
int numCovered = getNumCoveredWriteQuorums(ensemble, 2);
assertTrue(numCovered >= 1 && numCovered < 3);
- ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2,
new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2,
2, new HashSet<BookieSocketAddress>());
numCovered = getNumCoveredWriteQuorums(ensemble2, 2);
assertTrue(numCovered >= 1 && numCovered < 3);
} catch (BKNotEnoughBookiesException bnebe) {
@@ -200,27 +355,25 @@ public class TestRackawareEnsemblePlacementPolicy {
}
}
- @Test(timeout = 90000)
+ @Test(timeout = 60000)
public void testNewEnsembleWithEnoughRacks() throws Exception {
- BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
- BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
- BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
- BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
- BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
- BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
- BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
- BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
+ BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
+ BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.7", 3181);
+ BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.8", 3181);
+ BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.9", 3181);
// update dns mapping
-
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
- NetworkTopology.DEFAULT_RACK);
-
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
"/r2");
-
StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(),
"/r3");
-
StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(),
"/r4");
-
StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(),
- NetworkTopology.DEFAULT_RACK);
-
StaticDNSResolver.addNodeToRack(addr6.getSocketAddress().getAddress().getHostAddress(),
"/r2");
-
StaticDNSResolver.addNodeToRack(addr7.getSocketAddress().getAddress().getHostAddress(),
"/r3");
-
StaticDNSResolver.addNodeToRack(addr8.getSocketAddress().getAddress().getHostAddress(),
"/r4");
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
NetworkTopology.DEFAULT_RACK);
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/default-region/r3");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/default-region/r4");
+ StaticDNSResolver.addNodeToRack(addr5.getHostName(),
NetworkTopology.DEFAULT_RACK);
+ StaticDNSResolver.addNodeToRack(addr6.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr7.getHostName(),
"/default-region/r3");
+ StaticDNSResolver.addNodeToRack(addr8.getHostName(),
"/default-region/r4");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
@@ -233,9 +386,9 @@ public class TestRackawareEnsemblePlacementPolicy {
addrs.add(addr8);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2,
new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2,
2, new HashSet<BookieSocketAddress>());
assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2));
- ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2,
new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2,
2, new HashSet<BookieSocketAddress>());
assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
@@ -247,16 +400,15 @@ public class TestRackawareEnsemblePlacementPolicy {
*/
@Test(timeout = 60000)
public void testRemoveBookieFromCluster() {
- BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
- BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
- BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
- BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
// update dns mapping
-
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
- NetworkTopology.DEFAULT_RACK);
-
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
"/r2");
-
StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(),
"/r2");
-
StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(),
"/r3");
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
NetworkTopology.DEFAULT_RACK);
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/default-region/r3");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
@@ -277,10 +429,72 @@ public class TestRackawareEnsemblePlacementPolicy {
for (int j = 0; j < writeQuorumSize; j++) {
int bookieIdx = (i + j) % ensembleSize;
BookieSocketAddress addr = ensemble.get(bookieIdx);
-
racks.add(StaticDNSResolver.getRack(addr.getSocketAddress().getAddress().getHostAddress()));
+ racks.add(StaticDNSResolver.getRack(addr.getHostName()));
}
numCoveredWriteQuorums += (racks.size() > 1 ? 1 : 0);
}
return numCoveredWriteQuorums;
}
+
+ /**
+ * Read reordering driven by the bookie-failure map: all four bookies are
+ * writable, addr1 and addr2 have failure entries (20L and 22L), and the
+ * reordered sequence is expected to visit addr3, addr4, addr1, addr2 in
+ * that order.
+ *
+ * NOTE(review): the assertEquals calls pass (actual, expected), the reverse
+ * of JUnit's (expected, actual) convention; this only affects the wording
+ * of failure messages.
+ */
+ @Test(timeout = 60000)
+ public void testNodeWithFailures() throws Exception {
+ // Re-create the policy after mapping the local machine to the default
+ // rack.
+ repp.uninitalize();
+ updateMyRack(NetworkTopology.DEFAULT_RACK);
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer,
DISABLE_ALL, null);
+
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+ HashMap<BookieSocketAddress, Long> bookieFailures = new
HashMap<BookieSocketAddress, Long>();
+
+ // Only addr1 and addr2 get failure entries; addr2's value is the larger.
+ bookieFailures.put(addr1, 20L);
+ bookieFailures.put(addr2, 22L);
+
+ List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
bookieFailures);
+ LOG.info("reorder set : {}", reoderSet);
+ assertEquals(ensemble.get(reoderSet.get(2)), addr1);
+ assertEquals(ensemble.get(reoderSet.get(3)), addr2);
+ assertEquals(ensemble.get(reoderSet.get(0)), addr3);
+ assertEquals(ensemble.get(reoderSet.get(1)), addr4);
+ }
+
+ /**
+ * Ensemble placement with a very long network-topology stabilize period
+ * (99999 seconds): after addr4 leaves, onClusterChanged is expected to
+ * report no dead bookies, 3-bookie ensembles must not contain addr4, and
+ * a 4-bookie ensemble must still contain it.
+ */
+ @Test(timeout = 60000)
+ public void testPlacementOnStabilizeNetworkTopology() throws Exception {
+ repp.uninitalize();
+ updateMyRack(NetworkTopology.DEFAULT_RACK);
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ // Local configuration layered on the shared conf, so the stabilize
+ // period is set on confLocal and not on the shared conf object.
+ ClientConfiguration confLocal = new ClientConfiguration();
+ confLocal.addConfiguration(conf);
+ confLocal.setNetworkTopologyStabilizePeriodSeconds(99999);
+ repp.initialize(confLocal, Optional.<DNSToSwitchMapping>absent(),
timer, DISABLE_ALL, null);
+
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ // addr4 left
+ addrs.remove(addr4);
+ Set<BookieSocketAddress> deadBookies = repp.onClusterChanged(addrs,
new HashSet<BookieSocketAddress>());
+ assertTrue(deadBookies.isEmpty());
+
+ // we will never use addr4 even though it is in the stabilized network
topology
+ for (int i = 0 ; i < 5; i++) {
+ ArrayList<BookieSocketAddress> ensemble =
+ repp.newEnsemble(3, 3, 3, new
HashSet<BookieSocketAddress>());
+ assertFalse(ensemble.contains(addr4));
+ }
+
+ // we could still use addr4 for urgent allocation if it is just bookie
flapping
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4,
new HashSet<BookieSocketAddress>());
+ assertTrue(ensemble.contains(addr4));
+ }
}