http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
new file mode 100644
index 0000000..490bd7c
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.conf.Property.GC_CYCLE_DELAY;
+import static org.apache.accumulo.core.conf.Property.GC_CYCLE_START;
+import static org.apache.accumulo.core.conf.Property.INSTANCE_ZK_TIMEOUT;
+import static org.apache.accumulo.core.conf.Property.TSERV_WALOG_MAX_SIZE;
+import static org.apache.accumulo.core.conf.Property.TSERV_WAL_REPLICATION;
+import static org.apache.accumulo.core.security.Authorizations.EMPTY;
+import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR;
+import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.master.state.SetGoalState;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Integration test that follows write-ahead log (WAL) markers through a normal life cycle on a single
+ * tablet server: first write, roll-over, flush, garbage collection, and a tablet server restart.
+ */
+public class WALSunnyDayIT extends ConfigurableMacIT {
+
+  private static final Text CF = new Text(new byte[0]);
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(GC_CYCLE_DELAY, "1s");
+    cfg.setProperty(GC_CYCLE_START, "0s");
+    cfg.setProperty(TSERV_WALOG_MAX_SIZE, "1M");
+    cfg.setProperty(TSERV_WAL_REPLICATION, "1");
+    cfg.setProperty(INSTANCE_ZK_TIMEOUT, "3s");
+    cfg.setNumTservers(1);
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  /** Returns the number of {@code true} values in {@code bools}. */
+  int countTrue(Collection<Boolean> bools) {
+    int result = 0;
+    for (Boolean b : bools) {
+      if (b.booleanValue()) {
+        result++;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Walks one WAL through creation, roll-over, flush, garbage collection and tablet server restart,
+   * checking the recorded WAL markers after each step.
+   */
+  @Test
+  public void test() throws Exception {
+    MiniAccumuloClusterImpl mac = getCluster();
+    MiniAccumuloClusterControl control = mac.getClusterControl();
+    control.stop(GARBAGE_COLLECTOR);
+    Connector c = getConnector();
+    ZooKeeper zoo = new ZooKeeper(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut(), new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        log.info(event.toString());
+      }
+    });
+    // close the ZooKeeper session even when an assertion fails part-way through
+    try {
+      String tableName = getUniqueNames(1)[0];
+      c.tableOperations().create(tableName);
+      writeSomeData(c, tableName, 1, 1);
+
+      // wal markers are added lazily
+      Map<String,Boolean> wals = getWals(c, zoo);
+      assertEquals(wals.toString(), 1, wals.size());
+      for (Boolean b : wals.values()) {
+        assertTrue("logs should be in use", b.booleanValue());
+      }
+
+      // roll log, get a new next
+      writeSomeData(c, tableName, 1000, 50);
+      Map<String,Boolean> walsAfterRoll = getWals(c, zoo);
+      assertEquals("should have 2 WALs after roll", 2, walsAfterRoll.size());
+      assertTrue("new WALs should be a superset of the old WALs", walsAfterRoll.keySet().containsAll(wals.keySet()));
+      assertEquals("all WALs should be in use", 2, countTrue(walsAfterRoll.values()));
+
+      // flush the tables
+      for (String table : new String[] {tableName, MetadataTable.NAME, RootTable.NAME}) {
+        c.tableOperations().flush(table, null, null, true);
+      }
+      UtilWaitThread.sleep(1000);
+      // rolled WAL is no longer in use, but needs to be GC'd
+      Map<String,Boolean> walsAfterflush = getWals(c, zoo);
+      assertEquals(walsAfterflush.toString(), 2, walsAfterflush.size());
+      assertEquals("inUse should be 1", 1, countTrue(walsAfterflush.values()));
+
+      // let the GC run for a little bit
+      control.start(GARBAGE_COLLECTOR);
+      UtilWaitThread.sleep(5 * 1000);
+      // make sure the unused WAL goes away
+      Map<String,Boolean> walsAfterGC = getWals(c, zoo);
+      assertEquals(walsAfterGC.toString(), 1, walsAfterGC.size());
+      control.stop(GARBAGE_COLLECTOR);
+      // restart the tserver, but don't run recovery on all tablets
+      control.stop(TABLET_SERVER);
+      // this delays recovery on the normal tables
+      assertEquals(0, cluster.exec(SetGoalState.class, "SAFE_MODE").waitFor());
+      control.start(TABLET_SERVER);
+
+      // wait for the metadata table to go back online
+      getRecoveryMarkers(c);
+      // allow a little time for the master to notice ASSIGNED_TO_DEAD_SERVER tablets
+      UtilWaitThread.sleep(5 * 1000);
+      Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c);
+      assertEquals("one tablet should have markers", 1, markers.keySet().size());
+      // JUnit's signature is (message, expected, actual)
+      assertEquals("tableId of the keyExtent should be 1", new Text("1"), markers.keySet().iterator().next().getTableId());
+
+      // put some data in the WAL
+      assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor());
+      verifySomeData(c, tableName, 1000 * 50 + 1);
+      writeSomeData(c, tableName, 100, 100);
+
+      Map<String,Boolean> walsAfterRestart = getWals(c, zoo);
+      assertEquals("used WALs after restart should be 1", 1, countTrue(walsAfterRestart.values()));
+      control.start(GARBAGE_COLLECTOR);
+      UtilWaitThread.sleep(5 * 1000);
+      Map<String,Boolean> walsAfterRestartAndGC = getWals(c, zoo);
+      assertEquals("wals left should be 1", 1, walsAfterRestartAndGC.size());
+      assertEquals("logs in use should be 1", 1, countTrue(walsAfterRestartAndGC.values()));
+    } finally {
+      zoo.close();
+    }
+  }
+
+  /** Asserts that a full scan of {@code tableName} returns exactly {@code expected} entries. */
+  private void verifySomeData(Connector c, String tableName, int expected) throws Exception {
+    Scanner scan = c.createScanner(tableName, EMPTY);
+    int result;
+    try {
+      result = Iterators.size(scan.iterator());
+    } finally {
+      scan.close();
+    }
+    Assert.assertEquals(expected, result);
+  }
+
+  /**
+   * Writes {@code row} mutations keyed by random 10-byte rows, each with {@code col} entries that have random
+   * qualifiers and values. The writer is flushed whenever the row index is a multiple of 100.
+   */
+  private void writeSomeData(Connector conn, String tableName, int row, int col) throws Exception {
+    Random rand = new Random();
+    BatchWriter bw = conn.createBatchWriter(tableName, null);
+    byte[] rowData = new byte[10];
+    byte[] cq = new byte[10];
+    byte[] value = new byte[10];
+
+    for (int r = 0; r < row; r++) {
+      rand.nextBytes(rowData);
+      Mutation m = new Mutation(rowData);
+      for (int c = 0; c < col; c++) {
+        rand.nextBytes(cq);
+        rand.nextBytes(value);
+        m.put(CF, new Text(cq), new Value(value));
+      }
+      bw.addMutation(m);
+      if (r % 100 == 0) {
+        bw.flush();
+      }
+    }
+    bw.close();
+  }
+
+  /**
+   * Gathers the WAL markers from the current-logs range of the root and metadata tables and from the
+   * children of the root tablet's current-logs node in ZooKeeper.
+   *
+   * @return WAL path mapped to a flag this test treats as "in use": {@code true} when the table entry has an
+   *         empty value, and always {@code true} for entries read from ZooKeeper
+   */
+  private Map<String,Boolean> getWals(Connector c, ZooKeeper zoo) throws Exception {
+    Map<String,Boolean> result = new HashMap<>();
+    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
+    try {
+      root.setRange(CurrentLogsSection.getRange());
+      Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
+      try {
+        meta.setRange(root.getRange());
+        Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
+        while (both.hasNext()) {
+          Entry<Key,Value> entry = both.next();
+          Text path = new Text();
+          CurrentLogsSection.getPath(entry.getKey(), path);
+          result.put(path.toString(), entry.getValue().get().length == 0);
+        }
+      } finally {
+        meta.close();
+      }
+    } finally {
+      root.close();
+    }
+    String zpath = ZooUtil.getRoot(c.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+    List<String> children = zoo.getChildren(zpath, null);
+    for (String child : children) {
+      byte[] data = zoo.getData(zpath + "/" + child, null, null);
+      // decode with an explicit charset instead of the platform default
+      result.put(new String(data, UTF_8), true);
+    }
+    return result;
+  }
+
+  /**
+   * Scans the tablets section of the root and metadata tables for log column entries.
+   *
+   * @return for each tablet that has at least one log entry, the column qualifiers of those entries, keyed by
+   *         the extent built from the tablet's prev-row column
+   */
+  private Map<KeyExtent,List<String>> getRecoveryMarkers(Connector c) throws Exception {
+    Map<KeyExtent,List<String>> result = new HashMap<>();
+    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
+    try {
+      root.setRange(TabletsSection.getRange());
+      root.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+      TabletColumnFamily.PREV_ROW_COLUMN.fetch(root);
+
+      Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
+      try {
+        meta.setRange(TabletsSection.getRange());
+        meta.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+        TabletColumnFamily.PREV_ROW_COLUMN.fetch(meta);
+
+        List<String> logs = new ArrayList<>();
+        Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
+        while (both.hasNext()) {
+          Entry<Key,Value> entry = both.next();
+          Key key = entry.getKey();
+          if (key.getColumnFamily().equals(TabletsSection.LogColumnFamily.NAME)) {
+            logs.add(key.getColumnQualifier().toString());
+          }
+          // NOTE(review): pending logs attach at the prev-row column; presumably it sorts after the log family - confirm
+          if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && !logs.isEmpty()) {
+            KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue());
+            result.put(extent, logs);
+            logs = new ArrayList<>();
+          }
+        }
+      } finally {
+        meta.close();
+      }
+    } finally {
+      root.close();
+    }
+    return result;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java index 3b1dd2f..140fd59 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java @@ -52,7 +52,7 @@ public class WatchTheWatchCountIT extends ConfigurableMacIT { String response = new String(buffer, 0, n); long total = Long.parseLong(response.split(":")[1].trim()); assertTrue("Total watches was not greater than 500, but was " + total, total > 500); - assertTrue("Total watches was not less than 650, but was " + total, total < 600); + assertTrue("Total watches was not less than 675, but was " + total, total < 675); } finally { socket.close(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java b/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java new file mode 100644 index 0000000..fcd1fd7 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.performance; + +import static org.junit.Assert.assertTrue; + +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.test.continuous.ContinuousIngest; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class RollWALPerformanceIT extends ConfigurableMacIT { + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.TSERV_WAL_REPLICATION, "1"); + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "10M"); + cfg.setProperty(Property.TABLE_MINC_LOGS_MAX, "100"); + cfg.setProperty(Property.GC_FILE_ARCHIVE, "false"); + cfg.setProperty(Property.GC_CYCLE_START, "1s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "1s"); + cfg.useMiniDFS(true); + } + + private long ingest() throws Exception { + final Connector c 
= getConnector(); + final String tableName = getUniqueNames(1)[0]; + + log.info("Creating the table"); + c.tableOperations().create(tableName); + + log.info("Splitting the table"); + final long SPLIT_COUNT = 100; + final long distance = Long.MAX_VALUE / SPLIT_COUNT; + final SortedSet<Text> splits = new TreeSet<Text>(); + for (int i = 1; i < SPLIT_COUNT; i++) { + splits.add(new Text(String.format("%016x", i * distance))); + } + c.tableOperations().addSplits(tableName, splits); + + log.info("Waiting for balance"); + c.instanceOperations().waitForBalance(); + + final Instance inst = c.getInstance(); + + log.info("Starting ingest"); + final long start = System.currentTimeMillis(); + final String args[] = { + "-i", inst.getInstanceName(), + "-z", inst.getZooKeepers(), + "-u", "root", + "-p", ROOT_PASSWORD, + "--batchThreads", "2", + "--table", tableName, + "--num", Long.toString(1000*1000), // 1M 100 byte entries + }; + + ContinuousIngest.main(args); + final long result = System.currentTimeMillis() - start; + log.debug(String.format("Finished in %,d ms", result)); + log.debug("Dropping table"); + c.tableOperations().delete(tableName); + return result; + } + + private long getAverage() throws Exception { + final int REPEAT = 3; + long totalTime = 0; + for (int i = 0; i < REPEAT; i++) { + totalTime += ingest(); + } + return totalTime / REPEAT; + } + + private void testWalPerformanceOnce() throws Exception { + // get time with a small WAL, which will cause many WAL roll-overs + long avg1 = getAverage(); + // use a bigger WAL max size to eliminate WAL roll-overs + Connector c = getConnector(); + c.instanceOperations().setProperty(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1G"); + c.tableOperations().flush(MetadataTable.NAME, null, null, true); + c.tableOperations().flush(RootTable.NAME, null, null, true); + for (ProcessReference tserver : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) { + getCluster().killProcess(ServerType.TABLET_SERVER, tserver); + } + 
getCluster().start(); + long avg2 = getAverage(); + log.info(String.format("Average run time with small WAL %,d with large WAL %,d", avg1, avg2)); + assertTrue(avg1 > avg2); + double percent = (100. * avg1) / avg2; + log.info(String.format("Percent of large log: %.2f%%", percent)); + assertTrue(percent < 125.); + } + + @Test(timeout= 20 * 60 * 1000) + public void testWalPerformance() throws Exception { + testWalPerformanceOnce(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java index 75f61f1..62ed9c2 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java @@ -39,6 +39,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.rpc.ThriftUtil; @@ -78,6 +79,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) { cfg.setNumTservers(1); + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s"); // Wait longer to 
try to let the replication table come online before a cycle runs cfg.setProperty(Property.GC_CYCLE_START, "10s"); @@ -102,18 +104,14 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - Range r = MetadataSchema.TabletsSection.getRange(tableId); - s.setRange(r); - s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); + s.setRange(CurrentLogsSection.getRange()); + s.fetchColumnFamily(CurrentLogsSection.COLF); Set<String> wals = new HashSet<String>(); for (Entry<Key,Value> entry : s) { log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); // hostname:port/uri://path/to/wal - String cq = entry.getKey().getColumnQualifier().toString(); - int index = cq.indexOf('/'); - // Normalize the path - String path = new Path(cq.substring(index + 1)).toString(); + String path = new Path(entry.getKey().getColumnQualifier().toString()).toString(); log.debug("Extracted file: " + path); wals.add(path); } @@ -228,11 +226,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); - log.info("Checking to see that log entries are removed from tablet section after MinC"); - // After compaction, the log column should be gone from the tablet - Set<String> walsAfterMinc = getWalsForTable(table); - Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size()); - Set<String> filesForTable = getFilesForTable(table); Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); log.info("Files for table before MajC: {}", filesForTable); @@ -258,14 +251,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI fileExists = fs.exists(fileToBeDeleted); } - // At this 
point in time, we *know* that the GarbageCollector has run which means that the Status - // for our WAL should not be altered. - - log.info("Re-checking that WALs are still not referenced for our table"); - - Set<String> walsAfterMajc = getWalsForTable(table); - Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size()); - Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); @@ -326,11 +311,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); - log.info("Checking to see that log entries are removed from tablet section after MinC"); - // After compaction, the log column should be gone from the tablet - Set<String> walsAfterMinc = getWalsForTable(table); - Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size()); - Set<String> filesForTable = getFilesForTable(table); Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); log.info("Files for table before MajC: {}", filesForTable); @@ -359,11 +339,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI // At this point in time, we *know* that the GarbageCollector has run which means that the Status // for our WAL should not be altered. 
- log.info("Re-checking that WALs are still not referenced for our table"); - - Set<String> walsAfterMajc = getWalsForTable(table); - Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size()); - Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java index 9dec16e..7a017e1 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java @@ -146,7 +146,7 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT { } } - @Test + @Test(timeout = 10 * 60 * 1000) public void dataWasReplicatedToThePeer() throws Exception { MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), ROOT_PASSWORD); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java index 46e3ac1..3c1cbeb 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java +++ 
b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java @@ -16,11 +16,12 @@ */ package org.apache.accumulo.test.replication; +import static java.nio.charset.StandardCharsets.UTF_8; + import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.EnumSet; import java.util.HashSet; import java.util.Iterator; @@ -45,6 +46,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -63,6 +65,7 @@ import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooCache; @@ -71,7 +74,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.gc.SimpleGarbageCollector; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.replication.ReplicaSystemFactory; import org.apache.accumulo.server.replication.StatusCombiner; import org.apache.accumulo.server.replication.StatusFormatter; @@ -79,7 +82,6 @@ import org.apache.accumulo.server.replication.StatusUtil; import 
org.apache.accumulo.server.replication.proto.Replication.Status; import org.apache.accumulo.server.util.ReplicationTableUtil; import org.apache.accumulo.test.functional.ConfigurableMacIT; -import org.apache.accumulo.tserver.TabletServer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -123,25 +125,38 @@ public class ReplicationIT extends ConfigurableMacIT { cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s"); cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s"); cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M"); + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); cfg.setNumTservers(1); hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException { - Multimap<String,String> logs = HashMultimap.create(); + // Map of server to tableId + Multimap<TServerInstance, String> serverToTableID = HashMultimap.create(); Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - scanner.fetchColumnFamily(LogColumnFamily.NAME); - scanner.setRange(new Range()); + scanner.setRange(MetadataSchema.TabletsSection.getRange()); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME); + for (Entry<Key,Value> entry : scanner) { + TServerInstance key = new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier()); + byte[] tableId = KeyExtent.tableOfMetadataRow(entry.getKey().getRow()); + serverToTableID.put(key, new String(tableId, UTF_8)); + } + // Map of logs to tableId + Multimap<String,String> logs = HashMultimap.create(); + scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.setRange(MetadataSchema.CurrentLogsSection.getRange()); for (Entry<Key,Value> entry : scanner) { if (Thread.interrupted()) { return logs; } - - LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), 
entry.getValue()); - - for (String log : logEntry.logSet) { - // Need to normalize the log file from LogEntry - logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString()); + Text path = new Text(); + MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path); + Text session = new Text(); + Text hostPort = new Text(); + MetadataSchema.CurrentLogsSection.getTabletServer(entry.getKey(), hostPort , session); + TServerInstance server = new TServerInstance(AddressUtil.parseAddress(hostPort.toString(), false), session.toString()); + for (String tableId : serverToTableID.get(server)) { + logs.put(new Path(path.toString()).toString(), tableId); } } return logs; @@ -296,10 +311,12 @@ public class ReplicationIT extends ConfigurableMacIT { attempts = 5; while (wals.isEmpty() && attempts > 0) { s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); + s.setRange(MetadataSchema.CurrentLogsSection.getRange()); + s.fetchColumnFamily(MetadataSchema.CurrentLogsSection.COLF); for (Entry<Key,Value> entry : s) { - LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue()); - wals.add(new Path(logEntry.filename).toString()); + Text path = new Text(); + MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path); + wals.add(new Path(path.toString()).toString()); } attempts--; } @@ -330,18 +347,7 @@ public class ReplicationIT extends ConfigurableMacIT { Assert.assertFalse(ReplicationTable.isOnline(conn)); for (String table : tables) { - BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); - - for (int j = 0; j < 5; j++) { - Mutation m = new Mutation(Integer.toString(j)); - for (int k = 0; k < 5; k++) { - String value = Integer.toString(k); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, table, 5, 5); } // After writing data, still no replication table @@ -381,18 +387,7 @@ public class ReplicationIT 
extends ConfigurableMacIT { Assert.assertFalse(ReplicationTable.isOnline(conn)); // Write some data to table1 - BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig()); - - for (int rows = 0; rows < 50; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 50; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, table1, 50, 50); // After the commit for these mutations finishes, we'll get a replication entry in accumulo.metadata for table1 // Don't want to compact table1 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there @@ -439,18 +434,7 @@ public class ReplicationIT extends ConfigurableMacIT { conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true"); // Write some data to table2 - bw = conn.createBatchWriter(table2, new BatchWriterConfig()); - - for (int rows = 0; rows < 50; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 50; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, table2, 50, 50); // After the commit on these mutations, we'll get a replication entry in accumulo.metadata for table2 // Don't want to compact table2 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there @@ -498,6 +482,19 @@ public class ReplicationIT extends ConfigurableMacIT { Assert.assertFalse("Expected to only find two elements in replication table", iter.hasNext()); } + private void writeSomeData(Connector conn, String table, int rows, int cols) throws Exception { + BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); + for (int row = 0; row < rows; row++) { + Mutation m = new Mutation(Integer.toString(row)); + for (int col = 0; col < cols; col++) { + String value 
= Integer.toString(col); + m.put(value, "", value); + } + bw.addMutation(m); + } + bw.close(); + } + @Test public void replicationEntriesPrecludeWalDeletion() throws Exception { final Connector conn = getConnector(); @@ -529,53 +526,21 @@ public class ReplicationIT extends ConfigurableMacIT { Thread.sleep(2000); // Write some data to table1 - BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig()); - for (int rows = 0; rows < 200; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 500; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, table1, 200, 500); conn.tableOperations().create(table2); conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true"); conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); Thread.sleep(2000); - // Write some data to table2 - bw = conn.createBatchWriter(table2, new BatchWriterConfig()); - for (int rows = 0; rows < 200; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 500; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, table2, 200, 500); conn.tableOperations().create(table3); conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true"); conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); Thread.sleep(2000); - // Write some data to table3 - bw = conn.createBatchWriter(table3, new BatchWriterConfig()); - for (int rows = 0; rows < 200; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 500; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, table3, 200, 500); // 
Force a write to metadata for the data written for (String table : Arrays.asList(table1, table2, table3)) { @@ -609,7 +574,8 @@ public class ReplicationIT extends ConfigurableMacIT { // We should have *some* reference to each log that was seen in the metadata table // They might not yet all be closed though (might be newfile) - Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles); + Assert.assertTrue("Metadata log distribution: " + logs + "replFiles " + replFiles, logs.keySet().containsAll(replFiles)); + Assert.assertTrue("Difference between replication entries and current logs is bigger than one", logs.keySet().size() - replFiles.size() <= 1); for (String replFile : replFiles) { Path p = new Path(replFile); @@ -697,44 +663,11 @@ public class ReplicationIT extends ConfigurableMacIT { conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true"); conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); - // Write some data to table1 - BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig()); - for (int rows = 0; rows < 200; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 500; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } + writeSomeData(conn, table1, 200, 500); - bw.close(); + writeSomeData(conn, table2, 200, 500); - // Write some data to table2 - bw = conn.createBatchWriter(table2, new BatchWriterConfig()); - for (int rows = 0; rows < 200; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 500; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); - - // Write some data to table3 - bw = conn.createBatchWriter(table3, new BatchWriterConfig()); - for (int rows = 0; rows < 200; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); 
- for (int cols = 0; cols < 500; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, table3, 200, 500); // Flush everything to try to make the replication records for (String table : Arrays.asList(table1, table2, table3)) { @@ -789,10 +722,7 @@ public class ReplicationIT extends ConfigurableMacIT { Set<String> wals = new HashSet<>(); for (Entry<Key,Value> entry : s) { LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue()); - for (String file : logEntry.logSet) { - Path p = new Path(file); - wals.add(p.toString()); - } + wals.add(new Path(logEntry.filename).toString()); } log.warn("Found wals {}", wals); @@ -869,9 +799,7 @@ public class ReplicationIT extends ConfigurableMacIT { public void singleTableWithSingleTarget() throws Exception { // We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons // against expected Status messages. 
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR)) { - cluster.killProcess(ServerType.GARBAGE_COLLECTOR, proc); - } + getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); Connector conn = getConnector(); String table1 = "table1"; @@ -905,17 +833,7 @@ public class ReplicationIT extends ConfigurableMacIT { } // Write some data to table1 - BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig()); - for (int rows = 0; rows < 2000; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 50; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, table1, 2000, 50); // Make sure the replication table is online at this point boolean online = ReplicationTable.isOnline(conn); @@ -1002,17 +920,7 @@ public class ReplicationIT extends ConfigurableMacIT { } // Write some more data so that we over-run the single WAL - bw = conn.createBatchWriter(table1, new BatchWriterConfig()); - for (int rows = 0; rows < 3000; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 50; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, table1, 3000, 50); log.info("Issued compaction for table"); conn.tableOperations().compact(table1, null, null, true, true); @@ -1085,17 +993,7 @@ public class ReplicationIT extends ConfigurableMacIT { } // Write some data to table1 - BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig()); - for (int rows = 0; rows < 2000; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 50; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, table1, 2000, 50); conn.tableOperations().flush(table1, null, null, 
true); String tableId = conn.tableOperations().tableIdMap().get(table1); @@ -1151,10 +1049,7 @@ public class ReplicationIT extends ConfigurableMacIT { @Test public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception { - Collection<ProcessReference> gcProcs = cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR); - for (ProcessReference ref : gcProcs) { - cluster.killProcess(ServerType.GARBAGE_COLLECTOR, ref); - } + getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); final Connector conn = getConnector(); @@ -1185,7 +1080,6 @@ public class ReplicationIT extends ConfigurableMacIT { String table1 = "table1", table2 = "table2", table3 = "table3"; - BatchWriter bw; try { conn.tableOperations().create(table1); conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true"); @@ -1194,51 +1088,19 @@ public class ReplicationIT extends ConfigurableMacIT { ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null)); // Write some data to table1 - bw = conn.createBatchWriter(table1, new BatchWriterConfig()); - for (int rows = 0; rows < 200; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 500; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, table1, 200, 500); conn.tableOperations().create(table2); conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true"); conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); - // Write some data to table2 - bw = conn.createBatchWriter(table2, new BatchWriterConfig()); - for (int rows = 0; rows < 200; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 500; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, 
table2, 200, 500); conn.tableOperations().create(table3); conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true"); conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); - // Write some data to table3 - bw = conn.createBatchWriter(table3, new BatchWriterConfig()); - for (int rows = 0; rows < 200; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 500; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, table3, 200, 500); // Flush everything to try to make the replication records for (String table : Arrays.asList(table1, table2, table3)) { @@ -1252,11 +1114,8 @@ public class ReplicationIT extends ConfigurableMacIT { // Kill the tserver(s) and restart them // to ensure that the WALs we previously observed all move to closed. - for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { - cluster.killProcess(ServerType.TABLET_SERVER, proc); - } - - cluster.exec(TabletServer.class); + cluster.getClusterControl().stop(ServerType.TABLET_SERVER); + cluster.getClusterControl().start(ServerType.TABLET_SERVER); // Make sure we can read all the tables (recovery complete) for (String table : Arrays.asList(table1, table2, table3)) { @@ -1359,9 +1218,7 @@ public class ReplicationIT extends ConfigurableMacIT { @Test public void replicatedStatusEntriesAreDeleted() throws Exception { // Just stop it now, we'll restart it after we restart the tserver - for (ProcessReference proc : getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)) { - getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, proc); - } + getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); final Connector conn = getConnector(); log.info("Got connector to MAC"); @@ -1397,17 +1254,7 @@ public class ReplicationIT extends ConfigurableMacIT { 
Assert.assertNotNull("Could not determine table id for " + table1, tableId); // Write some data to table1 - BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig()); - for (int rows = 0; rows < 2000; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 50; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); + writeSomeData(conn, table1, 2000, 50); conn.tableOperations().flush(table1, null, null, true); // Make sure the replication table exists at this point @@ -1425,14 +1272,35 @@ public class ReplicationIT extends ConfigurableMacIT { // Grant ourselves the write permission for later conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE); + log.info("Checking for replication entries in replication"); + // Then we need to get those records over to the replication table + Scanner s; + Set<String> entries = new HashSet<>(); + for (int i = 0; i < 5; i++) { + s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + s.setRange(ReplicationSection.getRange()); + entries.clear(); + for (Entry<Key,Value> entry : s) { + entries.add(entry.getKey().getRow().toString()); + log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); + } + if (!entries.isEmpty()) { + log.info("Replication entries {}", entries); + break; + } + Thread.sleep(1000); + } + + Assert.assertFalse("Did not find any replication entries in the replication table", entries.isEmpty()); + // Find the WorkSection record that will be created for that data we ingested boolean notFound = true; - Scanner s; for (int i = 0; i < 10 && notFound; i++) { try { s = ReplicationTable.getScanner(conn); WorkSection.limit(s); Entry<Key,Value> e = Iterables.getOnlyElement(s); + log.info("Found entry: " + e.getKey().toStringNoTruncate()); Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText(); 
Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier()); notFound = false; @@ -1483,14 +1351,13 @@ public class ReplicationIT extends ConfigurableMacIT { log.info("Killing tserver"); // Kill the tserver(s) and restart them // to ensure that the WALs we previously observed all move to closed. - for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { - cluster.killProcess(ServerType.TABLET_SERVER, proc); - } + cluster.getClusterControl().stop(ServerType.TABLET_SERVER); log.info("Starting tserver"); - cluster.exec(TabletServer.class); + cluster.getClusterControl().start(ServerType.TABLET_SERVER); log.info("Waiting to read tables"); + UtilWaitThread.sleep(2 * 3 * 1000); // Make sure we can read all the tables (recovery complete) for (String table : new String[] {MetadataTable.NAME, table1}) { @@ -1499,55 +1366,48 @@ public class ReplicationIT extends ConfigurableMacIT { Entry<Key,Value> entry : s) {} } - log.info("Checking for replication entries in replication"); - // Then we need to get those records over to the replication table - boolean foundResults = false; - for (int i = 0; i < 5; i++) { - s = ReplicationTable.getScanner(conn); - int count = 0; - for (Entry<Key,Value> entry : s) { - count++; - log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); - } - if (count > 0) { - foundResults = true; - break; - } - Thread.sleep(1000); + log.info("Recovered metadata:"); + s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + for (Entry<Key,Value> entry : s) { + log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); } - Assert.assertTrue("Did not find any replication entries in the replication table", foundResults); - - getCluster().exec(SimpleGarbageCollector.class); + cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR); // Wait for a bit since the GC has to run (should be running after a one second delay) waitForGCLock(conn); Thread.sleep(1000); + log.info("After 
GC"); + s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + for (Entry<Key,Value> entry : s) { + log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); + } + // We expect no records in the metadata table after compaction. We have to poll // because we have to wait for the StatusMaker's next iteration which will clean // up the dangling *closed* records after we create the record in the replication table. // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record log.info("Checking metadata table for replication entries"); - foundResults = true; + Set<String> remaining = new HashSet<>(); for (int i = 0; i < 10; i++) { s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); s.setRange(ReplicationSection.getRange()); - long size = 0; + remaining.clear(); for (Entry<Key,Value> e : s) { - size++; - log.info("{}={}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get()))); + remaining.add(e.getKey().getRow().toString()); } - if (size == 0) { - foundResults = false; + remaining.retainAll(entries); + if (remaining.isEmpty()) { break; } + log.info("remaining {}", remaining); Thread.sleep(2000); log.info(""); } - Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults); + Assert.assertTrue("Replication status messages were not cleaned up from metadata table", remaining.isEmpty()); /** * After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be @@ -1560,10 +1420,10 @@ public class ReplicationIT extends ConfigurableMacIT { recordsFound = 0; for (Entry<Key,Value> entry : s) { recordsFound++; - log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get()))); + log.info("{} {}", entry.getKey().toStringNoTruncate(), 
ProtobufUtil.toString(Status.parseFrom(entry.getValue().get()))); } - if (0 == recordsFound) { + if (recordsFound <= 2) { break; } else { Thread.sleep(1000); @@ -1571,6 +1431,6 @@ public class ReplicationIT extends ConfigurableMacIT { } } - Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound); + Assert.assertTrue("Found unexpected replication records in the replication table", recordsFound <= 2); } }